Class Jabber::Stream
In: lib/xmpp4r/stream.rb
Parent: Object

The stream class manages a connection stream (a file descriptor using which XML messages are read and sent)

Methods

Classes and Modules

Class Jabber::Stream::ThreadBlock

Constants

DISCONNECTED = 1
CONNECTED = 2

Attributes

fd  [R]  file descriptor used
status  [R]  connection status

Public Class methods

Create a new stream (just initializes)

[Source]

    # File lib/xmpp4r/stream.rb, line 34
34:     def initialize(threaded = true)
35:       @fd = nil
36:       @status = DISCONNECTED
37:       @xmlcbs = CallbackList::new
38:       @stanzacbs = CallbackList::new
39:       @messagecbs = CallbackList::new
40:       @iqcbs = CallbackList::new
41:       @presencecbs = CallbackList::new
42:       unless threaded
43:         $stderr.puts "Non-threaded mode is currently broken, re-enabling threaded"
44:         threaded = true
45:       end
46:       @threaded = threaded
47:       @stanzaqueue = []
48:       @stanzaqueue_lock = Mutex::new
49:       @exception_block = nil
50:       @threadblocks = []
51: #      @pollCounter = 10
52:       @waiting_thread = nil
53:       @wakeup_thread = nil
54:       @streamid = nil
55:       @features_lock = Mutex.new
56:     end

Public Instance methods

Adds a callback block to process received Iqs

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 518
518:     def add_iq_callback(priority = 0, ref = nil, &block)
519:       @iqcbs.add(priority, ref, block)
520:     end

Adds a callback block to process received Messages

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 464
464:     def add_message_callback(priority = 0, ref = nil, &block)
465:       @messagecbs.add(priority, ref, block)
466:     end

Adds a callback block to process received Presences

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 500
500:     def add_presence_callback(priority = 0, ref = nil, &block)
501:       @presencecbs.add(priority, ref, block)
502:     end

Adds a callback block to process received Stanzas

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 482
482:     def add_stanza_callback(priority = 0, ref = nil, &block)
483:       @stanzacbs.add(priority, ref, block)
484:     end

Adds a callback block to process received XML messages

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 446
446:     def add_xml_callback(priority = 0, ref = nil, &block)
447:       @xmlcbs.add(priority, ref, block)
448:     end

Closes the connection to the Jabber service

[Source]

     # File lib/xmpp4r/stream.rb, line 532
532:     def close
533:       close!
534:     end

[Source]

     # File lib/xmpp4r/stream.rb, line 536
536:     def close!
537:       @parserThread.kill if @parserThread
538: #      @pollThread.kill
539:       @fd.close if @fd and !@fd.closed?
540:       @status = DISCONNECTED
541:     end

Delete an Iq callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 527
527:     def delete_iq_callback(ref)
528:       @iqcbs.delete(ref)
529:     end

Delete an Message callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 472
472:     def delete_message_callback(ref)
473:       @messagecbs.delete(ref)
474:     end

Delete a Presence callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 508
508:     def delete_presence_callback(ref)
509:       @presencecbs.delete(ref)
510:     end

Delete a Stanza callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 490
490:     def delete_stanza_callback(ref)
491:       @stanzacbs.delete(ref)
492:     end

Delete an XML-messages callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 454
454:     def delete_xml_callback(ref)
455:       @xmlcbs.delete(ref)
456:     end

Returns if this connection is connected to a Jabber service

return:[Boolean] Connection status

[Source]

     # File lib/xmpp4r/stream.rb, line 150
150:     def is_connected?
151:       return @status == CONNECTED
152:     end

Returns if this connection is NOT connected to a Jabber service

return:[Boolean] Connection status

[Source]

     # File lib/xmpp4r/stream.rb, line 158
158:     def is_disconnected?
159:       return @status == DISCONNECTED
160:     end

Mounts a block to handle exceptions if they occur during the poll send. This will likely be the first indication that the socket dropped in a Jabber Session.

The block has to take three arguments:

  • the Exception
  • the Jabber::Stream object (self)
  • a symbol where it happened, namely :start, :parser, :sending and :end

[Source]

     # File lib/xmpp4r/stream.rb, line 108
108:     def on_exception(&block)
109:       @exception_block = block
110:     end

This method is called by the parser when a failure occurs

[Source]

     # File lib/xmpp4r/stream.rb, line 114
114:     def parse_failure(e)
115:       Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
116: 
117:       # A new thread has to be created because close will cause the thread
118:       # to commit suicide(???)
119:       if @exception_block
120:         # New thread, because close will kill the current thread
121:         Thread.new {
122:           close
123:           @exception_block.call(e, self, :parser)
124:         }
125:       else
126:         puts "Stream#parse_failure was called by XML parser. Dumping " +
127:         "backtrace...\n" + e.exception + "\n"
128:         puts e.backtrace
129:         close
130:         raise
131:       end
132:     end

