Class MCollective::Message
In: lib/mcollective/message.rb
Parent: Object

container for a message, its headers, agent, collective and other meta data

Methods

Constants

VALIDTYPES = [:message, :request, :direct_request, :reply]

Attributes

agent  [RW] 
collective  [RW] 
discovered_hosts  [RW] 
filter  [RW] 
headers  [RW] 
message  [R] 
msgtime  [R] 
options  [RW] 
payload  [R] 
request  [R] 
requestid  [RW] 
ttl  [RW] 
type  [R] 
validated  [R] 

Public Class methods

payload - the message body without headers etc, just the text
message - the original message received from the middleware
options[:base64] - is the body base64 encoded?
options[:agent] - the agent the message is for/from
options[:collective] - the collective it is for/from
options[:headers] - the message headers
options[:type] - an indicator about the type of message, :message, :request, :direct_request or :reply
options[:request] - if this is a reply this should hold the message we are replying to
options[:filter] - for requests, the filter to encode into the message
options[:options] - the normal client options hash
options[:ttl] - the maximum number of seconds this message can be valid for

[Source]

    # File lib/mcollective/message.rb, line 21
# Wraps +payload+ and the raw middleware +message+ together with their
# meta data.
#
# Recognised +options+ keys, with the defaults merged in below:
#   :base64 (false), :agent (nil), :headers ({}), :type (:message),
#   :request (nil), :filter (Util.empty_filter), :options (false),
#   :ttl (60), :collective (nil)
#
# When :request is given, the agent and collective are copied from that
# request and the type is forced to :reply, overriding whatever :agent,
# :collective and :type were passed in.
def initialize(payload, message, options = {})
    defaults = {:base64     => false,
                :agent      => nil,
                :headers    => {},
                :type       => :message,
                :request    => nil,
                :filter     => Util.empty_filter,
                :options    => false,
                :ttl        => 60,
                :collective => nil}
    settings = defaults.merge(options)

    @payload = payload
    @message = message

    # State that always starts out blank, whatever the caller passed in.
    @requestid = nil
    @discovered_hosts = nil
    @msgtime = 0
    @validated = false

    @type    = settings[:type]
    @headers = settings[:headers]
    @base64  = settings[:base64]
    @filter  = settings[:filter]
    @options = settings[:options]
    @ttl     = settings[:ttl]

    reply_to = settings[:request]

    if reply_to
        @request = reply_to
        @agent = request.agent
        @collective = request.collective
        @type = :reply
    else
        @agent = settings[:agent]
        @collective = settings[:collective]
    end

    base64_decode!
end

Public Instance methods

[Source]

    # File lib/mcollective/message.rb, line 97
# Reports the current value of the @base64 flag, which the
# base64_decode! / base64_encode! methods toggle.
def base64?
    @base64
end

[Source]

    # File lib/mcollective/message.rb, line 83
# Decodes the payload in place when it is flagged as base64 encoded.
#
# Does nothing unless @base64 is set. Otherwise @payload is replaced with
# SSL.base64_decode(@payload) and the flag is cleared, so a second call is
# a no-op.
def base64_decode!
    return unless @base64

    # The body is kept in @payload; no code shown for this class assigns
    # @body, so decoding @body would leave the payload untouched.
    @payload = SSL.base64_decode(@payload)
    @base64 = false
end

[Source]

    # File lib/mcollective/message.rb, line 90
# Encodes the payload in place as base64 unless it is already flagged as
# encoded.
#
# Does nothing when @base64 is set. Otherwise @payload is replaced with
# SSL.base64_encode(@payload) and the flag is set, so a second call is a
# no-op.
def base64_encode!
    return if @base64

    # The body is kept in @payload; no code shown for this class assigns
    # @body, so encoding @body would leave the payload untouched.
    @payload = SSL.base64_encode(@payload)
    @base64 = true
end

[Source]

     # File lib/mcollective/message.rb, line 164
# Returns an MD5 hex digest of a string made from the configured identity,
# the current time as a float, the agent and the collective, separated by
# dashes.
def create_reqid
    identity = Config.instance.identity
    stamp = Time.now.to_f
    seed = "#{identity}-#{stamp}-#{agent}-#{collective}"

    Digest::MD5.hexdigest(seed)
end

[Source]

     # File lib/mcollective/message.rb, line 116
