Class AMQP::Exchange
In: lib/amqp/exchange.rb
Parent: AMQ::Client::Exchange

h2. What are AMQP exchanges?

AMQP exchange is where AMQP clients send messages. AMQP exchange may also be described as a router or a matcher. Every published message is received by an exchange which, depending on its type and message attributes, determines how to deliver the message.

Entities that forward messages to consumers (or consumers fetch messages from on demand) are called {Queue queues}. Exchanges are associated with queues via bindings. Roughly speaking, bindings determine messages placed in what exchange end up in what queues.

h2. AMQP bindings

Closely related to exchange is a concept of bindings. A binding is the relationship between an exchange and a message queue that tells the exchange how to route messages. Bindings are set up by AMQP applications (usually the app owning and using the message queue sets up bindings for it). Exchange may be bound to none, 1 or more than 1 queue.

h2. Exchange types

There are 4 supported exchange types: direct, fanout, topic and headers. Exchange type determines how exchange processes and routes messages.

h2. Direct exchanges

Direct exchanges are useful for 1:1 communication scenarios. Queues are bound to direct exchanges with a parameter called "routing key". When messages arrive to a direct exchange, broker takes that message‘s routing key (if any), finds a queue bound to the exchange with the same routing key and routes message there.

Because very often queues are bound with the same routing key as queue‘s name, AMQP 0.9.1 has a pre-declared direct exchange known as default exchange. Default exchange is a bit special: broker automatically binds all the queues (in the same virtual host) to it with routing key equal to queue names. In other words, messages delivered to default exchange are routed to queues when message routing key equals queue name. Default exchange name is an empty string.

As part of the standard, the server must predeclare the direct exchange ‘amq.direct’ and the fanout exchange ‘amq.fanout’ (all exchange names starting with ‘amq.’ are reserved). Attempts to declare an exchange using ‘amq.’ as the name will raise an AMQP::Error and fail. In practice these default exchanges are never used directly by client code.

h2. Fanout exchanges

Fanout exchanges are useful for 1:n and n:m communication where one or more producer feeds multiple consumers. messages published to a fanout exchange are delivered to queues that are bound to that exchange name (unconditionally). Each queue gets it‘s own copy of the message.

h2. Topic exchanges

Topic exchanges are used for 1:n and n:m communication scenarios. Exchange of this type uses the routing key to determine which queues to deliver the message. Wildcard matching is allowed. The topic must be declared using dot notation to separate each subtopic.

As part of the AMQP standard, each server should predeclare a topic exchange called ‘amq.topic’.

The classic example is delivering market data. When publishing market data for stocks, we may subdivide the stream based on 2 characteristics: nation code and trading symbol. The topic tree for Apple may look like stock.us.aapl. NASDAQ updates may use topic stocks.us.nasdaq, while DAX may use stock.de.dax.

When publishing data to the exchange, bound queues subscribing to the exchange indicate which data interests them by passing a routing key for matching against the published routing key.

h2. Headers exchanges

When publishing data to exchange of type headers, bound queues subscribing to the exchange indicate which data interests them by passing arguments for matching against the headers in published messages. The form of the matching can be controlled by the ‘x-match’ argument, which may be ‘any’ or ‘all’. If unspecified, it defaults to "all".

A value of ‘all’ for ‘x-match’ implies that all values must match (i.e. it does an AND of the headers ), while a value of ‘any’ implies that at least one should match (ie. it does an OR).

As part of the AMQP standard, each server should predeclare a headers exchange named ‘amq.match’.

h2. Key methods

Key methods of Exchange class are

h2. Exchange durability and persistence of messages.

Learn more in our {file:docs/Durability.textile Durability guide}.

h2. RabbitMQ extensions.

AMQP gem supports several RabbitMQ extensions taht extend Exchange functionality. Learn more in {file:docs/VendorSpecificExtensions.textile}

@note Please make sure you read a section on exchanges durability vs. messages

      persistence.

@see www.rabbitmq.com/faq.html#managing-concepts-exchanges Exchanges explained in the RabbitMQ FAQ @see www.rabbitmq.com/faq.html#Binding-and-Routing Bindings and routing explained in the RabbitMQ FAQ @see Channel#default_exchange @see Channel#direct @see Channel#fanout @see Channel#topic @see Channel#headers @see Queue @see bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.1) @see bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.5) @see bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3)

Methods

Constants

DEFAULT_CONTENT_TYPE = "application/octet-stream".freeze   API

External Aliases

default_routing_key -> key

Attributes

default_routing_key  [R]  @return [String]
name  [R]  @return [String]
on_declare  [RW]  @return [call] A callback that is executed once declaration notification (exchange.declare-ok)
                from the broker arrives.
opts  [RW]  Options hash this exchange instance was instantiated with @return [Hash]
status  [R]  @return [Symbol] @api plugin
type  [R]  Type of this exchange (one of: :direct, :fanout, :topic, :headers). @return [Symbol]

Public Class methods