This method is called by the parser upon receiving </stream:stream>

[Source]

     # File lib/xmpp4r/stream.rb, line 136
136:     def parser_end
137:       if @exception_block
138:         Thread.new {
139:           close
140:           @exception_block.call(nil, self, :close)
141:         }
142:       else
143:         close
144:       end
145:     end

Starts a polling thread to send "keep alive" data to prevent the Jabber connection from closing for inactivity.

Currently not working!

[Source]

     # File lib/xmpp4r/stream.rb, line 424
424:     def poll
425:       sleep 10
426:       while true
427:         sleep 2
428: #        @pollCounter = @pollCounter - 1
429: #        if @pollCounter < 0
430: #          begin
431: #            send("  \t  ")
432: #          rescue
433: #            Thread.new {@exception_block.call if @exception_block}
434: #            break
435: #          end
436: #        end
437:       end
438:     end

Process |max| XML stanzas and call listeners for all of them.

max:[Integer] the number of stanzas to process (nil means process

all available)

[Source]

     # File lib/xmpp4r/stream.rb, line 281
281:     def process(max = nil)
282:       n = 0
283:       @stanzaqueue_lock.lock
284:       while @stanzaqueue.size > 0 and (max == nil or n < max)
285:         e = @stanzaqueue.shift
286:         @stanzaqueue_lock.unlock
287:         process_one(e)
288:         n += 1
289:         @stanzaqueue_lock.lock
290:       end
291:       @stanzaqueue_lock.unlock
292:       n
293:     end

Processes a received REXML::Element and executes registered thread blocks and filters against it.

If in threaded mode, a new thread will be spawned for the call to receive_nonthreaded.

element:[REXML::Element] The received element

[Source]

     # File lib/xmpp4r/stream.rb, line 169
169:     def receive(element)
170:       if @threaded
171:         # Don't spawn a new thread here. An implicit feature
172:         # of XMPP is constant order of stanzas.
173:         receive_nonthreaded(element)
174:       else
175:         receive_nonthreaded(element)
176:       end
177:     end

Sends XML data to the socket and (optionally) waits to process received data.

xml:[String] The xml data to send
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 357
357:     def send(xml, &block)
358:       Jabber::debuglog("SENDING:\n#{xml}")
359:       @threadblocks.unshift(ThreadBlock.new(block)) if block
360:       Thread.critical = true # we don't want to be interupted before we stop!
361:       begin
362:         @fd << xml.to_s
363:         @fd.flush
364:       rescue Exception => e
365:         Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
366: 
367:         if @exception_block 
368:           Thread.new { close!; @exception_block.call(e, self, :sending) }
369:         else
370:           puts "Exception caught while sending!"
371:           close!
372:           raise
373:         end
374:       end
375:       Thread.critical = false
376:       # The parser thread might be running this (think of a callback running send())
377:       # If this is the case, we mustn't stop (or we would cause a deadlock)
378:       Thread.stop if block and Thread.current != @parserThread
379:       @pollCounter = 10
380:     end

Send an XMMP stanza with an Jabber::XMLStanza#id. The id will be generated by Jabber::IdGenerator if not already set.

The block will be called once: when receiving a stanza with the same Jabber::XMLStanza#id. It must return true to complete this!

Be aware that if a stanza with type=‘error‘ is received the function does not yield but raises an ErrorException with the corresponding error element.

Please read the note about nesting at Stream#send

xml:[XMLStanza]

[Source]

     # File lib/xmpp4r/stream.rb, line 395
395:     def send_with_id(xml, &block)
396:       if xml.id.nil?
397:         xml.id = Jabber::IdGenerator.instance.generate_id
398:       end
399: 
400:       error = nil
401:       send(xml) do |received|
402:         if received.kind_of? XMLStanza and received.id == xml.id
403:           if received.type == :error
404:             error = (received.error ? received.error : Error.new)
405:             true
406:           else
407:             yield(received)
408:           end
409:         else
410:           false
411:         end
412:       end
413: 
414:       unless error.nil?
415:         raise ErrorException.new(error)
416:       end
417:     end

Start the XML parser on the fd

[Source]

    # File lib/xmpp4r/stream.rb, line 60
60:     def start(fd)
61:       @stream_mechanisms = []
62:       @stream_features = {}
63: 
64:       @fd = fd
65:       @parser = StreamParser.new(@fd, self)
66:       @parserThread = Thread.new do
67:         begin
68:           @parser.parse
69:         rescue Exception => e
70:           Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
71: 
72:           if @exception_block
73:             Thread.new { close; @exception_block.call(e, self, :start) }
74:           else
75:             puts "Exception caught in Parser thread!"
76:             close
77:             raise
78:           end
79:         end
80:       end
81: #      @pollThread = Thread.new do
82: #        begin
83: #        poll
84: #        rescue
85: #          puts "Exception caught in Poll thread, dumping backtrace and" +
86: #            " exiting...\n" + $!.exception + "\n"
87: #          puts $!.backtrace
88: #          exit
89: #        end
90: #      end
91:       @status = CONNECTED
92:     end

