Class MCollective::Client
In: lib/mcollective/client.rb
Parent: Object

Helpers for writing clients that can talk to agents, do discovery and so forth

Methods

Attributes

options  [RW] 
stats  [RW] 

Public Class methods

[Source]

    # File lib/mcollective/client.rb, line 8
 # Builds a client: makes sure the configuration is loaded, fetches the
 # connector and security plugins from the PluginManager and connects to
 # the middleware.
 #
 # configfile - handed to Config#loadconfig, which is only called when the
 #              Config instance does not already report being configured
 def initialize(configfile)
     # No options and no reply subscriptions exist until callers set them up
     @options = nil
     @subscriptions = {}

     config = Config.instance
     config.loadconfig(configfile) if !config.configured
     @config = config

     @connection = PluginManager["connector_plugin"]
     @security = PluginManager["security_plugin"]

     # Tell the security plugin that it is running on the client side
     @security.initiated_by = :client

     @connection.connect
 end

Public Instance methods

Returns the configured main collective if no specific collective is specified as options

[Source]

    # File lib/mcollective/client.rb, line 24
# Returns the collective this client targets.
#
# The :collective entry of @options wins when it is set (anything other than
# nil); otherwise - including when no options hash has been assigned yet -
# the main collective from the configuration is returned.
def collective
    # @options starts out as nil in the constructor, so guard before indexing
    chosen = @options[:collective] if @options

    chosen.nil? ? @config.main_collective : chosen
end

Disconnects cleanly from the middleware

[Source]

    # File lib/mcollective/client.rb, line 33
# Closes the connection to the middleware, writing a debug log entry first
def disconnect
    Log.debug "Disconnecting from the middleware"

    @connection.disconnect
end

Performs a discovery of nodes matching the filter passed and returns an array of nodes

[Source]

     # File lib/mcollective/client.rb, line 96
 # Performs a discovery of nodes matching the filter passed.
 #
 # Sends a "ping" request to the "discovery" agent and collects the sender
 # id of every reply until the timeout fires.
 #
 # filter  - handed to sendreq unchanged
 # timeout - seconds to collect replies for, passed to Timeout.timeout
 #
 # Returns the collected sender ids as a sorted array. Errors other than
 # the timeout expiring are left to propagate to the caller.
 def discover(filter, timeout)
     hosts = []

     begin
         Timeout.timeout(timeout) do
             reqid = sendreq("ping", "discovery", filter)
             Log.debug("Waiting #{timeout} seconds for discovery replies to request #{reqid}")

             loop do
                 reply = receive(reqid)
                 Log.debug("Got discovery reply from #{reply.payload[:senderid]}")
                 hosts << reply.payload[:senderid]
             end
         end
     rescue Timeout::Error
         # Running out of time is the normal way for discovery to finish
     end

     # Always hand back the array, whichever way the collection loop ended
     hosts.sort
 end

Performs a discovery and then send a request, performs the passed block for each response

   times = discovered_req("status", "mcollectived", options) {|resp|
      pp resp
   }

It returns a hash of times. The timeouts for discovery and for the total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser

[Source]

     # File lib/mcollective/client.rb, line 174
# Performs a discovery and then sends a request, yielding the payload of
# each response to the passed block when one is given.
#
# body    - request body handed to sendreq
# agent   - agent name handed to sendreq
# options - hash read for :filter, :disctimeout and :timeout; falls back to
#           @options when not supplied
#
# Returns (and stores in @stats) a hash with the keys :starttime,
# :discoverytime, :blocktime, :totaltime, :responses, :responsesfrom,
# :noresponsefrom and :discovered.
#
# Raises RuntimeError when discovery finds no hosts. An Interrupt during
# discovery prints a message and exits the process via exit!.
def discovered_req(body, agent, options=false)
    stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}

    options = @options unless options

    STDOUT.sync = true

    print("Determining the amount of hosts matching filter for #{options[:disctimeout]} seconds .... ")

    begin
        hosts_not_responded = discover(options[:filter], options[:disctimeout])
        # Capture the count now; hosts are removed from the list as they reply
        discovered = hosts_not_responded.size
        hosts_responded = []

        stat[:discoverytime] = Time.now.to_f - stat[:starttime]

        puts("#{discovered}\n\n")
    rescue Interrupt
        puts("Discovery interrupted.")
        exit!
    end

    raise("No matching clients found") if discovered == 0

    begin
        Timeout.timeout(options[:timeout]) do
            reqid = sendreq(body, agent, options[:filter])

            # Wait for at most one reply per discovered host
            discovered.times do
                resp = receive(reqid)
                sender = resp.payload[:senderid]

                hosts_responded << sender
                hosts_not_responded.delete(sender)

                yield(resp.payload) if block_given?
            end
        end
    rescue Interrupt, Timeout::Error
        # Either one just ends the wait; stats for what arrived are still built
    end

    stat[:totaltime] = Time.now.to_f - stat[:starttime]
    stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
    stat[:responses] = hosts_responded.size
    stat[:responsesfrom] = hosts_responded
    stat[:noresponsefrom] = hosts_not_responded
    stat[:discovered] = discovered

    @stats = stat
    stat
end

Prints out the stats returned from req and discovered_req in a nice way

[Source]

     # File lib/mcollective/client.rb, line 228
