Pivotal RabbitMQ v3.x

Consumer Priorities

Consumer priorities allow you to ensure that high priority consumers receive messages while they are active, with messages only going to lower priority consumers when the high priority consumers block.

Normally, active consumers connected to a queue receive messages from it in a round-robin fashion. When consumer priorities are in use, messages are delivered round-robin if multiple active consumers exist with the same high priority.

Definition of active consumers

The above paragraphs refer to consumers as being active or blocked. At any moment, any given consumer is either one or the other. An active consumer is one which could receive a message without waiting. A consumer becomes blocked if it cannot receive messages - because its channel has reached the maximum number of unacknowledged messages after issuing basic.qos, or simply because of network congestion.

Therefore for each queue, at least one of three things must be true:

  • There are no active consumers
  • The queue is empty
  • The queue is busy delivering messages to consumers

Note that consumers can switch between active and blocked many times per second. We therefore don’t expose whether a consumer is active or blocked through the management plugin or rabbitmqctl.

When consumer priorities are in use, you can expect your highest priority consumers to receive all the messages until they become blocked, at which point lower priority consumers will start to receive some. It’s important to understand that RabbitMQ will still prioritise delivering messages - it will not wait for a high priority blocked consumer to become unblocked if there is an active lower priority consumer ready.

Using consumer priorities

Set the x-priority argument in the basic.consume method to an integer value. Consumers which do not specify a value have priority 0. Larger numbers indicate higher priority, and both positive and negative numbers can be used.

For example (in Java):

Channel channel = ...;
Consumer consumer = ...;
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-priority", 10);
channel.basicConsume("my-queue", false, args, consumer);

This creates a new consumer with priority 10.