.NET/C# Client API Guide

This page gives an overview of the RabbitMQ .NET/C# client API.

The code samples given here demonstrate connecting to RabbitMQ and performing several common operations with the client.

The library is open-source, and is dual-licensed under the Apache License v2 and the Mozilla Public License v1.1.

The client API is closely modelled on the AMQP 0-9-1 protocol specification, with additional abstractions for ease of use.

Only the basics of using the library are covered here. For full detail, please see the javadoc-like API documentation generated from the source code.

Major namespaces, interfaces and classes

The core API interfaces and classes are defined in the RabbitMQ.Client namespace:

        using RabbitMQ.Client;

The core API interfaces and classes are:

  • IModel: represents an AMQP 0-9-1 channel, and provides most of the operations (protocol methods).
  • IConnection: represents an AMQP 0-9-1 connection.
  • ConnectionFactory: constructs IConnection instances.
  • IBasicConsumer: represents a message consumer.

Other useful interfaces and classes include:

  • DefaultBasicConsumer: commonly used base class for consumers

Public namespaces other than RabbitMQ.Client include:

  • RabbitMQ.Client.Events: various events and event handlers that are part of the client library, including EventingBasicConsumer, a consumer implementation built around C# event handlers.
  • RabbitMQ.Client.Exceptions: exceptions visible to the user.

All other namespaces are reserved for private implementation details of the library. Members of private namespaces are usually still made available to applications, so that developers can implement workarounds for faults or design mistakes they discover in the library implementation. However, applications cannot rely on any classes, interfaces, member variables etc. that appear within private namespaces remaining stable across releases of the library.

Connecting to a Broker

To connect to a RabbitMQ node, instantiate a ConnectionFactory and configure it with the desired hostname, virtual host, and credentials. Then use ConnectionFactory.CreateConnection() to open a connection. The following two code snippets connect to a RabbitMQ node on hostName:

        ConnectionFactory factory = new ConnectionFactory();
        factory.UserName = user;
        // UserName and Password both default to "guest" if left unassigned
        factory.Password = pass;
        factory.VirtualHost = vhost;
        factory.HostName = hostName;

        IConnection conn = factory.CreateConnection();

        ConnectionFactory factory = new ConnectionFactory();
        factory.Uri = "amqp://user:pass@hostName:port/vhost";

        IConnection conn = factory.CreateConnection();

Since the .NET client uses a stricter interpretation of the AMQP 0-9-1 URI spec than the other clients, care must be taken when using URIs. In particular, the host part must not be omitted and virtual hosts with empty names are not addressable.

All factory properties have default values. The default value for a property will be used if the property remains unassigned prior to creating a connection:

Username
"guest"

Password
"guest"

Virtual host
"/"

Hostname
"localhost"

Port
5672 for regular connections, 5671 for connections that use TLS

The IConnection interface can then be used to open a channel:

        IModel channel = conn.CreateModel();

The channel can now be used to send and receive messages, as described in subsequent sections.

Using Exchanges and Queues

Client applications work with exchanges and queues, the high-level building blocks of AMQP 0-9-1. These must be “declared” before they can be used. Declaring either type of object simply ensures that one of that name exists, creating it if necessary. Continuing the previous example, the following code declares an exchange and a queue, then binds them together.

        channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
        channel.QueueDeclare(queueName, false, false, false, null);
        channel.QueueBind(queueName, exchangeName, routingKey, null);

This will actively declare the following objects:

  • a non-durable, non-autodelete exchange of “direct” type
  • a non-durable, non-autodelete, non-exclusive queue

The above code then binds the queue to the exchange with the given routing key. The exchange can be customised by using additional parameters.

Note that many channel API (IModel) methods are overloaded. The convenient short form of ExchangeDeclare uses sensible defaults. There are also longer forms with more parameters, which let you override these defaults as necessary, giving full control where needed. This “short version, long version” pattern is used throughout the API.
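As an illustration of the “long version” forms, the following is a minimal sketch that declares a durable exchange and a durable queue. It assumes a broker running on localhost with the default credentials. The names "demo.exchange", "demo.queue" and "demo.key" are hypothetical and chosen only for this example.

```csharp
using RabbitMQ.Client;

public static class LongFormDeclareDemo
{
    public static void Main()
    {
        // Assumption: a local broker reachable with the factory defaults.
        var factory = new ConnectionFactory();
        using (IConnection conn = factory.CreateConnection())
        using (IModel channel = conn.CreateModel())
        {
            // Long form: exchange, type, durable, autoDelete, arguments.
            channel.ExchangeDeclare("demo.exchange", ExchangeType.Direct,
                                    true, false, null);

            // Queue: name, durable, exclusive, autoDelete, arguments.
            channel.QueueDeclare("demo.queue", true, false, false, null);

            channel.QueueBind("demo.queue", "demo.exchange", "demo.key", null);
        }
    }
}
```

Compared with the short form, the only difference here is that durability and auto-deletion are stated explicitly instead of being left to the defaults.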

Publishing Messages

To publish a message to an exchange, use IModel.BasicPublish as follows:

        byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
        channel.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);

For fine control, you can use overloaded variants to specify the mandatory flag, or to specify message properties:

        byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
        IBasicProperties props = channel.CreateBasicProperties();
        props.ContentType = "text/plain";
        props.DeliveryMode = 2;
        channel.BasicPublish(exchangeName,
                             routingKey, props,
                             messageBodyBytes);

This sends a message with delivery mode 2 (persistent) and content-type “text/plain”. See the definition of the IBasicProperties interface for more information about the available header properties.

Fetching Individual Messages (“pull API”)

To retrieve individual messages, use IModel.BasicGet. The returned value is an instance of BasicGetResult, from which the header information (properties) and message body can be extracted:

        bool noAck = false;
        BasicGetResult result = channel.BasicGet(queueName, noAck);
        if (result == null) {
            // No message available at this time.
        } else {
            IBasicProperties props = result.BasicProperties;
            byte[] body = result.Body;
            ...

Since noAck = false above, you must also call IModel.BasicAck to acknowledge that you have successfully received and processed the message:

            ...
            // acknowledge receipt of the message
            channel.BasicAck(result.DeliveryTag, false);
        }

Note that fetching messages using this API is relatively inefficient. If you’d prefer RabbitMQ to push messages to the client, see the next section.

Retrieving Messages By Subscription (“push API”)

Another way to receive messages is to set up a subscription using the IBasicConsumer interface. The messages will then be delivered automatically as they arrive, rather than having to be requested proactively. One way to implement a consumer is to use the convenience class EventingBasicConsumer, which dispatches deliveries and other consumer lifecycle events as C# events:

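        // EventingBasicConsumer lives in the RabbitMQ.Client.Events namespace
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (ch, ea) =>
                        {
                            var body = ea.Body;
                            // ... process the message
                            // acknowledge on the channel; the sender argument is not typed as IModel
                            channel.BasicAck(ea.DeliveryTag, false);
                        };
        String consumerTag = channel.BasicConsume(queueName, false, consumer);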

Another option is to subclass DefaultBasicConsumer, overriding methods as necessary, or to implement IBasicConsumer directly. You will generally want to implement the core method IBasicConsumer.HandleBasicDeliver.

More sophisticated consumers will need to implement further methods. In particular, HandleModelShutdown traps channel/connection closure. Consumers can also implement HandleBasicCancelOk to be notified of cancellations.

The ConsumerTag property of DefaultBasicConsumer can be used to retrieve the server-generated consumer tag, in cases where none was supplied to the original IModel.BasicConsume call.

You can cancel an active consumer with IModel.BasicCancel:

        channel.BasicCancel(consumerTag);

When calling the API methods, you always refer to consumers by their consumer tags, which can be either client- or server-generated as explained in the AMQP 0-9-1 specification document.
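The subclassing approach described above might look like the following sketch. PrintingConsumer and the queue name "demo.queue" are hypothetical names introduced for illustration, and a broker on localhost with default credentials is assumed. Only HandleBasicDeliver is overridden; a real consumer would likely handle shutdown and cancellation as well.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

// Hypothetical consumer that prints each message body and acknowledges it.
public class PrintingConsumer : DefaultBasicConsumer
{
    public PrintingConsumer(IModel model) : base(model) { }

    public override void HandleBasicDeliver(string consumerTag,
                                            ulong deliveryTag,
                                            bool redelivered,
                                            string exchange,
                                            string routingKey,
                                            IBasicProperties properties,
                                            byte[] body)
    {
        Console.WriteLine("[{0}] {1}", routingKey, Encoding.UTF8.GetString(body));
        // Acknowledge on the channel this consumer was constructed with.
        Model.BasicAck(deliveryTag, false);
    }
}

public static class PrintingConsumerDemo
{
    public static void Main()
    {
        var factory = new ConnectionFactory(); // assumption: local broker, defaults
        using (IConnection conn = factory.CreateConnection())
        using (IModel channel = conn.CreateModel())
        {
            channel.QueueDeclare("demo.queue", false, false, false, null);

            // noAck = false, so the consumer acknowledges explicitly.
            string tag = channel.BasicConsume("demo.queue", false,
                                              new PrintingConsumer(channel));

            Console.ReadLine();        // keep consuming until Enter is pressed
            channel.BasicCancel(tag);  // cancel by consumer tag
        }
    }
}
```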

Concurrency Considerations for Consumers

Each IConnection instance is, in the current implementation, backed by a single background thread that reads from the socket and dispatches the resulting events to the application. If heartbeats are enabled, as of version 3.5.0 they are implemented in terms of .NET timers. Usually, therefore, there will be at least two threads active in an application using this library:

the application thread
contains the application logic, and makes calls on IModel methods to perform protocol operations.

the I/O activity thread
hidden away and completely managed by the IConnection instance.

The one place where the nature of the threading model is visible to the application is in any callback the application registers with the library. Such callbacks include:

  • any IBasicConsumer method
  • the BasicReturn event on IModel
  • any of the various shutdown events on IConnection, IModel etc.

Consumer Callbacks and Ordering

As of version 3.5.0, application callback handlers can invoke blocking operations (such as IModel.QueueDeclare or IModel.BasicCancel). IBasicConsumer callbacks are invoked concurrently. However, per-channel operation order is preserved. In other words, if messages A and B were delivered in this order on the same channel, they will be processed in this order. If messages A and B were delivered on different channels, they can be processed in any order (or in parallel).

Consumer callbacks are invoked in tasks dispatched to the default TaskScheduler provided by the .NET runtime.

Using a Custom Task Scheduler

It is possible to use a custom task scheduler by setting ConnectionFactory.TaskScheduler:

          public class CustomTaskScheduler : TaskScheduler
          {
            // ...
          }

          var cf = new ConnectionFactory();
          cf.TaskScheduler = new CustomTaskScheduler();

For example, a custom TaskScheduler can be used to limit the degree of concurrency.
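One way to limit concurrency is a scheduler that never runs more than a fixed number of tasks at once. The sketch below uses only the .NET base class library. LimitedConcurrencyTaskScheduler and SchedulerDemo are hypothetical names, and this is one possible implementation rather than anything shipped with the client.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler: runs at most maxDegree tasks at once on the thread pool.
public class LimitedConcurrencyTaskScheduler : TaskScheduler
{
    private readonly LinkedList<Task> tasks = new LinkedList<Task>(); // guarded by lock (tasks)
    private readonly int maxDegree;
    private int running; // number of active worker loops, guarded by lock (tasks)

    public LimitedConcurrencyTaskScheduler(int maxDegree)
    {
        if (maxDegree < 1) throw new ArgumentOutOfRangeException("maxDegree");
        this.maxDegree = maxDegree;
    }

    public override int MaximumConcurrencyLevel { get { return maxDegree; } }

    protected override void QueueTask(Task task)
    {
        lock (tasks)
        {
            tasks.AddLast(task);
            if (running < maxDegree)
            {
                running++; // start another worker loop, up to the limit
                ThreadPool.QueueUserWorkItem(_ => Drain());
            }
        }
    }

    // Worker loop: executes queued tasks in FIFO order until the queue is empty.
    private void Drain()
    {
        while (true)
        {
            Task next;
            lock (tasks)
            {
                if (tasks.Count == 0) { running--; return; }
                next = tasks.First.Value;
                tasks.RemoveFirst();
            }
            TryExecuteTask(next);
        }
    }

    // Refuse inline execution so the limit cannot be bypassed.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        lock (tasks) { return tasks.ToArray(); }
    }
}

public static class SchedulerDemo
{
    // Runs taskCount short tasks and returns the highest number seen running at once.
    public static int MaxObservedConcurrency(int limit, int taskCount)
    {
        var factory = new TaskFactory(new LimitedConcurrencyTaskScheduler(limit));
        var gate = new object();
        int current = 0, max = 0;
        var work = new Task[taskCount];
        for (int i = 0; i < taskCount; i++)
        {
            work[i] = factory.StartNew(() =>
            {
                lock (gate) { current++; if (current > max) max = current; }
                Thread.Sleep(20); // simulate message processing
                lock (gate) { current--; }
            });
        }
        Task.WaitAll(work);
        return max;
    }

    public static void Main()
    {
        // The printed value never exceeds the limit passed in (2 here).
        Console.WriteLine(SchedulerDemo.MaxObservedConcurrency(2, 8));
    }
}
```

An instance of such a scheduler could then be assigned to ConnectionFactory.TaskScheduler in the same way as CustomTaskScheduler above.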

Sharing Channels Between Threads

As a rule of thumb, IModel instances should not be used by more than one thread simultaneously: application code should maintain a clear notion of thread ownership for IModel instances. If more than one thread needs to access a particular IModel instance, the application should enforce mutual exclusion itself. One way of achieving this is for all users of an IModel to lock the instance itself:

            IModel ch = RetrieveSomeSharedIModelInstance();
            lock (ch) {
              ch.BasicPublish(...);
            }

Symptoms of incorrect serialisation of IModel operations include, but are not limited to:

  • invalid frame sequences being sent on the wire (which occurs, for example, if more than one BasicPublish operation is run simultaneously), and/or
  • NotSupportedExceptions being thrown from a method in class RpcContinuationQueue complaining about "Pipelining of requests forbidden" (which occurs in situations where more than one AMQP 0-9-1 synchronous operation, such as ExchangeDeclare, is run simultaneously).

Handling Unroutable Messages

If a message is published with the “mandatory” flag set, but cannot be delivered, the broker will return it to the sending client (via a basic.return AMQP 0-9-1 command). To be notified of such returns, clients can subscribe to the IModel.BasicReturn event. If there are no listeners attached to the event, then returned messages will be silently dropped.

          model.BasicReturn +=
          new RabbitMQ.Client.Events.BasicReturnEventHandler(...);

The BasicReturn event will fire, for example, if the client publishes a message with the “mandatory” flag set to an exchange of “direct” type which is not bound to a queue.
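The scenario just described can be sketched as follows, with the handler attached as a lambda. The exchange name "demo.unbound" and routing key "no-such-key" are hypothetical, and a broker on localhost with default credentials is assumed.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

public static class BasicReturnDemo
{
    public static void Main()
    {
        var factory = new ConnectionFactory(); // assumption: local broker, defaults
        using (IConnection conn = factory.CreateConnection())
        using (IModel channel = conn.CreateModel())
        {
            // A direct exchange with no queue bound to it.
            channel.ExchangeDeclare("demo.unbound", ExchangeType.Direct);

            // Without a listener, returned messages would be silently dropped.
            channel.BasicReturn += (sender, ea) =>
                Console.WriteLine("Returned: {0} {1} (routing key {2})",
                                  ea.ReplyCode, ea.ReplyText, ea.RoutingKey);

            byte[] body = Encoding.UTF8.GetBytes("Hello, world!");
            IBasicProperties props = channel.CreateBasicProperties();

            // Third argument is the mandatory flag.
            channel.BasicPublish("demo.unbound", "no-such-key", true, props, body);

            // Returns arrive asynchronously, so wait before closing.
            Console.ReadLine();
        }
    }
}
```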

Disconnecting from RabbitMQ

To disconnect, simply close the channel and the connection:

        channel.Close(200, "Goodbye");
        conn.Close();

Note that closing the channel is considered good practice, but isn’t strictly necessary - it will be done automatically anyway when the underlying connection is closed. In some situations, you may want the connection to close automatically once the last open channel on the connection closes. To achieve this, set the IConnection.AutoClose property to true, but only after creating the first channel:

        IConnection conn = factory.CreateConnection(...);
        IModel channel = conn.CreateModel();
        conn.AutoClose = true;

When AutoClose is true, the last channel to close will also cause the connection to close. If it is set to true before any channel is created, the connection will close then and there.

Automatic Recovery From Network Failures

Common ways of working with AMQP 0-9-1

Asynchronous, one-way messaging

A RabbitMQ Binding for WCF