# Builds a channel on top of an AMQP connection and registers a callback with
# the connection's +on_connection+ hook that opens the channel.
#
# @param connection [Object, nil] connection to use; when it is nil/false the
#   constructor falls back to AMQP.connection and then to AMQP.start.
# @param id [Object, Hash] channel id (defaults to self.class.next_channel_id).
#   If a Hash is given here it is treated as options: it is merged over
#   +options+ and a fresh id is taken from self.class.next_channel_id.
# @param options [Hash] channel options. When options[:prefetch] is truthy it
#   is passed to #prefetch from inside the #open callback.
# @yield [channel, open_ok] invoked from inside the callback given to #open.
#   A block whose arity is exactly 1 receives only the channel; any other
#   arity receives both arguments.
# @raise [RuntimeError] when the EventMachine reactor is not running.
def initialize(connection = nil, id = self.class.next_channel_id, options = {}, &block)
  unless EM.reactor_running?
    raise 'AMQP can only be used from within EM.run {}'
  end

  @connection = connection || AMQP.connection || AMQP.start

  # A Hash in the second position means the caller skipped the id and passed
  # options there instead.
  channel_id = id
  settings   = options
  if id.is_a?(Hash)
    settings   = options.merge(id)
    channel_id = self.class.next_channel_id
  end

  super(@connection, channel_id, settings)

  @rpcs = {}
  @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
  @parameter_checks = {
    :queue    => [:durable, :exclusive, :auto_delete, :arguments],
    :exchange => [:type, :durable, :arguments]
  }

  # The channel is opened from the connection's on_connection callback rather
  # than directly from the constructor.
  @connection.on_connection do
    self.open do |ch, open_ok|
      @channel_is_open_deferrable.succeed

      # Dispatch on arity so single-argument blocks get just the channel.
      if block
        block.arity == 1 ? block.call(ch) : block.call(ch, open_ok)
      end

      # Prefetch is applied after the user block has run.
      # NOTE(review): the meaning of the `false` second argument is defined by
      # #prefetch, which is not visible here — confirm against its definition.
      prefetch_count = settings[:prefetch]
      prefetch(prefetch_count, false) if prefetch_count
    end
  end
end