Class Jabber::Stream
In: lib/xmpp4r/stream.rb
Parent: Object

The Stream class manages a connection stream (a file descriptor through which XML messages are read and sent).

Constants

DISCONNECTED = 1
CONNECTED = 2

Attributes

fd  [R]  file descriptor used
status  [R]  connection status

Public Class methods

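Creates a new stream. This only initializes internal state; no file descriptor is attached until start is called.

threaded:[Boolean = true] If true, received stanzas are passed to the callbacks as soon as they arrive; if false, they are queued until process or wait_and_process is called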

[Source]

    # File lib/xmpp4r/stream.rb, line 34
34:     def initialize(threaded = true)
35:       @fd = nil
36:       @status = DISCONNECTED
37:       @xmlcbs = CallbackList::new
38:       @stanzacbs = CallbackList::new
39:       @messagecbs = CallbackList::new
40:       @iqcbs = CallbackList::new
41:       @presencecbs = CallbackList::new
42:       @threaded = threaded
43:       @StanzaQueue = []
44:       @StanzaQueueMutex = Mutex::new
45:       @exception_block = nil
46:       @threadBlocks = {}
47: #      @pollCounter = 10
48:       @waitingThread = nil
49:       @wakeupThread = nil
50:       @streamid = nil
51:     end

Public Instance methods

Adds a callback block/proc to process received Iqs

priority:[Integer] The callback’s priority; callbacks with a higher priority are called sooner
ref:[String] The callback’s reference
proc:[Proc = nil] The optional proc
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 410
410:     def add_iq_callback(priority = 0, ref = nil, proc=nil, &block)
411:       block = proc if proc
412:       @iqcbs.add(priority, ref, block)
413:     end
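
The priority ordering and the rule that an explicit proc replaces the block (`block = proc if proc`) can be illustrated with a small stand-in. `TinyCallbackList` below is hypothetical and is not the library's CallbackList; it assumes only that callbacks run from highest to lowest priority and that processing stops at the first callback returning true.

```ruby
# Hypothetical stand-in for illustration; not xmpp4r's CallbackList.
class TinyCallbackList
  Entry = Struct.new(:priority, :ref, :block)

  def initialize
    @entries = []
  end

  # Mirrors the add_*_callback signature: an explicit proc wins over a block.
  def add(priority = 0, ref = nil, proc = nil, &block)
    block = proc if proc
    @entries << Entry.new(priority, ref, block)
    # Stable sort: highest priority first, insertion order breaks ties.
    @entries = @entries.each_with_index
                       .sort_by { |entry, index| [-entry.priority, index] }
                       .map(&:first)
    self
  end

  # Removes every callback registered under the given reference.
  def delete(ref)
    @entries.reject! { |entry| entry.ref == ref }
    self
  end

  # Calls callbacks in order and stops at the first one that returns true.
  def process(item)
    @entries.each { |entry| return true if entry.block.call(item) == true }
    false
  end
end

calls = []
list = TinyCallbackList.new
list.add(0, 'low')   { |iq| calls << [:low, iq]; true }
list.add(10, 'high') { |iq| calls << [:high, iq]; false }
consumed = list.process('ping')
```

Here the callback registered with priority 10 sees the item first even though it was added second; because it returns false, the lower-priority callback runs next and consumes the item.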

Adds a callback block/proc to process received Messages

priority:[Integer] The callback’s priority; callbacks with a higher priority are called sooner
ref:[String] The callback’s reference
proc:[Proc = nil] The optional proc
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 350
350:     def add_message_callback(priority = 0, ref = nil, proc=nil, &block)
351:       block = proc if proc
352:       @messagecbs.add(priority, ref, block)
353:     end

Adds a callback block/proc to process received Presences

priority:[Integer] The callback’s priority; callbacks with a higher priority are called sooner
ref:[String] The callback’s reference
proc:[Proc = nil] The optional proc
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 390
390:     def add_presence_callback(priority = 0, ref = nil, proc=nil, &block)
391:       block = proc if proc
392:       @presencecbs.add(priority, ref, block)
393:     end

Adds a callback block/proc to process received Stanzas

priority:[Integer] The callback’s priority; callbacks with a higher priority are called sooner
ref:[String] The callback’s reference
proc:[Proc = nil] The optional proc
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 370
370:     def add_stanza_callback(priority = 0, ref = nil, proc=nil, &block)
371:       block = proc if proc
372:       @stanzacbs.add(priority, ref, block)
373:     end

Adds a callback block/proc to process received XML messages

priority:[Integer] The callback’s priority; callbacks with a higher priority are called sooner
ref:[String] The callback’s reference
proc:[Proc = nil] The optional proc
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 330
330:     def add_xml_callback(priority = 0, ref = nil, proc=nil, &block)
331:       block = proc if proc
332:       @xmlcbs.add(priority, ref, block)
333:     end

Closes the connection to the Jabber service

[Source]

     # File lib/xmpp4r/stream.rb, line 425
425:     def close
426:       @parserThread.kill if @parserThread
427: #      @pollThread.kill
428:       @fd.close if @fd
429:       @status = DISCONNECTED
430:     end

Delete an Iq callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 420
420:     def delete_iq_callback(ref)
421:       @iqcbs.delete(ref)
422:     end

Delete a Message callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 359
359:     def delete_message_callback(ref)
360:       @messagecbs.delete(ref)
361:     end

Delete a Presence callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 399
399:     def delete_presence_callback(ref)
400:       @presencecbs.delete(ref)
401:     end

Delete a Stanza callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 379
379:     def delete_stanza_callback(ref)
380:       @stanzacbs.delete(ref)
381:     end

Delete an XML-messages callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 339
339:     def delete_xml_callback(ref)
340:       @xmlcbs.delete(ref)
341:     end

Returns whether this stream is connected to a Jabber service

return:[Boolean] Connection status

[Source]

     # File lib/xmpp4r/stream.rb, line 111
111:     def is_connected?
112:       return @status == CONNECTED
113:     end

Returns whether this stream is NOT connected to a Jabber service

return:[Boolean] Connection status

[Source]

     # File lib/xmpp4r/stream.rb, line 119
119:     def is_disconnected?
120:       return @status == DISCONNECTED
121:     end

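Mounts a block to handle exceptions raised while starting the parser, parsing, or sending. The block is called with the exception, the stream, and a symbol naming where the error occurred (:start, :parser or :sending). This will likely be the first indication that the socket dropped in a Jabber session.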

[Source]

    # File lib/xmpp4r/stream.rb, line 88
88:     def on_exception(&block)
89:       @exception_block = block
90:     end
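
As a rough illustration of the hook, the sketch below mimics only the call shape visible in the source listings (exception, stream, location symbol). `FakeStream`, `send_data` and `BrokenIO` are hypothetical names invented for this example and are not part of the library.

```ruby
# Hypothetical stand-in that mimics only the exception hook shown in the source.
class FakeStream
  def on_exception(&block)
    @exception_block = block
  end

  # Writes to an IO-like object; on failure, hands the error to the hook.
  def send_data(io, data)
    io << data
  rescue StandardError => e
    # Without a registered hook the error propagates to the caller.
    raise unless @exception_block
    @exception_block.call(e, self, :sending)
  end
end

# An IO-like object whose writes always fail, simulating a dropped socket.
class BrokenIO
  def <<(_data)
    raise IOError, 'closed stream'
  end
end

seen = nil
stream = FakeStream.new
stream.on_exception { |exception, strm, where| seen = [exception.class, strm, where] }
stream.send_data(BrokenIO.new, '<presence/>')
```

A handler registered this way is a natural place to log the failure and decide whether to reconnect, since the symbol tells it which stage failed.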

This method is called by the parser when a failure occurs

[Source]

     # File lib/xmpp4r/stream.rb, line 94
 94:     def parse_failure
 95:       # A new thread has to be created because close will cause the thread
 96:       # to commit suicide
 97:       if @exception_block
 98:         Thread.new { @exception_block.call($!, self, :parser) }
 99:       else
100:         puts "Stream#parse_failure was called by XML parser. Dumping " +
101:         "backtrace...\n" + $!.exception + "\n"
102:         puts $!.backtrace
103:         close
104:         raise
105:       end
106:     end

Starts a polling thread to send "keep alive" data to prevent the Jabber connection from closing for inactivity.

Currently not working!

[Source]

     # File lib/xmpp4r/stream.rb, line 307
307:     def poll
308:       sleep 10
309:       while true
310:         sleep 2
311: #        @pollCounter = @pollCounter - 1
312: #        if @pollCounter < 0
313: #          begin
314: #            send("  \t  ")
315: #          rescue
316: #            Thread.new {@exception_block.call if @exception_block}
317: #            break
318: #          end
319: #        end
320:       end
321:     end

Process |max| XML stanzas and call listeners for all of them.

max:[Integer] The number of stanzas to process (nil means process all available)

[Source]

     # File lib/xmpp4r/stream.rb, line 187
187:     def process(max = nil)
188:       n = 0
189:       @StanzaQueueMutex.lock
190:       while @StanzaQueue.size > 0 and (max == nil or n < max)
191:         e = @StanzaQueue.shift
192:         @StanzaQueueMutex.unlock
193:         process_one(e)
194:         n += 1
195:         @StanzaQueueMutex.lock
196:       end
197:       @StanzaQueueMutex.unlock
198:       n
199:     end
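
The pattern used here, holding the queue lock only while removing an item and releasing it while the item is handled, can be sketched in isolation. `StanzaInbox` is a hypothetical class for illustration, and it uses Mutex#synchronize rather than the explicit lock/unlock pairs in the listing above.

```ruby
# Hypothetical stand-in: a mutex-guarded queue drained in bounded batches.
class StanzaInbox
  attr_reader :handled

  def initialize
    @queue = []
    @mutex = Mutex.new
    @handled = []
  end

  def push(stanza)
    @mutex.synchronize { @queue.push(stanza) }
  end

  # Processes up to max items (all of them if max is nil) and
  # returns the number of items processed.
  def process(max = nil)
    n = 0
    while max.nil? || n < max
      # Hold the lock only while touching the queue, not while handling.
      item = @mutex.synchronize { @queue.shift }
      break if item.nil?
      @handled << item
      n += 1
    end
    n
  end
end

inbox = StanzaInbox.new
%w[a b c].each { |stanza| inbox.push(stanza) }
first = inbox.process(2)
rest = inbox.process
```

Releasing the lock during handling lets a producer keep queueing items while a slow handler runs.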

Processes a received REXML::Element and executes registered thread blocks and filters against it.

element:[REXML::Element] The received element

[Source]

     # File lib/xmpp4r/stream.rb, line 128
128:     def receive(element)
129:       Jabber::debuglog("RECEIVED:\n#{element.to_s}")
130:       case element.name
131:       when 'stream'
132:         stanza = element
133:         i = element.attribute("id")
134:         @streamid = i.value if i
135:       when 'message'
136:         stanza = Message::import(element)
137:       when 'iq'
138:         stanza = Iq::import(element)
139:       when 'presence'
140:         stanza = Presence::import(element)
141:       else
142:         stanza = element
143:       end
144:       # Iterate through blocked theads (= waiting for an answer)
145:       @threadBlocks.each { |thread, proc|
146:         r = proc.call(stanza)
147:         if r == true
148:           @threadBlocks.delete(thread)
149:           thread.wakeup if thread.alive?
150:           return
151:         end
152:       }
153:       if @threaded
154:         process_one(stanza)
155:       else
156:         # StanzaQueue will be read when the user call process
157:         @StanzaQueueMutex.lock
158:         @StanzaQueue.push(stanza)
159:         @StanzaQueueMutex.unlock
160:         @waitingThread.wakeup if @waitingThread
161:       end
162:     end
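
The order of decisions in receive can be summarized in a small sketch: blocks waiting for an answer get the first look at a stanza, and only unclaimed stanzas are dispatched (threaded mode) or queued (non-threaded mode). `MiniReceiver` and `wait_for` are hypothetical names; the sketch omits real threads and XML parsing.

```ruby
# Hypothetical stand-in showing only the decision order of receive.
class MiniReceiver
  attr_reader :dispatched, :queued

  def initialize(threaded = true)
    @threaded = threaded
    @waiters = {}
    @dispatched = []
    @queued = []
  end

  # Registers a block that may claim a future stanza by returning true.
  def wait_for(key, &block)
    @waiters[key] = block
  end

  def receive(stanza)
    # Waiting blocks see the stanza first; the first to return true claims it.
    key, = @waiters.find { |_key, block| block.call(stanza) == true }
    if key
      @waiters.delete(key)
      return :claimed
    end
    if @threaded
      @dispatched << stanza # would be handed to the callbacks right away
      :dispatched
    else
      @queued << stanza     # kept until the user asks for processing
      :queued
    end
  end
end

receiver = MiniReceiver.new(false)
receiver.wait_for(:reply) { |stanza| stanza == 'result' }
a = receiver.receive('chat')
b = receiver.receive('result')
c = receiver.receive('result')
```

The second 'result' is queued rather than claimed because a waiting block is removed as soon as it has accepted a stanza.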

Sends XML data to the socket and (optionally) waits to process received data.

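xml:[String] The XML data to send (other objects are converted with to_s)
proc:[Proc = nil] The optional proc, called with each received stanza until it returns true
&block:[Block] The optional block, used when no proc is given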

[Source]

     # File lib/xmpp4r/stream.rb, line 243
243:     def send(xml, proc=nil, &block)
244:       Jabber::debuglog("SENDING:\n#{ xml.kind_of?(String) ? xml : xml.to_s }")
245:       xml = xml.to_s if not xml.kind_of? String
246:       block = proc if proc
247:       @threadBlocks[Thread.current]=block if block
248:       Thread.critical = true # we don't want to be interupted before we stop!
249:       begin
250:         @fd << xml
251:         @fd.flush
252:       rescue
253:         if @exception_block 
254:           @exception_block.call($!, self, :sending)
255:         else
256:           puts "Exception caught while sending!"
257:           raise
258:         end
259:       end
260:       Thread.critical = false
261:       # The parser thread might be running this (think of a callback running send())
262:       # If this is the case, we mustn't stop (or we would cause a deadlock)
263:       Thread.stop if block and Thread.current != @parserThread
264:       @pollCounter = 10
265:     end

Send an XMPP stanza with a Jabber::XMLStanza#id. The id will be generated by Jabber::IdGenerator if not already set.

The block will be called once: when receiving a stanza with the same Jabber::XMLStanza#id. It must return true to complete this!

Be aware that if a stanza with type=’error’ is received the function does not yield but raises an ErrorException with the corresponding error element.

xml:[XMLStanza] The stanza to send

[Source]

     # File lib/xmpp4r/stream.rb, line 278
278:     def send_with_id(xml, &block)
279:       if xml.id.nil?
280:         xml.id = Jabber::IdGenerator.instance.generate_id
281:       end
282: 
283:       error = nil
284:       send(xml) do |received|
285:         if received.id == xml.id
286:           if received.type == :error
287:             error = received.error
288:             true
289:           else
290:             yield(received)
291:           end
292:         else
293:           false
294:         end
295:       end
296: 
297:       unless error.nil?
298:         raise ErrorException.new(error)
299:       end
300:     end
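
The matching rule described above (ignore stanzas with a different id, yield on a matching non-error reply, raise on a matching error reply) can be sketched without a network connection. `Reply`, `ReplyError` and `match_reply` are hypothetical names; the sketch walks a prepared list of replies instead of waiting on a socket.

```ruby
# Hypothetical stand-ins for a received stanza and the error raised.
Reply = Struct.new(:id, :type, :error)
class ReplyError < StandardError; end

# Scans incoming replies for the wanted id. Yields a matching non-error
# reply until the block returns true; raises on a matching error reply.
def match_reply(wanted_id, incoming)
  error = nil
  incoming.each do |received|
    next unless received.id == wanted_id
    if received.type == :error
      error = received.error
      break
    end
    break if yield(received)
  end
  raise ReplyError, error unless error.nil?
end

answers = []
incoming = [Reply.new('7', :result, nil), Reply.new('42', :result, nil)]
match_reply('42', incoming) { |reply| answers << reply.id; true }

failed = begin
  match_reply('9', [Reply.new('9', :error, 'item-not-found')]) { |_reply| true }
  nil
rescue ReplyError => e
  e.message
end
```

Callers therefore need a rescue clause around the call if an error reply is a normal outcome for them, because the block never sees error replies.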

Start the XML parser on the fd

[Source]

    # File lib/xmpp4r/stream.rb, line 55
55:     def start(fd)
56:       @fd = fd
57:       @parser = StreamParser.new(@fd, self)
58:       @parserThread = Thread.new do
59:         begin
60:           @parser.parse
61:         rescue
62:           if @exception_block
63:             Thread.new { @exception_block.call($!, self, :start) }
64:           else
65:             puts "Exception caught in Parser thread!"
66:             raise
67:           end
68:         end
69:       end
70: #      @pollThread = Thread.new do
71: #        begin
72: #        poll
73: #        rescue
74: #          puts "Exception caught in Poll thread, dumping backtrace and" +
75: #            " exiting...\n" + $!.exception + "\n"
76: #          puts $!.backtrace
77: #          exit
78: #        end
79: #      end
80:       @status = CONNECTED
81:     end

Process an XML stanza and call the listeners for it. If no stanza is currently available, wait at most |time| seconds before returning.

time:[Integer] time to wait in seconds. If nil, wait infinitely.

[Source]

     # File lib/xmpp4r/stream.rb, line 207
207:     def wait_and_process(time = nil)
208:       if time == 0 
209:         return process(1)
210:       end
211:       @StanzaQueueMutex.lock
212:       if @StanzaQueue.size > 0
213:         e = @StanzaQueue.shift
214:         @StanzaQueueMutex.unlock
215:         process_one(e)
216:         return 1
217:       end
218: 
219:       @waitingThread = Thread.current
220:       @wakeupThread = Thread.new { sleep time ; @waitingThread.wakeup if @waitingThread }
221:       @waitingThread.stop
222:       @wakeupThread.kill if @wakeupThread
223:       @wakeupThread = nil
224:       @waitingThread = nil
225: 
226:       @StanzaQueueMutex.lock
227:       if @StanzaQueue.size > 0
228:         e = @StanzaQueue.shift
229:         @StanzaQueueMutex.unlock
230:         process_one(e)
231:         return 1
232:       end
233:       return 0
234:     end
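
For comparison, a bounded wait for one queued item can also be written with a condition variable. This is a different technique from the Thread.stop/wakeup pair in the listing above, not the library's implementation; `TimedInbox` and `wait_and_take` are hypothetical names.

```ruby
# Hypothetical stand-in: wait for one item with an optional timeout.
class TimedInbox
  def initialize
    @queue = []
    @mutex = Mutex.new
    @arrived = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      @queue.push(item)
      @arrived.signal
    end
  end

  # Returns the next item, or nil if none arrives within `time` seconds.
  # A nil time waits indefinitely.
  def wait_and_take(time = nil)
    now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    deadline = time && (now.call + time)
    @mutex.synchronize do
      while @queue.empty?
        remaining = deadline && (deadline - now.call)
        return nil if remaining && remaining <= 0
        # Releases the mutex while sleeping; re-checks the queue on wakeup.
        @arrived.wait(@mutex, remaining)
      end
      @queue.shift
    end
  end
end

inbox = TimedInbox.new
empty = inbox.wait_and_take(0.05)
producer = Thread.new { sleep 0.05; inbox.push('<message/>') }
got = inbox.wait_and_take(2)
producer.join
```

Checking the queue in a loop guards against spurious wakeups, and the deadline keeps the total wait bounded even if the thread is woken several times.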

Private Instance methods

Process |element| until it is consumed. Returns element.consumed?

element:The element to process

[Source]

     # File lib/xmpp4r/stream.rb, line 167
167:     def process_one(stanza)
168:       Jabber::debuglog("PROCESSING:\n#{stanza.to_s}")
169:       return true if @xmlcbs.process(stanza)
170:       return true if @stanzacbs.process(stanza)
171:       case stanza
172:       when Message
173:         return true if @messagecbs.process(stanza)
174:       when Iq
175:         return true if @iqcbs.process(stanza)
176:       when Presence
177:         return true if @presencecbs.process(stanza)
178:       end
179:     end
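
The dispatch order in the listing (generic callbacks first, then the callbacks for the stanza's own class) can be sketched with plain lambdas. `DemoMessage`, `DemoPresence` and `dispatch` are hypothetical names, and unlike the listing, which falls through and returns nil, this sketch returns false when nothing consumes the stanza.

```ruby
# Hypothetical stanza types for illustration only.
DemoMessage = Struct.new(:body)
DemoPresence = Struct.new(:show)

# Tries the generic handlers first, then the handler for the stanza's class.
# Each handler returns true when it has consumed the stanza.
def dispatch(stanza, generic:, by_type:)
  return true if generic.any? { |handler| handler.call(stanza) }
  handler = by_type[stanza.class]
  handler ? handler.call(stanza) == true : false
end

log = []
by_type = {
  DemoMessage => ->(msg) { log << "message: #{msg.body}"; true },
  DemoPresence => ->(pres) { log << "presence: #{pres.show}"; true }
}
generic = [->(stanza) { log << "saw #{stanza.class}"; false }]

handled = dispatch(DemoMessage.new('hi'), generic: generic, by_type: by_type)
unhandled = dispatch(:unknown, generic: generic, by_type: by_type)
```

A generic handler that returns true would stop the type-specific handler from ever seeing the stanza, which is why generic handlers usually return false unless they intend to consume it.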