# Prints out the stats returned from req and discovered_req in a nice way.
#
# stats   - hash as built by req / discovered_req; :responses and :blocktime
#           are always read, :starttime, :discoverytime and :totaltime only
#           in verbose mode, :discovered and :noresponsefrom when present
# options - hash whose :verbose entry selects the detailed layout; falls
#           back to @options, and a missing hash means non-verbose
# caption - heading used for the verbose layout
def display_stats(stats, options=false, caption="stomp call summary")
    options = @options unless options

    if options && options[:verbose]
        puts("\n---- #{caption} ----")

        if stats[:discovered]
            puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
        else
            puts("           Nodes: #{stats[:responses]}")
        end

        printf("      Start Time: %s\n", Time.at(stats[:starttime]))
        printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
        printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
        printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)
    else
        if stats[:discovered]
            printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
        else
            printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
        end
    end

    unresponsive = stats[:noresponsefrom] || []

    unless unresponsive.empty?
        puts("\nNo response from:\n")

        # Wrap on the position in the list, four hosts per row; the host
        # name itself is a string and cannot drive the wrapping
        unresponsive.each_with_index do |host, idx|
            puts if idx > 0 && idx % 4 == 0
            printf("%30s", host)
        end

        puts
    end
end

Blocking call that waits forever for a message to arrive.

If you give it a requestid this means you've previously sent a request with that ID and now you just want replies that match that id. In that case the current connection will just ignore all messages not directed at it and keep waiting for more until it finds a matching message.

[Source]

    # File lib/mcollective/client.rb, line 71
# Blocking call that waits forever for a message to arrive.
#
# requestid - when given, only a reply carrying this request id is returned;
#             replies for other ids are logged and skipped. When nil the
#             first message that decodes successfully is returned.
#
# Messages that raise SecurityValidationFailed while being decoded are
# logged and skipped. When the MCOLLECTIVE_ANON environment variable is set
# the sender id in the payload is replaced by its MD5 hex digest.
#
# Returns the reply object obtained from the connection.
def receive(requestid = nil)
    reply = nil

    begin
        reply = @connection.receive
        reply.type = :reply

        reply.decode!

        reply.payload[:senderid] = Digest::MD5.hexdigest(reply.payload[:senderid]) if ENV.include?("MCOLLECTIVE_ANON")

        # Without a request id there is nothing to match against, so any
        # message is acceptable
        unless requestid.nil? || reply.requestid == requestid
            raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}")
        end
    rescue SecurityValidationFailed
        Log.warn("Ignoring a message that did not pass security validations")
        retry
    rescue MsgDoesNotMatchRequestID
        Log.debug("Ignoring a message for some other client")
        retry
    end

    reply
end

Send a request, performs the passed block for each response

times = req("status", "mcollectived", options) {|resp|

  pp resp

}

It returns a hash of times. The timeout for the total run is taken from the options hash, which in turn is generally built using MCollective::Optionparser

[Source]

     # File lib/mcollective/client.rb, line 124
# Sends a request and yields the payload of each response to the passed
# block when one is given.
#
# body    - request body, or a Message; for a Message the agent, options and
#           number of replies to wait for are taken from the message itself
# agent   - agent name handed to sendreq
# options - hash read for :timeout and :filter; falls back to @options
# waitfor - stop after this many responses; 0 keeps collecting until the
#           timeout fires
#
# Returns (and stores in @stats) a hash with the keys :starttime,
# :discoverytime, :blocktime, :totaltime, :responses and :noresponsefrom.
def req(body, agent=nil, options=false, waitfor=0)
    if body.is_a?(Message)
        agent = body.agent
        options = body.options
        # The nil fallback has to sit in front of .size to ever be reached
        waitfor = (body.discovered_hosts || []).size
    end

    stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}

    options = @options unless options

    STDOUT.sync = true

    hosts_responded = 0

    begin
        Timeout.timeout(options[:timeout]) do
            reqid = sendreq(body, agent, options[:filter])

            loop do
                resp = receive(reqid)

                hosts_responded += 1

                yield(resp.payload) if block_given?

                break if waitfor != 0 && hosts_responded >= waitfor
            end
        end
    rescue Interrupt, Timeout::Error
        # Either one just ends the wait; stats for what arrived are still built
    end

    stat[:totaltime] = Time.now.to_f - stat[:starttime]
    stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
    stat[:responses] = hosts_responded
    stat[:noresponsefrom] = []

    @stats = stat
    stat
end

Sends a request and returns the generated request id; doesn't wait for responses and doesn't execute any passed-in code blocks for responses

[Source]

    # File lib/mcollective/client.rb, line 40
# Sends a request and returns the generated request id without waiting for
# any responses.
#
# msg    - either a Message, which is sent as it is and supplies the agent,
#          or a body from which a new request Message is built
# agent  - agent name; replaced by the message's own agent for a Message
# filter - filter for a newly built Message, not used for a given Message
#
# The first request for an agent also subscribes to that agent's reply
# target; @subscriptions remembers which agents were already handled.
def sendreq(msg, agent, filter = {})
    request =
        if msg.is_a?(Message)
            agent = msg.agent
            msg
        else
            Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter})
        end

    request.encode!

    Log.debug("Sending request #{request.requestid} to the #{request.agent} agent in collective #{request.collective}")

    if !@subscriptions.key?(agent)
        reply_target = Util.make_subscriptions(agent, :reply, collective)
        Log.debug("Subscribing to reply target for agent #{agent}")

        Util.subscribe(reply_target)
        @subscriptions[agent] = 1
    end

    request.publish

    request.requestid
end

[Validate]