[Source]

    # File lib/xmpp4r/stream.rb, line 94
94:     def stop
95:       @parserThread.kill
96:       @parser = nil
97:     end

Process an XML stanza and call the listeners for it. If no stanza is currently available, wait for max |time| seconds before returning.

time:[Integer] time to wait in seconds. If nil, wait infinitely.

all available)

[Source]

     # File lib/xmpp4r/stream.rb, line 301
301:     def wait_and_process(time = nil)
302:       if time == 0 
303:         return process(1)
304:       end
305:       @stanzaqueue_lock.lock
306:       if @stanzaqueue.size > 0
307:         e = @stanzaqueue.shift
308:         @stanzaqueue_lock.unlock
309:         process_one(e)
310:         return 1
311:       end
312: 
313:       @waiting_thread = Thread.current
314:       @wakeup_thread = Thread.new { sleep time ; @waiting_thread.wakeup if @waiting_thread }
315:       @waiting_thread.stop
316:       @wakeup_thread.kill if @wakeup_thread
317:       @wakeup_thread = nil
318:       @waiting_thread = nil
319: 
320:       @stanzaqueue_lock.lock
321:       if @stanzaqueue.size > 0
322:         e = @stanzaqueue.shift
323:         @stanzaqueue_lock.unlock
324:         process_one(e)
325:         return 1
326:       end
327:       return 0
328:     end

Private Instance methods

Process |element| until it is consumed. Returns element.consumed? element The element to process

[Source]

     # File lib/xmpp4r/stream.rb, line 261
261:     def process_one(stanza)
262:       Jabber::debuglog("PROCESSING:\n#{stanza.to_s}")
263:       return true if @xmlcbs.process(stanza)
264:       return true if @stanzacbs.process(stanza)
265:       case stanza
266:       when Message
267:         return true if @messagecbs.process(stanza)
268:       when Iq
269:         return true if @iqcbs.process(stanza)
270:       when Presence
271:         return true if @presencecbs.process(stanza)
272:       end
273:     end

[Source]

     # File lib/xmpp4r/stream.rb, line 179
179:     def receive_nonthreaded(element)
180:       Jabber::debuglog("RECEIVED:\n#{element.to_s}")
181:       case element.prefix
182:       when 'stream'
183:         case element.name
184:           when 'stream'
185:             stanza = element
186:             @streamid = element.attributes['id']
187:             unless element.attributes['version']  # isn't XMPP compliant, so
188:               Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
189:               @features_lock.unlock               # don't wait for <stream:features/>
190:             end
191:           when 'features'
192:             stanza = element
193:             element.each { |e|
194:               if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
195:                 e.each_element('mechanism') { |mech|
196:                   @stream_mechanisms.push(mech.text)
197:                 }
198:               else
199:                 @stream_features[e.name] = e.namespace
200:               end
201:             }
202:             Jabber::debuglog("FEATURES: received")
203:             @features_lock.unlock
204:           else
205:             stanza = element
206:         end
207:       else
208:         case element.name
209:           when 'message'
210:             stanza = Message::import(element)
211:           when 'iq'
212:             stanza = Iq::import(element)
213:           when 'presence'
214:             stanza = Presence::import(element)
215:           else
216:             stanza = element
217:         end
218:       end
219: 
220:       # Iterate through blocked threads (= waiting for an answer)
221:       #
222:       # We're dup'ping the @threadblocks here, so that we won't end up in an
223:       # endless loop if Stream#send is being nested. That means, the nested
224:       # threadblock won't receive the stanza currently processed, but the next
225:       # one.
226:       threadblocks = @threadblocks.dup
227:       threadblocks.each { |threadblock|
228:         exception = nil
229:         r = false
230:         begin
231:           r = threadblock.call(stanza)
232:         rescue Exception => e
233:           exception = e
234:         end
235: 
236:         if r == true
237:           @threadblocks.delete(threadblock)
238:           threadblock.wakeup
239:           return
240:         elsif exception
241:           @threadblocks.delete(threadblock)
242:           threadblock.raise(exception)
243:         end
244:       }
245: 
246:       if @threaded
247:         process_one(stanza)
248:       else
249:         # stanzaqueue will be read when the user call process
250:         @stanzaqueue_lock.lock
251:         @stanzaqueue.push(stanza)
252:         @stanzaqueue_lock.unlock
253:         @waiting_thread.wakeup if @waiting_thread
254:       end
255:     end

[Validate]