The default exchange. Default exchange is a direct exchange that is predefined. It cannot be removed. Every queue is bind to this (direct) exchange by default with the following routing semantics: messages will be routed to the queue withe same same name as message‘s routing key. In other words, if a message is published with a routing key of "weather.usa.ca.sandiego" and there is a queue Q with this name, that message will be routed to Q.

@param [AMQP::Channel] channel Channel to use. If not given, new AMQP channel

                               will be opened on the default AMQP connection (accessible as AMQP.connection).

@example Publishing a messages to the tasks queue

  channel     = AMQP::Channel.new(connection)
  tasks_queue = channel.queue("tasks")
  AMQP::Exchange.default(channel).publish("make clean", routing_key => "tasks")

@see Exchange @see bit.ly/hw2ELX AMQP 0.9.1 specification (Section 2.1.2.4) @note Do not confuse default exchange with amq.direct: amq.direct is a pre-defined direct

      exchange that doesn't have any special routing semantics.

@return [Exchange] An instance that corresponds to the default exchange (of type direct). @api public

See {Exchange Exchange class documentation} for introduction, information about exchange types, what uses cases they are good for and so on.

h2. Predeclared exchanges

If exchange name corresponds to one of those predeclared by AMQP 0.9.1 specification (empty string, amq.direct, amq.fanout, amq.topic, amq.match), declaration command won‘t be sent to the broker (because the only possible reply from the broker is to reject it, predefined entities cannot be changed). Callback, if any, will be executed immediately.

@example Instantiating a fanout exchange using constructor

  AMQP.connect do |connection|
    AMQP::Channel.new(connection) do |channel|
      AMQP::Exchange.new(channel, :fanout, "search.index.updates") do |exchange, declare_ok|
        # by now exchange is ready and waiting
      end
    end
  end

