Configuring Static Shovels

The configuration for the Shovel plugin in the broker configuration file is an Erlang term (as usual) and consists of a single shovels clause:

  {rabbitmq_shovel, [ {shovels, [ {shovel_name, [ ... ]}, ... ]} ]}

A (deliberately verbose) example configuration is given below.

Each element of the list in the shovels clause is a named static shovel. The shovel_names in the list must be distinct.

Each shovel definition looks like this:

  {shovel_name, [ {sources, [ ... ]}
                , {destinations, [ ... ]}
                , {queue, queue_name}
                , {prefetch_count, count}
                , {ack_mode, a_mode}
                , {publish_properties, [ ... ]}
                , {add_forward_headers, boolean}
                , {publish_fields, [ ... ]}
                , {reconnect_delay, reconn_delay}
                ]}

where shovel_name is the name of the shovel (an Erlang atom) and the clauses for sources, destinations and queue are mandatory. All the other clauses are optional.

Each clause is fully described below.

shovel_name is the name of the shovel (an Erlang atom). Note that Erlang atoms must be enclosed in single quotes (') if they do not begin with a lower-case letter or if they contain characters other than alphanumeric characters, underscore (_), or @.

When using the shovel with clustering, if two or more nodes define a shovel with the same name, the shovel worker will be started on just one of them. If that node fails, the shovel will be recreated on a surviving node. For this reason, when two or more nodes define a shovel with the same name, the configuration given for that shovel should be identical on all of them.

The sources and destinations clauses are both mandatory. They take the form:

  {sources, [ {brokers, broker_list}
            , {declarations, declaration_list}
            ]}

(or {destinations, [ ... ]} with the same contents). The brokers clause is mandatory and the declarations clause is optional.

brokers - This clause (or its variant broker clause, see note below) is mandatory. In

  {brokers, broker_list}

broker_list is a list of broker connection URIs (for the basic syntax, see AMQP URI), for example:

  [ "amqp://fred:secret@host1.domain/my_vhost"
  , "amqp://john:secret@host2.domain/my_vhost"
  ]

If the host is omitted (which is not valid in a general AMQP URI), the shovel uses a direct connection to the broker in which it is running. This avoids using the network stack.

The syntax is extended to include a query part to permit the configuration of additional connection parameters. See the query parameter reference for the Erlang client’s extensions (including those for SSL and SASL) which are available to the shovel.
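As a rough illustration of the query part, the sketch below adds connection parameters to two URIs. The parameter names used here (heartbeat, cacertfile, verify) are assumed from the Erlang client's query parameter reference, and the certificate path is hypothetical; check that reference for the names and values your version accepts.

```erlang
%% Sketch only: parameter names are assumptions to verify against the
%% Erlang client's query parameter reference; the path is hypothetical.
{brokers, [ "amqp://fred:secret@host1.domain/my_vhost?heartbeat=15"
          , "amqps://john:secret@host2.domain/my_vhost?cacertfile=/path/to/cacert.pem&verify=verify_peer"
          ]}
```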

Note: If the broker list consists of a single connection URI, the variant form:

  {broker, amqp_uri_string}

is equivalent to a brokers clause with a single-element list.
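To make the equivalence concrete, the two clauses below describe the same single connection. The third is the host-less form mentioned above, which connects directly to the broker the shovel runs in.

```erlang
%% These two clauses are interchangeable (use one or the other):
{broker,  "amqp://fred:secret@host1.domain/my_vhost"}
{brokers, [ "amqp://fred:secret@host1.domain/my_vhost" ]}

%% Host omitted: direct connection to the local broker.
{broker, "amqp://"}
```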

The declarations clause is optional. In

  {declarations, declaration_list}

the declaration_list is a list of AMQP methods (in the style of the Erlang client) which can be sent to the broker after connection and before shoveling.

This allows any resources that may need to be set up to be configured, including the source queue and the destination exchanges. For example:

  {declarations, [ 'queue.declare'
                 , {'queue.bind', [ {exchange, <<"my_exchange">>}
                                  , {queue,    <<>>}
                                  ]}
                 ]}

will first declare an anonymous queue, and then bind it to the exchange called “my_exchange”. (The queue parameter <<>> on queue.bind means ‘use the queue last declared on this channel’.)

Each element of the list is either an atom, being the name of an AMQP method, or a tuple with first element the method atom, and second element a property-list of parameter settings.

If just the AMQP method atom is supplied, all the parameters take their defaults (as illustrated with 'queue.declare' above).

If a tuple and property-list is supplied, then the properties in the list specify some or all of the parameters explicitly.

Here is another example:

  {'exchange.declare', [ {exchange, <<"my_exchange">>}
                       , {type, <<"direct">>}
                       , durable
                       ]}

This declares a durable, direct exchange called “my_exchange”.

For full details, consult the Erlang Client documentation.

The queue clause is mandatory. In

  {queue, queue_name}

queue_name is the name of the queue (as a binary string) to shovel messages from. For example:

  {queue, <<"my_work_queue">>}

This queue must exist. Use the resource declarations to create the queue (or ensure it exists) first. If queue_name is <<>> (the empty binary string) the most recently declared queue in declarations is used. This allows anonymous queues to be declared and used.
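One way to satisfy this requirement is to declare the queue in the source declarations and then name it in the queue clause. The fragment below is a minimal sketch; the host and queue name are illustrative, and the durable flag is an assumption about what the queue should look like.

```erlang
%% Inside the shovel definition: declare the queue on the source,
%% then shovel from it by name.
{sources, [ {broker, "amqp://fred:secret@host1.domain/my_vhost"}
          , {declarations, [ {'queue.declare', [ {queue, <<"my_work_queue">>}
                                               , durable
                                               ]}
                           ]}
          ]}
, {queue, <<"my_work_queue">>}
```

If the queue already exists, the declaration must match its existing properties, otherwise the broker will reject it.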

The prefetch_count clause is optional. In

  {prefetch_count, count}

count is the maximum number of unacknowledged messages the shovel may hold at a time (a non-negative integer). For example:

  {prefetch_count, 1}

If this number is zero there is no limit. The default is 1000.

The ack_mode clause is optional. In

  {ack_mode, a_mode}

a_mode is one of 'no_ack', 'on_publish' or 'on_confirm'.

'no_ack' - indicates that no message acknowledgements are to be generated by the shovel (the broker automatically acknowledges all delivered messages);

'on_publish' - indicates that a message acknowledgement is to be sent (to the source broker) after each message is re-published to the destination;

'on_confirm' - indicates that publish confirmations are sought and that a message acknowledgement is to be sent (to the source broker) after each message publication is confirmed by the destination broker.

The default is 'on_confirm', which is highly recommended. The other options may improve performance slightly, but messages are more likely to be lost in the event of failures.

The publish_properties clause is optional. It takes the form:

  {publish_properties, property_list}

where the properties in the list are set on the basic.properties of each message before it is re-published.

For example:

  {publish_properties, [ {delivery_mode, 2} ]}

would mark all re-published messages persistent.

By default the properties of the message are preserved, but this clause can be used to change or set any property, including content_type, content_encoding, headers, delivery_mode, priority, correlation_id, reply_to, expiration, message_id, timestamp, type, user_id, app_id and cluster_id.
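Several properties can be set in the same list. The fragment below is a sketch with made-up values; it assumes, as in the Erlang client, that string-valued properties are given as binaries.

```erlang
%% Sketch: values are illustrative, not recommendations.
{publish_properties, [ {delivery_mode, 2}                        %% persistent
                     , {content_type, <<"application/json">>}
                     , {app_id, <<"my_first_shovel">>}
                     ]}
```

Properties not listed keep the values they had on the original message.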

The add_forward_headers clause is optional. It takes the form:

  {add_forward_headers, boolean}

If add_forward_headers is set to true, an x-shoveled header is added or appended to the message before it is re-published.

The default is not to add such a header.

The publish_fields clause is optional. It takes the form:

  {publish_fields, property_list}

where the properties in the list are used to set the fields on the basic.publish method used to re-publish messages.

By default, messages are re-published using their original exchange name and routing key. By specifying, for example:

  {publish_fields, [ {exchange, <<"my_exchange">>}
                   , {routing_key, <<"from_shovel">>}
                   ]}

messages would instead be re-published to an explicit exchange name with an explicit, fixed routing key.

The reconnect_delay clause is optional. In

  {reconnect_delay, reconn_delay}

reconn_delay is the number of seconds to wait before reconnecting in the event of connection failure (a non-negative number). For example:

  {reconnect_delay, 1.5}

would delay for one and a half seconds before reconnecting after failure.

If reconn_delay is 0, then no reconnections occur: the shovel will stop after the first failure.

The default reconn_delay is 5 (seconds).

Example Configuration

A verbose shovel configuration might look like this:

  {rabbitmq_shovel,
    [ {shovels, [ {my_first_shovel,
                    [ {sources,
                        [ {brokers, [ "amqp://fred:secret@host1.domain/my_vhost"
                                    , "amqp://john:secret@host2.domain/my_vhost"
                                    ]}
                        , {declarations, [ {'exchange.declare',
                                              [ {exchange, <<"my_fanout">>}
                                              , {type, <<"fanout">>}
                                              , durable
                                              ]}
                                         , {'queue.declare',
                                              [{arguments,
                                                 [{<<"x-message-ttl">>, long, 60000}]}]}
                                         , {'queue.bind',
                                              [ {exchange, <<"my_fanout">>}
                                              , {queue,    <<>>}
                                              ]}
                                         ]}
                        ]}
                    , {destinations,
                        [ {broker, "amqp://"}
                        , {declarations, [ {'exchange.declare',
                                              [ {exchange, <<"my_direct">>}
                                              , {type, <<"direct">>}
                                              , durable
                                              ]}
                                         ]}
                        ]}
                    , {queue, <<>>}
                    , {prefetch_count, 10}
                    , {ack_mode, on_confirm}
                    , {publish_properties, [ {delivery_mode, 2} ]}
                    , {add_forward_headers, true}
                    , {publish_fields, [ {exchange, <<"my_direct">>}
                                       , {routing_key, <<"from_shovel">>}
                                       ]}
                    , {reconnect_delay, 5}
                    ]}
                ]}
    ]}

The configuration above defines a single shovel called 'my_first_shovel'.

'my_first_shovel' will connect to a broker on either host1 or host2 (as source), and directly to the local broker (as destination). It will reconnect to the other source broker on failure, after a delay of 5 seconds.

When connected to the source it will declare a durable, fanout exchange called “my_fanout”, an anonymous queue with a per-queue message TTL, and bind the queue to the exchange.

When connected to the destination (the local broker) it will declare a durable, direct exchange called “my_direct”.

This shovel will re-publish messages sent to the anonymous queue on the source to the local exchange with the fixed routing key “from_shovel”. The messages will be persistent and only acknowledged after receiving a publish confirm from the local broker.

The shovel consumer will not be allowed to hold more than ten unacknowledged messages at a time.
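For contrast with the verbose example, here is a sketch of a shovel that uses only the mandatory clauses, written as a complete broker configuration file (a list of application tuples terminated by a full stop). The shovel name, host and queue name are hypothetical.

```erlang
%% Minimal sketch: sources, destinations and queue only.
%% All names and hosts below are illustrative.
[ {rabbitmq_shovel,
    [ {shovels,
        [ {minimal_shovel,
            [ {sources,
                [ {broker, "amqp://fred:secret@host1.domain/my_vhost"}
                , {declarations, [ {'queue.declare', [ {queue, <<"my_work_queue">>}
                                                     , durable
                                                     ]}
                                 ]}
                ]}
            , {destinations, [ {broker, "amqp://"} ]}
            , {queue, <<"my_work_queue">>}
            ]}
        ]}
    ]}
].
```

With every optional clause omitted, the defaults described above apply: a prefetch count of 1000, ack_mode on_confirm, a reconnect delay of 5 seconds, unchanged message properties, and re-publishing with the original exchange name and routing key.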