Class Jabber::Bytestreams::IBB
In: lib/xmpp4r/bytestreams/helper/ibb/base.rb
Parent: Object

In-Band Bytestreams (JEP-0047) implementation

Don‘t use directly, use IBBInitiator and IBBTarget

In-Band Bytestreams should only be used when transferring very small amounts of binary data, because it is slow and increases server load drastically.

Note that the constructor takes a lot of arguments. In-Band Bytestreams do not specify a way to initiate the stream, this should be done via Stream Initiation.

Methods

activate   active?   close   deactivate   flush   new   read   send_data   write  

Constants

NS_IBB = 'http://jabber.org/protocol/ibb'

Public Class methods

Create a new bytestream

Will register a <message/> callback to intercept data of this stream. This data will be buffered, you can retrieve it with receive

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 26
26:       def initialize(stream, session_id, my_jid, peer_jid)
27:         @stream = stream
28:         @session_id = session_id
29:         @my_jid = (my_jid.kind_of?(String) ? JID.new(my_jid) : my_jid)
30:         @peer_jid = (peer_jid.kind_of?(String) ? JID.new(peer_jid) : peer_jid)
31: 
32:         @active = false
33:         @seq_send = 0
34:         @seq_recv = 0
35:         @queue = []
36:         @queue_lock = Mutex.new
37:         @pending = Mutex.new
38:         @pending.lock
39:         @sendbuf = ''
40:         @sendbuf_lock = Mutex.new
41: 
42:         @block_size = 4096  # Recommended by JEP0047
43:       end

Public Instance methods

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 45
45:       def active?
46:         @active
47:       end

Close the stream

Waits for acknowledge from peer, may throw ErrorException

[Source]

     # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 125
125:       def close
126:         if active?
127:           flush
128:           deactivate
129: 
130:           iq = Iq.new(:set, @peer_jid)
131:           close = iq.add REXML::Element.new('close')
132:           close.add_namespace IBB::NS_IBB
133:           close.attributes['sid'] = @session_id
134: 
135:           @stream.send_with_id(iq) { |answer|
136:             answer.type == :result
137:           }
138:         end
139:       end

Empty the send-buffer by sending remaining data

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 69
69:       def flush
70:         @sendbuf_lock.synchronize {
71:           while @sendbuf.size > 0
72:             send_data(@sendbuf[0..@block_size-1])
73:             @sendbuf = @sendbuf[@block_size..-1].to_s
74:           end
75:         }
76:       end

Receive data

Will wait until the Message with the next sequence number is in the stanza queue.

[Source]

     # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 83
 83:       def read
 84:         if active?
 85:           res = nil
 86: 
 87:           while res.nil?
 88:             @queue_lock.synchronize {
 89:               @queue.each { |item|
 90:                 # Find next data
 91:                 if item.type == :data and item.seq == @seq_recv.to_s
 92:                   res = item
 93:                   break
 94:                 # No data? Find close
 95:                 elsif item.type == :close and res.nil?
 96:                   res = item
 97:                 end
 98:               }
 99: 
100:               @queue.delete_if { |item| item == res }
101:             }
102: 
103:             # No data? Wait for next to arrive...
104:             @pending.lock unless res
105:           end
106: 
107:           if res.type == :data
108:             @seq_recv += 1
109:             @seq_recv = 0 if @seq_recv > 65535
110:             res.data
111:           elsif res.type == :close
112:             deactivate
113:             nil # Closed
114:           end
115:         else
116:           nil
117:         end
118:       end

Send data

Data is buffered to match block_size in each packet. If you need the data to be sent immediately, use flush afterwards.

buf:[String]

[Source]

    # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 56
56:       def write(buf)
57:         @sendbuf_lock.synchronize {
58:           @sendbuf += buf
59: 
60:           while @sendbuf.size >= @block_size
61:             send_data(@sendbuf[0..@block_size-1])
62:             @sendbuf = @sendbuf[@block_size..-1].to_s
63:           end
64:         }
65:       end

Private Instance methods

[Source]

     # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 179
179:       def activate
180:         unless active?
181:           @stream.add_message_callback(200, self) { |msg|
182:             data = msg.first_element('data')
183:             if msg.from == @peer_jid and msg.to == @my_jid and data and data.attributes['sid'] == @session_id
184:               if msg.type == nil
185:                 @queue_lock.synchronize {
186:                   @queue.push IBBQueueItem.new(:data, data.attributes['seq'], data.text.to_s)
187:                   @pending.unlock
188:                 }
189:               elsif msg.type == :error
190:                 @queue_lock.synchronize {
191:                   @queue << IBBQueueItem.new(:close)
192:                   @pending.unlock
193:                 }
194:               end
195:               true
196:             else
197:               false
198:             end
199:           }
200: 
201:           @stream.add_iq_callback(200, self) { |iq|
202:             close = iq.first_element('close')
203:             if iq.type == :set and close and close.attributes['sid'] == @session_id
204:               answer = iq.answer(false)
205:               answer.type = :result
206:               @stream.send(answer)
207: 
208:               @queue_lock.synchronize {
209:                 @queue << IBBQueueItem.new(:close)
210:                 @pending.unlock
211:               }
212:               true
213:             else
214:               false
215:             end
216:           }
217: 
218:           @active = true
219:         end
220:       end

[Source]

     # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 222
222:       def deactivate
223:         if active?
224:           @stream.delete_message_callback(self)
225:           @stream.delete_iq_callback(self)
226: 
227:           @active = false
228:         end
229:       end

Send data directly

data:[String]

[Source]

     # File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 146
146:       def send_data(databuf)
147:         if active?
148:           msg = Message.new
149:           msg.from = @my_jid
150:           msg.to = @peer_jid
151:           
152:           data = msg.add REXML::Element.new('data')
153:           data.add_namespace NS_IBB
154:           data.attributes['sid'] = @session_id
155:           data.attributes['seq'] = @seq_send.to_s
156:           data.text = Base64::encode64 databuf
157: 
158:           # TODO: Implement AMP correctly
159:           amp = msg.add REXML::Element.new('amp')
160:           amp.add_namespace 'http://jabber.org/protocol/amp'
161:           deliver_at = amp.add REXML::Element.new('rule')
162:           deliver_at.attributes['condition'] = 'deliver-at'
163:           deliver_at.attributes['value'] = 'stored'
164:           deliver_at.attributes['action'] = 'error'
165:           match_resource = amp.add REXML::Element.new('rule')
166:           match_resource.attributes['condition'] = 'match-resource'
167:           match_resource.attributes['value'] = 'exact'
168:           match_resource.attributes['action'] = 'error'
169:    
170:           @stream.send(msg)
171: 
172:           @seq_send += 1
173:           @seq_send = 0 if @seq_send > 65535
174:         else
175:           raise 'Attempt to send data when not activated'
176:         end
177:       end

[Validate]