Class | Jabber::Bytestreams::IBB |
In: |
lib/xmpp4r/bytestreams/helper/ibb/base.rb
|
Parent: | Object |
In-Band Bytestreams (JEP-0047) implementation
Don‘t use directly, use IBBInitiator and IBBTarget
In-Band Bytestreams should only be used when transferring very small amounts of binary data, because it is slow and increases server load drastically.
Note that the constructor takes a lot of arguments. In-Band Bytestreams do not specify a way to initiate the stream, this should be done via Stream Initiation.
NS_IBB | = | 'http://jabber.org/protocol/ibb' |
Create a new bytestream
Will register a <message/> callback to intercept data of this stream. This data will be buffered, you can retrieve it with receive
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 30 30: def initialize(stream, session_id, my_jid, peer_jid) 31: @stream = stream 32: @session_id = session_id 33: @my_jid = (my_jid.kind_of?(String) ? JID.new(my_jid) : my_jid) 34: @peer_jid = (peer_jid.kind_of?(String) ? JID.new(peer_jid) : peer_jid) 35: 36: @active = false 37: @seq_send = 0 38: @seq_recv = 0 39: @queue = [] 40: @queue_lock = Mutex.new 41: @pending = Mutex.new 42: @pending.lock 43: @sendbuf = '' 44: @sendbuf_lock = Mutex.new 45: 46: @block_size = 4096 # Recommended by JEP0047 47: end
Close the stream
Waits for acknowledge from peer, may throw ErrorException
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 129 129: def close 130: if active? 131: flush 132: deactivate 133: 134: iq = Iq.new(:set, @peer_jid) 135: close = iq.add REXML::Element.new('close') 136: close.add_namespace IBB::NS_IBB 137: close.attributes['sid'] = @session_id 138: 139: @stream.send_with_id(iq) { |answer| 140: answer.type == :result 141: } 142: end 143: end
Empty the send-buffer by sending remaining data
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 73 73: def flush 74: @sendbuf_lock.synchronize { 75: while @sendbuf.size > 0 76: send_data(@sendbuf[0..@block_size-1]) 77: @sendbuf = @sendbuf[@block_size..-1].to_s 78: end 79: } 80: end
Receive data
Will wait until the Message with the next sequence number is in the stanza queue.
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 87 87: def read 88: if active? 89: res = nil 90: 91: while res.nil? 92: @queue_lock.synchronize { 93: @queue.each { |item| 94: # Find next data 95: if item.type == :data and item.seq == @seq_recv.to_s 96: res = item 97: break 98: # No data? Find close 99: elsif item.type == :close and res.nil? 100: res = item 101: end 102: } 103: 104: @queue.delete_if { |item| item == res } 105: } 106: 107: # No data? Wait for next to arrive... 108: @pending.lock unless res 109: end 110: 111: if res.type == :data 112: @seq_recv += 1 113: @seq_recv = 0 if @seq_recv > 65535 114: res.data 115: elsif res.type == :close 116: deactivate 117: nil # Closed 118: end 119: else 120: nil 121: end 122: end
Send data
Data is buffered to match block_size in each packet. If you need the data to be sent immediately, use flush afterwards.
buf: | [String] |
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 60 60: def write(buf) 61: @sendbuf_lock.synchronize { 62: @sendbuf += buf 63: 64: while @sendbuf.size >= @block_size 65: send_data(@sendbuf[0..@block_size-1]) 66: @sendbuf = @sendbuf[@block_size..-1].to_s 67: end 68: } 69: end
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 183 183: def activate 184: unless active? 185: @stream.add_message_callback(200, self) { |msg| 186: data = msg.first_element('data') 187: if msg.from == @peer_jid and msg.to == @my_jid and data and data.attributes['sid'] == @session_id 188: if msg.type == nil 189: @queue_lock.synchronize { 190: @queue.push IBBQueueItem.new(:data, data.attributes['seq'], data.text.to_s) 191: @pending.unlock 192: } 193: elsif msg.type == :error 194: @queue_lock.synchronize { 195: @queue << IBBQueueItem.new(:close) 196: @pending.unlock 197: } 198: end 199: true 200: else 201: false 202: end 203: } 204: 205: @stream.add_iq_callback(200, self) { |iq| 206: close = iq.first_element('close') 207: if iq.type == :set and close and close.attributes['sid'] == @session_id 208: answer = iq.answer(false) 209: answer.type = :result 210: @stream.send(answer) 211: 212: @queue_lock.synchronize { 213: @queue << IBBQueueItem.new(:close) 214: @pending.unlock 215: } 216: true 217: else 218: false 219: end 220: } 221: 222: @active = true 223: end 224: end
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 226 226: def deactivate 227: if active? 228: @stream.delete_message_callback(self) 229: @stream.delete_iq_callback(self) 230: 231: @active = false 232: end 233: end
Send data directly
data: | [String] |
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 150 150: def send_data(databuf) 151: if active? 152: msg = Message.new 153: msg.from = @my_jid 154: msg.to = @peer_jid 155: 156: data = msg.add REXML::Element.new('data') 157: data.add_namespace NS_IBB 158: data.attributes['sid'] = @session_id 159: data.attributes['seq'] = @seq_send.to_s 160: data.text = Base64::encode64 databuf 161: 162: # TODO: Implement AMP correctly 163: amp = msg.add REXML::Element.new('amp') 164: amp.add_namespace 'http://jabber.org/protocol/amp' 165: deliver_at = amp.add REXML::Element.new('rule') 166: deliver_at.attributes['condition'] = 'deliver-at' 167: deliver_at.attributes['value'] = 'stored' 168: deliver_at.attributes['action'] = 'error' 169: match_resource = amp.add REXML::Element.new('rule') 170: match_resource.attributes['condition'] = 'match-resource' 171: match_resource.attributes['value'] = 'exact' 172: match_resource.attributes['action'] = 'error' 173: 174: @stream.send(msg) 175: 176: @seq_send += 1 177: @seq_send = 0 if @seq_send > 65535 178: else 179: raise 'Attempt to send data when not activated' 180: end 181: end