@example Instantiating a direct exchange using {Channel#direct}

  AMQP.connect do |connection|
    AMQP::Channel.new(connection) do |channel|
      channel.direct("email.replies_listener") do |exchange, declare_ok|
        # by now exchange is ready and waiting
      end
    end
  end

@param [Channel] channel AMQP channel this exchange is associated with @param [Symbol] type Exchange type @param [String] name Exchange name

@option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not

                                         already exist. The client can use this to check whether an exchange
                                         exists without modifying the server state.

@option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as

                                         durable. Durable exchanges and their bindings are recreated upon a server
                                         restart (information about them is persisted). Non-durable (transient) exchanges
                                         do not survive if/when a server restarts (information about them is stored exclusively
                                         in RAM).

@option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished

                                              using it. The server waits for a short period of time before
                                              determining the exchange is unused to give time to the client code
                                              to bind a queue to it.

@option opts [Boolean] :internal (false) If set, the exchange may not be used directly by publishers, but

                                              only when bound to other exchanges. Internal exchanges are used to
                                              construct wiring that is not visible to applications. *This is a RabbitMQ-specific
                                              extension.*

@option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should

                                              not wait for a reply method.  If the server could not complete the
                                              method it will raise a channel or connection exception.

@option opts [Boolean] :no_declare (true) If set, exchange declaration command won‘t be sent to the broker. Allows to forcefully

                                              avoid declaration. We recommend that only experienced developers consider this option.

@option opts [String] :default_routing_key (nil) Default routing key that will be used by {Exchange#publish} when no routing key is not passed explicitly.

                                                  It is perfectly fine for applications to always specify routing key to {Exchange#publish}.

@option opts [Hash] :arguments (nil) A hash of optional arguments with the declaration. Some brokers implement

                                         AMQP extensions using x-prefixed declaration arguments.

@raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration. @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist.

@yield [exchange, declare_ok] Yields successfully declared exchange instance and AMQP method (exchange.declare-ok) instance. The latter is optional. @yieldparam [Exchange] exchange Exchange that is successfully declared and is ready to be used. @yieldparam [AMQP::Protocol::Exchange::DeclareOk] declare_ok AMQP exchange.declare-ok) instance.

@see Channel#default_exchange @see Channel#direct @see Channel#fanout @see Channel#topic @see Channel#headers @see Queue @see bit.ly/hw2ELX AMQP 0.9.1 specification (Section 3.1.3)

@return [Exchange] @api public

Protected Class methods

Public Instance methods

auto_deletable?()

Alias for auto_deleted?

@return [Boolean] true if this exchange is automatically deleted when it is no longer used @api public

Compatibility alias for on_declare.

@api public @deprecated @return [call]

@return [Channel] @api public

This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are deleted, too. Further attempts to publish messages to a deleted exchange will result in a channel-level exception.

@example Deleting an exchange

 exchange = AMQP::Channel.direct("search.indexing")
 exchange.delete

@option opts [Boolean] :nowait (false) If set, the server will not respond to the method. The client should

                                       not wait for a reply method.  If the server could not complete the
                                       method it will raise a channel or connection exception.

@option opts [Boolean] :if_unused (false) If set, the server will only delete the exchange if it has no queue

                                          bindings. If the exchange has queue bindings the server does not
                                          delete it but raises a channel exception instead.

@return [NilClass] nil @api public

@return [Boolean] true if this exchange is durable @note Please make sure you read {Exchange Exchange class} documentation section on exchanges durability vs. messages

      persistence.

@api public

Publishes message to the exchange. The message will be routed to queues by the exchange and distributed to any active consumers. Routing logic is determined by exchange type and configuration as well as message attributes (like :routing_key or message headers).

Published data is opaque and not modified by Ruby amqp gem in any way. Serialization of data with JSON, Thrift, BSON or similar libraries before publishing is very common.

h2. Data serialization

Note that this method calls to_s on payload argument value. Applications are encouraged of data serialization before publishing (using JSON, Thrift, Protocol Buffers or other serialization library). Note that because AMQP is a binary protocol, text formats like JSON largely lose their strong point of being easy to inspect as data travels across network, so "BSON":bsonspec.org may be a good fit.

h2. Publishing and message persistence

In cases when you application cannot afford to lose a message, AMQP 0.9.1 has several features to offer:

  • Persistent messages
  • Messages acknowledgements
  • Transactions
  • (a RabbitMQ-specific extension) Publisher confirms

This is a broad topic and we dedicate a separate guide, {file:docs/Durability.textile Durability and message persistence}, to it.

h2. Publishing callback and guarantees it DOES NOT offer

Exact moment when message is published is not determined and depends on many factors, including machine‘s networking stack configuration, so (optional) block this method takes is scheduled for next event loop tick, and data is staged for delivery for current event loop tick. For most applications, this is good enough. The only way to guarantee a message was delivered in a distributed system is to ask a peer to send you a message back. RabbitMQ

@note Optional callback this method takes DOES NOT OFFER ANY GUARANTEES ABOUT DATA DELIVERY and must not be used as a "delivery callback".

      The only way to guarantee delivery in distributed environment is to use an acknowledgement mechanism, such as AMQP transactions
      or lightweight "publisher confirms" RabbitMQ extension supported by amqp gem. See {file:docs/Durability.textile Durability and message persistence}
      and {file:docs/Exchanges.textile Working With Exchanges} guides for details.

h2. Event loop blocking

When intermixing publishing of many messages with other workload that may take some time, even loop blocking may affect the performance. There are several ways to avoid it:

  • Run EventMachine in a separate thread.
  • Use EventMachine.next_tick.
  • Use EventMachine.defer to offload operation to EventMachine thread pool.

TBD: this subject is worth a separate guide

h2. Sending one-off messages

If you need to send a one-off message and then stop the event loop, pass a block to {Exchange#publish} that will be executed after message is pushed down the network stack, and use {AMQP::Session#disconnect} to properly tear down AMQP connection (see example under Examples section below).

@example Publishing a one-off message and properly closing AMQP connection then stopping the event loop:

  exchange.publish(data) do
    connection.disconnect { EventMachine.stop }
  end

@param [to_s] payload Message payload (content). Note that this method calls to_s on payload argument value.

                         You are encouraged to take care of data serialization before publishing (using JSON, Thrift,
                         Protocol Buffers or other serialization library).

@option options [String] :routing_key (nil) Specifies message routing key. Routing key determines

                                     what queues messages are delivered to (exact routing algorithms vary
                                     between exchange types).

@option options [Boolean] :mandatory (false) This flag tells the server how to react if the message cannot be

                                     routed to a queue. If message is mandatory, the server will return
                                     unroutable message back to the client with basic.return AMQPmethod.
                                     If message is not mandatory, the server silently drops the message.

@option options [Boolean] :immediate (false) This flag tells the server how to react if the message cannot be

                                     routed to a queue consumer immediately.  If this flag is set, the
                                     server will return an undeliverable message with a Return method.
                                     If this flag is zero, the server will queue the message, but with
                                     no guarantee that it will ever be consumed.

@option options [Boolean] :persistent (false) When true, this message will be persisted to disk and remain in the queue until

                                      it is consumed. When false, the message is only kept in a transient store
                                      and will lost in case of server restart.
                                      When performance and latency are more important than durability, set :persistent => false.
                                      If durability is more important, set :persistent => true.

@option options [String] :content_type (application/octet-stream) Content-type of message payload.

@example Publishing without routing key

 exchange = channel.fanout('search.indexer')
 # fanout exchanges deliver messages to bound queues unconditionally,
 # so routing key is unnecessary here
 exchange.publish("some data")

@example Publishing with a routing key

 exchange = channel.direct('search.indexer')
 exchange.publish("some data", :routing_key => "search.index.updates")

@return [Exchange] self

@note Please make sure you read {file:docs/Durability.textile Durability an message persistence} guide that covers exchanges durability vs. messages

      persistence.

@api public

Resets queue state. Useful for error handling. @api plugin

temporary?()

Alias for transient?

@return [Boolean] true if this exchange is transient (non-durable) @note Please make sure you read {Exchange Exchange class} documentation section on exchanges durability vs. messages

      persistence.

@api public

[Validate]