Class | Jabber::Stream |
In: | lib/xmpp4r/stream.rb |
Parent: | Object |
The Stream class manages a connection stream (a file descriptor through which XML messages are read and sent).
DISCONNECTED | = | 1 |
CONNECTED | = | 2 |
fd | [R] | file descriptor used |
status | [R] | connection status |
Create a new stream (just initializes)
# File lib/xmpp4r/stream.rb, line 34
def initialize(threaded = true)
  @fd = nil
  @status = DISCONNECTED
  @xmlcbs = CallbackList::new
  @stanzacbs = CallbackList::new
  @messagecbs = CallbackList::new
  @iqcbs = CallbackList::new
  @presencecbs = CallbackList::new
  unless threaded
    $stderr.puts "Non-threaded mode is currently broken, re-enabling threaded"
    threaded = true
  end
  @threaded = threaded
  @stanzaqueue = []
  @stanzaqueue_lock = Mutex::new
  @exception_block = nil
  @threadblocks = []
  # @pollCounter = 10
  @waiting_thread = nil
  @wakeup_thread = nil
  @streamid = nil
  @features_lock = Mutex.new
end
Adds a callback block to process received Iqs
priority: | [Integer] The callback's priority; callbacks with a higher priority are called sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 518
def add_iq_callback(priority = 0, ref = nil, &block)
  @iqcbs.add(priority, ref, block)
end
Adds a callback block to process received Messages
priority: | [Integer] The callback's priority; callbacks with a higher priority are called sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 464
def add_message_callback(priority = 0, ref = nil, &block)
  @messagecbs.add(priority, ref, block)
end
Adds a callback block to process received Presences
priority: | [Integer] The callback's priority; callbacks with a higher priority are called sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 500
def add_presence_callback(priority = 0, ref = nil, &block)
  @presencecbs.add(priority, ref, block)
end
Adds a callback block to process received Stanzas
priority: | [Integer] The callback's priority; callbacks with a higher priority are called sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 482
def add_stanza_callback(priority = 0, ref = nil, &block)
  @stanzacbs.add(priority, ref, block)
end
Adds a callback block to process received XML messages
priority: | [Integer] The callback's priority; callbacks with a higher priority are called sooner |
ref: | [String] The callback's reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 446
def add_xml_callback(priority = 0, ref = nil, &block)
  @xmlcbs.add(priority, ref, block)
end
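The priority and ref parameters shared by the add_*_callback methods can be illustrated with a small stand-in. This is only a sketch: SketchCallbackList is a hypothetical class, not the library's CallbackList, and it assumes that processing stops at the first block returning true, as the early returns in process_one suggest.

```ruby
# Hypothetical stand-in for a priority-ordered callback list (not xmpp4r code).
class SketchCallbackList
  Entry = Struct.new(:priority, :ref, :block)

  def initialize
    @entries = []
  end

  # Register a block; entries stay sorted with the highest priority first,
  # and insertion order is kept among equal priorities.
  def add(priority = 0, ref = nil, block = nil, &blk)
    @entries << Entry.new(priority, ref, block || blk)
    @entries = @entries.each_with_index
                       .sort_by { |entry, index| [-entry.priority, index] }
                       .map(&:first)
    self
  end

  # Remove every callback registered under the given reference.
  def delete(ref)
    @entries.reject! { |entry| entry.ref == ref }
    self
  end

  # Call the blocks in order; stop at the first one that returns true.
  def process(element)
    @entries.any? { |entry| entry.block.call(element) == true }
  end
end

calls = []
list = SketchCallbackList.new
list.add(0, 'logger')  { |s| calls << [:logger, s]; false }
list.add(10, 'filter') { |s| calls << [:filter, s]; s == 'spam' }

spam_consumed  = list.process('spam')   # filter runs first and consumes it
hello_consumed = list.process('hello')  # both blocks run, nobody consumes it
```

In this sketch the 'logger' block never sees 'spam', because the higher-priority 'filter' block consumed it first.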
# File lib/xmpp4r/stream.rb, line 536
def close!
  @parserThread.kill if @parserThread
  # @pollThread.kill
  @fd.close if @fd and !@fd.closed?
  @status = DISCONNECTED
end
Delete a Stanza callback
ref: | [String] The reference of the callback to delete |
# File lib/xmpp4r/stream.rb, line 490
def delete_stanza_callback(ref)
  @stanzacbs.delete(ref)
end
Delete an XML-messages callback
ref: | [String] The reference of the callback to delete |
# File lib/xmpp4r/stream.rb, line 454
def delete_xml_callback(ref)
  @xmlcbs.delete(ref)
end
Returns whether this connection is connected to a Jabber service
return: | [Boolean] Connection status |
# File lib/xmpp4r/stream.rb, line 150
def is_connected?
  return @status == CONNECTED
end
Returns whether this connection is NOT connected to a Jabber service
return: | [Boolean] Connection status |
# File lib/xmpp4r/stream.rb, line 158
def is_disconnected?
  return @status == DISCONNECTED
end
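Installs a block that handles exceptions raised while the stream is in use, for example during a send. This will likely be the first indication that the socket dropped in a Jabber session.
The block has to take three arguments: the exception (nil when the call comes from parser_end), the Jabber::Stream object (self), and a symbol naming where the failure occurred (:start, :parser, :sending or :close in the listings on this page).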
# File lib/xmpp4r/stream.rb, line 108
def on_exception(&block)
  @exception_block = block
end
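A handler block with the three-argument shape used by the calls to @exception_block on this page might look like the following. SketchStream and fail_with are hypothetical names used only so that the example runs without a connection; they are not part of the library.

```ruby
# Hypothetical stand-in that mimics only on_exception and the failure path.
class SketchStream
  def on_exception(&block)
    @exception_block = block
  end

  # Simulate a failure reported from a given place (:start, :parser, ...).
  def fail_with(error, where)
    raise error unless @exception_block
    @exception_block.call(error, self, where)
  end
end

seen = []
stream = SketchStream.new

# The block receives the exception, the stream itself, and a location symbol.
stream.on_exception do |e, s, where|
  seen << [e && e.message, s.equal?(stream), where]
end

stream.fail_with(IOError.new('socket closed'), :sending)
stream.fail_with(nil, :close)  # a regular stream end passes nil as the exception
```

Because the exception can be nil, a handler should check it before calling methods such as message on it.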
This method is called by the parser when a failure occurs
# File lib/xmpp4r/stream.rb, line 114
def parse_failure(e)
  Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

  # A new thread has to be created because close will cause the thread
  # to commit suicide(???)
  if @exception_block
    # New thread, because close will kill the current thread
    Thread.new {
      close
      @exception_block.call(e, self, :parser)
    }
  else
    puts "Stream#parse_failure was called by XML parser. Dumping " +
    "backtrace...\n" + e.exception + "\n"
    puts e.backtrace
    close
    raise
  end
end
This method is called by the parser upon receiving </stream:stream>
# File lib/xmpp4r/stream.rb, line 136
def parser_end
  if @exception_block
    Thread.new {
      close
      @exception_block.call(nil, self, :close)
    }
  else
    close
  end
end
Starts a polling thread to send "keep alive" data to prevent the Jabber connection from closing for inactivity.
Currently not working!
# File lib/xmpp4r/stream.rb, line 424
def poll
  sleep 10
  while true
    sleep 2
    # @pollCounter = @pollCounter - 1
    # if @pollCounter < 0
    #   begin
    #     send(" \t ")
    #   rescue
    #     Thread.new {@exception_block.call if @exception_block}
    #     break
    #   end
    # end
  end
end
Process |max| XML stanzas and call listeners for all of them.
max: | [Integer] the number of stanzas to process (nil means process all available) |
# File lib/xmpp4r/stream.rb, line 281
def process(max = nil)
  n = 0
  @stanzaqueue_lock.lock
  while @stanzaqueue.size > 0 and (max == nil or n < max)
    e = @stanzaqueue.shift
    @stanzaqueue_lock.unlock
    process_one(e)
    n += 1
    @stanzaqueue_lock.lock
  end
  @stanzaqueue_lock.unlock
  n
end
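The locking pattern in process (take one item under the lock, handle it with the lock released, repeat up to max times) can be sketched on its own. SketchQueue is a hypothetical class; it uses Mutex#synchronize instead of the explicit lock/unlock calls in the listing and assumes nil is never queued.

```ruby
# Hypothetical stand-in: a mutex-protected queue drained up to max items.
class SketchQueue
  attr_reader :handled

  def initialize
    @items = []
    @lock = Mutex.new
    @handled = []
  end

  def push(item)
    @lock.synchronize { @items.push(item) }
  end

  # Handle at most max queued items (all of them when max is nil) and
  # return how many were handled.
  def process(max = nil)
    n = 0
    while max.nil? || n < max
      # Hold the lock only while taking an item off the queue.
      item = @lock.synchronize { @items.shift }
      break if item.nil?
      @handled << item  # the "listener" runs with the lock released
      n += 1
    end
    n
  end
end

queue = SketchQueue.new
%w[a b c].each { |stanza| queue.push(stanza) }

first_batch = queue.process(2)  # handles "a" and "b"
second_batch = queue.process    # handles the remaining "c"
third_batch = queue.process     # nothing left to handle
```

Releasing the lock while an item is handled lets other threads keep queueing items during a slow listener.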
Processes a received REXML::Element and executes registered thread blocks and filters against it.
As the listing shows, receive_nonthreaded is called directly in both threaded and non-threaded mode. No new thread is spawned, because XMPP relies on a constant order of stanzas.
element: | [REXML::Element] The received element |
# File lib/xmpp4r/stream.rb, line 169
def receive(element)
  if @threaded
    # Don't spawn a new thread here. An implicit feature
    # of XMPP is constant order of stanzas.
    receive_nonthreaded(element)
  else
    receive_nonthreaded(element)
  end
end
Sends XML data to the socket and (optionally) waits to process received data.
xml: | [String] The xml data to send |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 357
def send(xml, &block)
  Jabber::debuglog("SENDING:\n#{xml}")
  @threadblocks.unshift(ThreadBlock.new(block)) if block
  Thread.critical = true # we don't want to be interupted before we stop!
  begin
    @fd << xml.to_s
    @fd.flush
  rescue Exception => e
    Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

    if @exception_block
      Thread.new { close!; @exception_block.call(e, self, :sending) }
    else
      puts "Exception caught while sending!"
      close!
      raise
    end
  end
  Thread.critical = false
  # The parser thread might be running this (think of a callback running send())
  # If this is the case, we mustn't stop (or we would cause a deadlock)
  Thread.stop if block and Thread.current != @parserThread
  @pollCounter = 10
end
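Thread.critical was removed in Ruby 1.9, so the listing above only runs as printed on Ruby 1.8. The write-and-report-failure part of send could be sketched on a current Ruby with a Mutex, as below. SketchSender and send_data are hypothetical names, a StringIO stands in for the socket, and this is a substitute technique rather than the library's implementation.

```ruby
require 'stringio'

# Hypothetical stand-in: serialises writes with a Mutex and reports failures
# to an optional handler, using the same three arguments as on_exception.
class SketchSender
  def initialize(io)
    @io = io
    @send_lock = Mutex.new
    @exception_block = nil
  end

  def on_exception(&block)
    @exception_block = block
  end

  def send_data(xml)
    # Only one thread at a time may write and flush.
    @send_lock.synchronize do
      @io << xml.to_s
      @io.flush
    end
  rescue IOError, SystemCallError => e
    # Without a handler the error propagates to the caller.
    raise unless @exception_block
    @exception_block.call(e, self, :sending)
  end
end

io = StringIO.new
sender = SketchSender.new(io)
sender.send_data('<presence/>')
written = io.string

# A closed IO stands in for a dropped socket.
errors = []
io.close
sender.on_exception { |e, _stream, where| errors << [e.class, where] }
sender.send_data('<presence/>')
```

The sketch leaves out the optional reply block and the Thread.stop step of the original method.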
Send an XMPP stanza with a Jabber::XMLStanza#id. The id will be generated by Jabber::IdGenerator if not already set.
The block will be called once: when receiving a stanza with the same Jabber::XMLStanza#id. It must return true to complete this!
Be aware that if a stanza with type='error' is received, the function does not yield but raises an ErrorException with the corresponding error element.
Please read the note about nesting at Stream#send
xml: | [XMLStanza] |
# File lib/xmpp4r/stream.rb, line 395
def send_with_id(xml, &block)
  if xml.id.nil?
    xml.id = Jabber::IdGenerator.instance.generate_id
  end

  error = nil
  send(xml) do |received|
    if received.kind_of? XMLStanza and received.id == xml.id
      if received.type == :error
        error = (received.error ? received.error : Error.new)
        true
      else
        yield(received)
      end
    else
      false
    end
  end

  unless error.nil?
    raise ErrorException.new(error)
  end
end
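The matching rule described above (ignore stanzas with a different id, raise on an error reply, otherwise let the block decide) can be shown without a connection. SketchStanza, SketchError and await_reply are hypothetical names, and an array of incoming stanzas stands in for the stream.

```ruby
# Hypothetical stand-ins for a stanza and for ErrorException.
SketchStanza = Struct.new(:id, :type, :error)
class SketchError < StandardError; end

# Walk the incoming stanzas and return the reply the block accepts.
def await_reply(request_id, incoming)
  incoming.each do |received|
    # Stanzas with another id are not replies to this request.
    next unless received.id == request_id
    # An error reply is raised instead of being yielded.
    raise SketchError, received.error.to_s if received.type == :error
    # The block must return true to complete the exchange.
    return received if yield(received)
  end
  nil
end

incoming = [
  SketchStanza.new('a1', :result, nil),
  SketchStanza.new('b2', :result, nil)
]
reply = await_reply('b2', incoming) { |r| r.type == :result }

failure = begin
  await_reply('c3', [SketchStanza.new('c3', :error, 'item-not-found')]) { true }
  nil
rescue SketchError => e
  e.message
end
```

A caller of the real method would therefore wrap send_with_id in begin/rescue when an error reply is possible.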
Start the XML parser on the fd
# File lib/xmpp4r/stream.rb, line 60
def start(fd)
  @stream_mechanisms = []
  @stream_features = {}

  @fd = fd
  @parser = StreamParser.new(@fd, self)
  @parserThread = Thread.new do
    begin
      @parser.parse
    rescue Exception => e
      Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

      if @exception_block
        Thread.new { close; @exception_block.call(e, self, :start) }
      else
        puts "Exception caught in Parser thread!"
        close
        raise
      end
    end
  end
  # @pollThread = Thread.new do
  #   begin
  #     poll
  #   rescue
  #     puts "Exception caught in Poll thread, dumping backtrace and" +
  #       " exiting...\n" + $!.exception + "\n"
  #     puts $!.backtrace
  #     exit
  #   end
  # end
  @status = CONNECTED
end
Process an XML stanza and call the listeners for it. If no stanza is currently available, wait for max |time| seconds before returning.
time: | [Integer] time to wait in seconds. If nil, wait indefinitely. |
# File lib/xmpp4r/stream.rb, line 301
def wait_and_process(time = nil)
  if time == 0
    return process(1)
  end
  @stanzaqueue_lock.lock
  if @stanzaqueue.size > 0
    e = @stanzaqueue.shift
    @stanzaqueue_lock.unlock
    process_one(e)
    return 1
  end

  @waiting_thread = Thread.current
  @wakeup_thread = Thread.new { sleep time ; @waiting_thread.wakeup if @waiting_thread }
  @waiting_thread.stop
  @wakeup_thread.kill if @wakeup_thread
  @wakeup_thread = nil
  @waiting_thread = nil

  @stanzaqueue_lock.lock
  if @stanzaqueue.size > 0
    e = @stanzaqueue.shift
    @stanzaqueue_lock.unlock
    process_one(e)
    return 1
  end
  return 0
end
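The idea of waiting up to a given number of seconds for one queued item can be sketched with a ConditionVariable. This is a different technique from the Thread.stop and wakeup pair in the listing, shown only to illustrate the timeout semantics; SketchWaitQueue and wait_and_pop are hypothetical names.

```ruby
# Hypothetical stand-in: a queue whose consumer waits with a timeout.
class SketchWaitQueue
  def initialize
    @items = []
    @lock = Mutex.new
    @ready = ConditionVariable.new
  end

  def push(item)
    @lock.synchronize do
      @items.push(item)
      @ready.signal  # wake a waiting consumer, if there is one
    end
  end

  # Return one item, waiting up to time seconds (nil waits indefinitely).
  # Returns nil when the time runs out with nothing queued.
  def wait_and_pop(time = nil)
    @lock.synchronize do
      deadline = time && Process.clock_gettime(Process::CLOCK_MONOTONIC) + time
      while @items.empty?
        remaining = deadline && deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining && remaining <= 0
        @ready.wait(@lock, remaining)
      end
      @items.shift
    end
  end
end

queue = SketchWaitQueue.new
timed_out = queue.wait_and_pop(0.05)  # nothing is queued, so this gives up

producer = Thread.new { sleep 0.05; queue.push(:stanza) }
received = queue.wait_and_pop(2)      # returns as soon as the producer pushes
producer.join
```

Rechecking the queue in a loop after each wakeup guards against spurious wakeups and against another consumer taking the item first.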
Process |element| until it is consumed. Returns element.consumed?
element: | The element to process |
# File lib/xmpp4r/stream.rb, line 261
def process_one(stanza)
  Jabber::debuglog("PROCESSING:\n#{stanza.to_s}")
  return true if @xmlcbs.process(stanza)
  return true if @stanzacbs.process(stanza)
  case stanza
  when Message
    return true if @messagecbs.process(stanza)
  when Iq
    return true if @iqcbs.process(stanza)
  when Presence
    return true if @presencecbs.process(stanza)
  end
end
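The dispatch order in process_one (XML callbacks, then generic stanza callbacks, then the callbacks for the stanza's own class) can be traced with plain lambdas. SketchMessage, SketchPresence and dispatch are hypothetical names. Unlike the listing, which returns nil when nothing consumes the stanza, this sketch returns false in that case.

```ruby
# Hypothetical stand-ins for two stanza classes.
SketchMessage  = Struct.new(:body)
SketchPresence = Struct.new(:show)

# Try the generic handler lists first, then the list matching the class.
def dispatch(stanza, handlers)
  return true if handlers[:xml].any? { |h| h.call(stanza) == true }
  return true if handlers[:stanza].any? { |h| h.call(stanza) == true }

  typed = case stanza
          when SketchMessage  then handlers[:message]
          when SketchPresence then handlers[:presence]
          else []
          end
  typed.any? { |h| h.call(stanza) == true }
end

trace = []
handlers = {
  xml:      [->(_s) { trace << :xml; false }],
  stanza:   [->(_s) { trace << :stanza; false }],
  message:  [->(_s) { trace << :message; true }],
  presence: [->(_s) { trace << :presence; true }]
}

message_consumed = dispatch(SketchMessage.new('hi'), handlers)
other_consumed   = dispatch('raw element', handlers)
```

An element of an unrecognised class only ever reaches the two generic lists.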
# File lib/xmpp4r/stream.rb, line 179
def receive_nonthreaded(element)
  Jabber::debuglog("RECEIVED:\n#{element.to_s}")
  case element.prefix
  when 'stream'
    case element.name
    when 'stream'
      stanza = element
      @streamid = element.attributes['id']
      unless element.attributes['version'] # isn't XMPP compliant, so
        Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
        @features_lock.unlock # don't wait for <stream:features/>
      end
    when 'features'
      stanza = element
      element.each { |e|
        if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
          e.each_element('mechanism') { |mech|
            @stream_mechanisms.push(mech.text)
          }
        else
          @stream_features[e.name] = e.namespace
        end
      }
      Jabber::debuglog("FEATURES: received")
      @features_lock.unlock
    else
      stanza = element
    end
  else
    case element.name
    when 'message'
      stanza = Message::import(element)
    when 'iq'
      stanza = Iq::import(element)
    when 'presence'
      stanza = Presence::import(element)
    else
      stanza = element
    end
  end

  # Iterate through blocked threads (= waiting for an answer)
  #
  # We're dup'ping the @threadblocks here, so that we won't end up in an
  # endless loop if Stream#send is being nested. That means, the nested
  # threadblock won't receive the stanza currently processed, but the next
  # one.
  threadblocks = @threadblocks.dup
  threadblocks.each { |threadblock|
    exception = nil
    r = false
    begin
      r = threadblock.call(stanza)
    rescue Exception => e
      exception = e
    end

    if r == true
      @threadblocks.delete(threadblock)
      threadblock.wakeup
      return
    elsif exception
      @threadblocks.delete(threadblock)
      threadblock.raise(exception)
    end
  }

  if @threaded
    process_one(stanza)
  else
    # stanzaqueue will be read when the user call process
    @stanzaqueue_lock.lock
    @stanzaqueue.push(stanza)
    @stanzaqueue_lock.unlock
    @waiting_thread.wakeup if @waiting_thread
  end
end