# Replaces the payload with the security plugin's decoded form of this
# message, then copies selected fields of the decoded payload onto the
# message itself.
#
# Raises a RuntimeError unless the message type is :request or :reply.
def decode!
    decodable = [:request, :reply]
    if !decodable.include?(type)
        raise "Cannot decode message type #{type}"
    end

    @payload = PluginManager["security_plugin"].decodemsg(self)

    # Only fields actually present in the decoded payload overwrite the
    # current values; the rest are left as they were.
    %w[collective agent filter requestid ttl msgtime].each do |name|
        key = name.to_sym
        next unless payload.include?(key)

        instance_variable_set("@#{name}", payload[key])
    end
end

[Source]

     # File lib/mcollective/message.rb, line 101
# Replaces the payload with its encoded form, produced by the security
# plugin, and sets the request id.
#
# * :reply - needs an associated request; the request id and caller id
#   are taken from that request's payload.
# * :request / :direct_request - a fresh request id comes from
#   create_reqid.
#
# Raises a RuntimeError for any other type, or for a reply that has no
# request.
def encode!
    kind = type

    if kind == :reply
        unless request
            raise "Cannot encode a reply message if no request has been associated with it"
        end

        @requestid = request.payload[:requestid]
        security = PluginManager["security_plugin"]
        @payload = security.encodereply(agent, payload, requestid, request.payload[:callerid])
    elsif kind == :request || kind == :direct_request
        @requestid = create_reqid
        security = PluginManager["security_plugin"]
        @payload = security.encoderequest(Config.instance.identity, payload, requestid, filter, agent, collective, ttl)
    else
        raise "Cannot encode #{kind} messages"
    end
end

publish a reply message by creating a target name and sending it

[Source]

     # File lib/mcollective/message.rb, line 146
# Hands this message to the connector plugin, inside a 2 second timeout.
#
# If discovered hosts are known, direct addressing is enabled in the
# config, and the number of hosts is within the configured threshold, the
# message is switched to a :direct_request first. In every other case it
# is published as it is.
def publish
    Timeout.timeout(2) do
        if @discovered_hosts && Config.instance.direct_addressing &&
           @discovered_hosts.size <= Config.instance.direct_addressing_threshold
            @type = :direct_request
            Log.debug("Handling #{requestid} as a direct request")
        end

        PluginManager["connector_plugin"].publish(self)
    end
end

Sets the message type to one of the known types. In the case of :direct_request the list of hosts to communicate with should have been set with discovered_hosts else an exception will be raised. This is for extra security, we never accidentally want to send a direct request without a list of hosts or something weird like that as it might result in a filterless broadcast being sent.

Additionally, you simply cannot set :direct_request if direct_addressing is not enabled. This forces a workflow that does not lead to a mistake when someone assumes direct_addressing is enabled when it is not.

[Source]

    # File lib/mcollective/message.rb, line 69
# Sets the message type after checking it.
#
# :direct_request is only accepted when direct_addressing is enabled in
# the config and @discovered_hosts holds at least one host. Any type that
# is not listed in VALIDTYPES is rejected.
#
# Raises a RuntimeError when a check fails.
def type=(type)
    if type == :direct_request
        unless Config.instance.direct_addressing
            raise "Direct requests is not enabled using the direct_addressing config option"
        end

        if !@discovered_hosts || @discovered_hosts.empty?
            raise "Can only set type to :direct_request if discovered_hosts have been set"
        end
    end

    unless VALIDTYPES.include?(type)
        raise "Unknown message type #{type}"
    end

    @type = type
end

Perform validation against the message by checking filters and ttl

[Source]

     # File lib/mcollective/message.rb, line 127
# Validates a request message against its TTL and against the filter
# carried in its payload.
#
# Raises a RuntimeError unless the type is :request, MsgTTLExpired when
# the message is older than ttl seconds, and NotTargettedAtUs when the
# security plugin rejects the filter. Sets @validated to true on success.
def validate
    raise "Can only validate request messages" unless type == :request

    msg_age = Time.now.utc.to_i - msgtime

    if msg_age > ttl
        # Interpolate rather than concatenate: a nil :senderid or :callerid
        # would make String#+ raise and hide the TTL error we want to
        # report.
        sender = payload[:senderid]
        cid = payload.include?(:callerid) ? "#{payload[:callerid]}@#{sender}" : sender.to_s

        raise(MsgTTLExpired, "Message #{requestid} from #{cid} created at #{msgtime} is #{msg_age} seconds old, TTL is #{ttl}")
    end

    unless PluginManager["security_plugin"].validate_filter?(payload[:filter])
        raise(NotTargettedAtUs, "Received message is not targetted to us")
    end

    @validated = true
end

[Validate]