Class | MCollective::Client |
In: |
lib/mcollective/client.rb
|
Parent: | Object |
Helpers for writing clients that can talk to agents, do discovery and so forth
options | [RW] | |
stats | [RW] |
# File lib/mcollective/client.rb, line 8
# Sets up a client: makes sure the shared Config instance has been loaded,
# looks up the connector and security plugins in the PluginManager, flags the
# security plugin as being driven by a client and calls connect on the
# connector.
#
# configfile - handed to loadconfig on the Config instance, but only when
#              that instance does not report itself as configured yet
def initialize(configfile)
  # Plain bookkeeping state; it does not depend on configuration or plugins.
  @options = nil
  @subscriptions = {}

  @config = Config.instance
  unless @config.configured
    @config.loadconfig(configfile)
  end

  # Lookup order (connector first, then security) is kept as in the original.
  @connection = PluginManager["connector_plugin"]
  @security = PluginManager["security_plugin"]
  @security.initiated_by = :client

  @connection.connect
end
Returns the collective given in the options, or the configured main collective if no specific collective is set there
# File lib/mcollective/client.rb, line 24
# Returns the collective to use: the :collective entry of the client options
# when one is set, otherwise the main collective from the configuration.
#
# The constructor leaves @options as nil until the caller assigns it, so a
# missing options hash is treated like one without a :collective entry
# instead of raising NoMethodError.
def collective
  requested = @options.nil? ? nil : @options[:collective]

  requested.nil? ? @config.main_collective : requested
end
Disconnects cleanly from the middleware
# File lib/mcollective/client.rb, line 33
# Writes a debug log entry and then asks the connector to disconnect.
# Returns whatever the connector's disconnect returns.
def disconnect
  Log.debug("Disconnecting from the middleware")

  @connection.disconnect
end
Performs a discovery of nodes matching the filter passed and returns a sorted array of the node identities that replied
# File lib/mcollective/client.rb, line 96
# Sends a "ping" request to the "discovery" agent and collects the :senderid
# of every reply received until +timeout+ seconds have passed.
#
# filter  - passed unchanged to sendreq
# timeout - number of seconds handed to Timeout.timeout
#
# Returns the collected sender ids as a sorted array. Any error other than
# the expected Timeout::Error propagates to the caller.
def discover(filter, timeout)
  hosts = []

  begin
    Timeout.timeout(timeout) do
      reqid = sendreq("ping", "discovery", filter)
      Log.debug("Waiting #{timeout} seconds for discovery replies to request #{reqid}")

      loop do
        reply = receive(reqid)
        Log.debug("Got discovery reply from #{reply.payload[:senderid]}")
        hosts << reply.payload[:senderid]
      end
    end
  rescue Timeout::Error
    # Running out of time is the normal way for discovery to finish.
  end

  hosts.sort
end
Performs a discovery and then sends a request, running the passed block for each response
times = discovered_req("status", "mcollectived", options) {|resp| pp resp }
It returns a hash of times. The timeouts for discovery and for the total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser
# File lib/mcollective/client.rb, line 174
# Runs a discovery, sends +body+ to +agent+ and waits for one reply per
# discovered host (or until the timeout), yielding each reply payload to the
# block when one is given.
#
# body    - passed to sendreq
# agent   - passed to sendreq
# options - hash read for :filter, :disctimeout and :timeout; falls back to
#           @options when not given
#
# Progress text is printed to standard output. An Interrupt during discovery
# prints a notice and exits the process immediately via exit!.
#
# Returns (and stores in @stats) a hash with :starttime, :discoverytime,
# :blocktime, :totaltime, :responses, :responsesfrom, :noresponsefrom and
# :discovered. Raises RuntimeError when discovery finds no hosts.
def discovered_req(body, agent, options=false)
  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}

  options = @options unless options

  STDOUT.sync = true

  print("Determining the amount of hosts matching filter for #{options[:disctimeout]} seconds .... ")

  begin
    discovered_hosts = discover(options[:filter], options[:disctimeout])
    discovered = discovered_hosts.size
    hosts_responded = []
    # Work on a copy so the array returned by discover is not modified.
    hosts_not_responded = discovered_hosts.dup

    stat[:discoverytime] = Time.now.to_f - stat[:starttime]

    puts("#{discovered}\n\n")
  rescue Interrupt
    puts("Discovery interrupted.")
    exit!
  end

  raise("No matching clients found") if discovered == 0

  begin
    Timeout.timeout(options[:timeout]) do
      reqid = sendreq(body, agent, options[:filter])

      discovered.times do
        resp = receive(reqid)
        sender = resp.payload[:senderid]

        hosts_responded << sender
        hosts_not_responded.delete(sender)

        yield(resp.payload) if block_given?
      end
    end
  rescue Interrupt, Timeout::Error
    # Either one just ends the wait early; report what was gathered so far.
  end

  stat[:totaltime] = Time.now.to_f - stat[:starttime]
  stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
  stat[:responses] = hosts_responded.size
  stat[:responsesfrom] = hosts_responded
  stat[:noresponsefrom] = hosts_not_responded
  stat[:discovered] = discovered

  @stats = stat
  stat
