AMQP::Consumer

AMQP consumers are entities that handle messages delivered to them by the AMQP broker ("push API", as opposed to "pull API"). Every consumer is associated with a queue. Consumers can be exclusive (no other consumers can be registered for the same queue) or not (consumers share the queue). When a queue has multiple consumers, messages are distributed among them in a round-robin manner, subject to the channel-level prefetch setting.

@see AMQP::Queue
@see AMQP::Queue#subscribe
@see AMQP::Queue#cancel
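The round-robin distribution described above can be pictured with a small simulation. This is only a sketch: the real dispatching happens inside the broker, prefetch limits are ignored here, and `SketchConsumer` and `distribute` are hypothetical names that are not part of the amqp gem.

```ruby
# Illustrative simulation only; the broker does the real dispatching.
# SketchConsumer and distribute are hypothetical, not amqp gem API.
SketchConsumer = Struct.new(:tag, :received)

# Hands each message to the next consumer in turn (round robin).
# Prefetch limits are deliberately left out of this sketch.
def distribute(messages, consumers)
  messages.each_with_index do |message, index|
    consumers[index % consumers.size].received << message
  end
  consumers
end

consumers = [SketchConsumer.new("c1", []), SketchConsumer.new("c2", [])]
distribute(%w[m1 m2 m3 m4 m5], consumers)
consumers.each { |c| puts "#{c.tag}: #{c.received.join(', ')}" }
# c1: m1, m3, m5
# c2: m2, m4
```

With an exclusive consumer there is only one entry in the list, so every message goes to it.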

Attributes

arguments[R]

@return [Hash] Custom subscription metadata

channel[R]

@return [AMQP::Channel] Channel this consumer uses

consumer_tag[R]

@return [String] Consumer tag, unique consumer identifier

queue[R]

@return [AMQP::Queue] Queue messages are consumed from

Public Class Methods

new(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false)
# File lib/amqp/consumer.rb, line 41
def initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false)
  super(channel, queue, (consumer_tag || self.class.tag_generator.generate_for(queue)), exclusive, no_ack, arguments, no_local)
end
tag_generator()

@return [AMQ::Client::ConsumerTagGenerator] Consumer tag generator

# File lib/amqp/consumer.rb, line 30
def self.tag_generator
  @tag_generator ||= AMQ::Client::ConsumerTagGenerator.new
end
tag_generator=(generator)

Assigns the consumer tag generator that will be used by consumer instances.

@param [AMQ::Client::ConsumerTagGenerator] generator Consumer tag generator to use
@return [AMQ::Client::ConsumerTagGenerator] Provided argument

# File lib/amqp/consumer.rb, line 36
def self.tag_generator=(generator)
  @tag_generator = generator
end
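As the initializer above shows, a consumer without an explicit tag calls `generate_for(queue)` on the class-level generator. A custom generator might therefore look like the following sketch. It assumes that any object responding to `generate_for` is acceptable, which the documented parameter type does not guarantee; `PrefixedTagGenerator` and `FakeQueue` are hypothetical names used only for illustration.

```ruby
# Hypothetical generator. Assumption: an object responding to
# generate_for(queue) is enough, based on how the initializer calls it.
class PrefixedTagGenerator
  def initialize(prefix)
    @prefix = prefix
    @counter = 0
  end

  # Builds a tag from the prefix, the queue name and a running counter.
  def generate_for(queue)
    @counter += 1
    "#{@prefix}.#{queue.name}.#{@counter}"
  end
end

# Stand-in for AMQP::Queue so the sketch runs without a broker.
FakeQueue = Struct.new(:name)

generator = PrefixedTagGenerator.new("billing")
puts generator.generate_for(FakeQueue.new("invoices")) # billing.invoices.1

# With the amqp gem loaded, the generator would be installed like this:
#   AMQP::Consumer.tag_generator = generator
```

The counter is not synchronized, so this sketch is only suitable when consumers are created from a single thread.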

Public Instance Methods

acknowledge(delivery_tag)

Acknowledge a delivery tag.

@return [Consumer] self

@api public
@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)

# File lib/amqp/consumer.rb, line 142
def acknowledge(delivery_tag)
  super(delivery_tag)
end
after_connection_interruption(&block)
after_recovery(&block)
Alias for: on_recovery
auto_recover()

Called by the associated connection object when the AMQP connection has been re-established (for example, after a network failure).

@api plugin

# File lib/amqp/consumer.rb, line 193
def auto_recover
  super
end
before_recovery(&block)

Defines a callback that will be executed once the TCP connection is recovered following a network failure, but before the AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amqp/consumer.rb, line 175
def before_recovery(&block)
  super(&block)
end
callback()

Legacy {AMQP::Queue} API compatibility.

@private
@deprecated

# File lib/amqp/consumer.rb, line 103
def callback
  if @callbacks[:delivery]
    @callbacks[:delivery].first
  end
end
cancel(nowait = false, &block)

@return [AMQP::Consumer] self

# File lib/amqp/consumer.rb, line 82
def cancel(nowait = false, &block)
  @channel.once_open do
    @queue.once_declared do
      super(nowait, &block)
    end
  end

  self
end
consume(nowait = false, &block)

Begin consuming messages from the queue.

@return [AMQP::Consumer] self

# File lib/amqp/consumer.rb, line 54
def consume(nowait = false, &block)
  @channel.once_open do
    @queue.once_declared do
      super(nowait, &block)
    end
  end

  self
end
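Both consume and cancel wrap their work in `@channel.once_open` and `@queue.once_declared` and then return self straight away. The sketch below illustrates that deferral pattern under the assumption that these hooks run a block immediately when the entity is ready and store it otherwise. `Deferrable`, `once_ready` and `ready!` are hypothetical names, not amqp gem API.

```ruby
# Hypothetical stand-in for an entity that becomes ready asynchronously.
# Assumption: once_open / once_declared behave like once_ready below.
class Deferrable
  def initialize
    @ready = false
    @pending = []
  end

  # Runs the block right away when ready, otherwise stores it for later.
  def once_ready(&block)
    if @ready
      block.call
    else
      @pending << block
    end
    self
  end

  # Marks the entity as ready and runs every block that was waiting.
  def ready!
    @ready = true
    @pending.shift.call until @pending.empty?
    self
  end
end

channel = Deferrable.new
queue = Deferrable.new
log = []

# Mirrors the nesting used by consume: nothing runs until both are ready.
channel.once_ready { queue.once_ready { log << :consume } }
log << :returned

channel.ready!
queue.ready!
p log # [:returned, :consume]
```

The practical consequence is that the method can return before the underlying protocol operation has actually been sent.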
exclusive?()

@return [Boolean] true if this consumer is exclusive (other consumers for the same queue are not allowed)

# File lib/amqp/consumer.rb, line 46
def exclusive?
  super
end
inspect()

@return [String] Readable representation of relevant object state.

# File lib/amqp/consumer.rb, line 201
def inspect
  "#<AMQP::Consumer:#{@consumer_tag}> queue=#{@queue.name} channel=#{@channel.id} callbacks=#{@callbacks.inspect}"
end
on_connection_interruption(&block)

Defines a callback that will be executed after the TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amqp/consumer.rb, line 164
def on_connection_interruption(&block)
  super(&block)
end
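The rule that only one callback can be defined, with the last definition replacing earlier ones, can be sketched as a single slot per event. This is an illustration of the documented behaviour, not the gem's internal implementation; `CallbackSlots`, `define` and `fire` are hypothetical names.

```ruby
# Hypothetical single-slot callback registry, for illustration only.
class CallbackSlots
  def initialize
    @callbacks = {}
  end

  # Stores the block, overwriting any block previously set for the event.
  def define(event, &block)
    @callbacks[event] = block
    self
  end

  # Calls the current block for the event, if one has been defined.
  def fire(event, *args)
    callback = @callbacks[event]
    callback.call(*args) if callback
  end
end

slots = CallbackSlots.new
slots.define(:interruption) { puts "first handler" }
slots.define(:interruption) { puts "second handler" }
slots.fire(:interruption) # prints "second handler" only
```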
on_delivery(&block)

Register a block that will be used to handle delivered messages.

@return [AMQP::Consumer] self
@see AMQP::Queue#subscribe

# File lib/amqp/consumer.rb, line 114
def on_delivery(&block)
  # We have to maintain this multiple arities jazz
  # because older versions of this gem are used in examples in at least 3
  # books published by O'Reilly :(. MK.
  delivery_shim = Proc.new { |basic_deliver, headers, payload|
    case block.arity
    when 1 then
      block.call(payload)
    when 2 then
      h = Header.new(@channel, basic_deliver, headers.decode_payload)
      block.call(h, payload)
    else
      h = Header.new(@channel, basic_deliver, headers.decode_payload)
      block.call(h, payload, basic_deliver.consumer_tag, basic_deliver.delivery_tag, basic_deliver.redelivered, basic_deliver.exchange, basic_deliver.routing_key)
    end
  }

  super(&delivery_shim)
end
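The shim above chooses what to pass to the handler based on the number of block parameters. The following simplified sketch shows the same arity dispatch in isolation. It is a stand-in: a plain Hash replaces the Header object, only two of the extra positional arguments are passed, and `dispatch_by_arity` is a hypothetical helper.

```ruby
# Simplified stand-in for the delivery shim; not the gem's code.
# A Hash plays the role of the metadata (Header) object.
def dispatch_by_arity(metadata, payload, &block)
  case block.arity
  when 1
    # One parameter: the handler only wants the payload.
    block.call(payload)
  when 2
    # Two parameters: metadata first, then payload.
    block.call(metadata, payload)
  else
    # Anything else: metadata, payload and extra positional details.
    block.call(metadata, payload, metadata[:consumer_tag], metadata[:delivery_tag])
  end
end

metadata = { consumer_tag: "tag-1", delivery_tag: 7 }

dispatch_by_arity(metadata, "hello") { |payload| puts payload }
# prints "hello"
dispatch_by_arity(metadata, "hello") { |meta, payload| puts "#{meta[:delivery_tag]}: #{payload}" }
# prints "7: hello"
```

A block declared with a splat, such as `{ |*args| ... }`, reports a negative arity and therefore falls into the last branch.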
on_recovery(&block)

Defines a callback that will be executed when the AMQP connection is recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).

@api public

# File lib/amqp/consumer.rb, line 183
def on_recovery(&block)
  super(&block)
end
Also aliased as: after_recovery
reject(delivery_tag, requeue = true)

@return [Consumer] self

@api public
@see files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)

# File lib/amqp/consumer.rb, line 151
def reject(delivery_tag, requeue = true)
  super(delivery_tag, requeue)
end
resubscribe(&block)

Used by automatic recovery code.

@api plugin
@return [AMQP::Consumer] self

# File lib/amqp/consumer.rb, line 67
def resubscribe(&block)
  @channel.once_open do
    @queue.once_declared do
      self.unregister_with_channel
      @consumer_tag = self.class.tag_generator.generate_for(@queue)
      self.register_with_channel

      super(&block)
    end
  end

  self
end
subscribed?()

{AMQP::Queue} API compatibility.

@return [Boolean] true if this consumer is active (subscribed for message delivery)
@api public

# File lib/amqp/consumer.rb, line 96
def subscribed?
  !@callbacks[:delivery].empty?
end