end
Prints out the stats returned from req and discovered_req in a nice way
# File lib/mcollective/client.rb, line 228
# Prints a summary of a stats hash, as returned by req or discovered_req, to
# standard output.
#
# stats   - hash read for :responses, :blocktime and :noresponsefrom, plus
#           :discovered when present; verbose output also reads :starttime,
#           :discoverytime and :totaltime
# options - hash read for :verbose; falls back to @options when not given
# caption - heading used in the verbose output
#
# Hosts that did not respond are listed four per row, each right-aligned in
# a 30 character column. Returns nil.
def display_stats(stats, options=false, caption="stomp call summary")
  options = @options unless options

  if options[:verbose]
    puts("\n---- #{caption} ----")

    if stats[:discovered]
      puts(" Nodes: #{stats[:discovered]} / #{stats[:responses]}")
    else
      puts(" Nodes: #{stats[:responses]}")
    end

    printf(" Start Time: %s\n", Time.at(stats[:starttime]))
    printf(" Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
    printf(" Agent Time: %.2fms\n", stats[:blocktime] * 1000)
    printf(" Total Time: %.2fms\n", stats[:totaltime] * 1000)
  elsif stats[:discovered]
    printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
  else
    printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
  end

  # A stats hash without the key is treated as "everybody responded".
  silent_hosts = Array(stats[:noresponsefrom])
  return if silent_hosts.empty?

  puts("\nNo response from:\n")

  # Break rows by position in the list; the host names themselves are strings
  # and must not be used for the row arithmetic.
  silent_hosts.each_slice(4) do |row|
    puts
    row.each { |host| printf("%30s", host) }
  end

  puts
end
Blocking call that waits forever for a message to arrive.
If you give it a requestid this means you've previously sent a request with that ID and now you just want replies that match that id. In that case the current connection will just ignore all messages not directed at it and keep waiting for more until it finds a matching message.
# File lib/mcollective/client.rb, line 71
# Blocks until the connector delivers a reply whose requestid equals
# +requestid+ and returns that reply, marked as type :reply and decoded.
#
# Replies for which SecurityValidationFailed is raised, and replies carrying
# a different request id, are logged and skipped; the method then waits for
# the next message. There is no upper bound on how often this happens, so
# callers that need one wrap the call in a timeout.
#
# Note that the comparison is a plain equality check: with the default nil
# +requestid+ only replies whose requestid is nil are accepted.
#
# When the MCOLLECTIVE_ANON environment variable is set, the :senderid in the
# payload is replaced by its MD5 hex digest.
def receive(requestid = nil)
  reply = @connection.receive
  reply.type = :reply

  reply.decode!

  reply.payload[:senderid] = Digest::MD5.hexdigest(reply.payload[:senderid]) if ENV.include?("MCOLLECTIVE_ANON")

  raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{reply.requestid}") unless reply.requestid == requestid

  reply
rescue SecurityValidationFailed
  Log.warn("Ignoring a message that did not pass security validations")
  retry
rescue MsgDoesNotMatchRequestID
  Log.debug("Ignoring a message for some other client")
  retry
end
Sends a request and runs the passed block for each response
times = req("status", "mcollectived", options) {|resp|
pp resp
}
It returns a hash of times. The timeout for the total run is taken from the options hash, which in turn is generally built using MCollective::Optionparser
# File lib/mcollective/client.rb, line 124
# Sends a request and yields the payload of every reply to the block, when
# one is given, until the timeout expires or +waitfor+ replies have arrived.
#
# body    - request body, or a Message; for a Message the agent, the options
#           and the number of hosts to wait for are taken from the message
# agent   - passed to sendreq
# options - hash read for :timeout and :filter; falls back to @options when
#           not given
# waitfor - number of replies after which to stop early; 0 means keep
#           listening until the timeout
#
# Returns (and stores in @stats) a hash with :starttime, :discoverytime,
# :blocktime, :totaltime, :responses and :noresponsefrom.
def req(body, agent=nil, options=false, waitfor=0)
  if body.is_a?(Message)
    agent = body.agent
    options = body.options

    # A message without a discovered host list means "no early stop".
    known_hosts = body.discovered_hosts
    waitfor = known_hosts ? known_hosts.size : 0
  end

  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}

  options = @options unless options

  STDOUT.sync = true

  hosts_responded = 0

  begin
    Timeout.timeout(options[:timeout]) do
      reqid = sendreq(body, agent, options[:filter])

      loop do
        resp = receive(reqid)

        hosts_responded += 1

        yield(resp.payload) if block_given?

        break if waitfor != 0 && hosts_responded >= waitfor
      end
    end
  rescue Interrupt, Timeout::Error
    # Either one just ends the wait; report what was gathered so far.
  end

  stat[:totaltime] = Time.now.to_f - stat[:starttime]
  stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
  stat[:responses] = hosts_responded
  stat[:noresponsefrom] = []

  @stats = stat
  stat
end
Sends a request and returns the generated request id. It doesn't wait for responses and doesn't execute any passed-in code blocks for responses
# File lib/mcollective/client.rb, line 40
# Encodes and publishes a request and returns its request id without waiting
# for any reply.
#
# msg    - a Message to send as is, or a request body that gets wrapped in a
#          new request Message for +agent+, the current collective and
#          +filter+
# agent  - target agent; replaced by the message's own agent when +msg+ is a
#          Message
# filter - filter stored in a newly built Message
#
# The first time an agent is used, its reply target is subscribed to and the
# agent is remembered in @subscriptions so this happens only once.
def sendreq(msg, agent, filter = {})
  request = msg

  if request.is_a?(Message)
    agent = request.agent
  else
    request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter})
  end

  request.encode!

  Log.debug("Sending request #{request.requestid} to the #{request.agent} agent in collective #{request.collective}")

  if !@subscriptions.include?(agent)
    reply_target = Util.make_subscriptions(agent, :reply, collective)
    Log.debug("Subscribing to reply target for agent #{agent}")

    Util.subscribe(reply_target)
    @subscriptions[agent] = 1
  end

  request.publish

  request.requestid
end