Client APIΒΆ

The Client class provides all of the methods required for interfacing with RabbitMQ.

class aiorabbit.client.Client(url='amqp://guest:guest@localhost', locale='en-US', product='aiorabbit/1.0.1', loop=None, on_return=None, ssl_context=None)[source]ΒΆ

AsyncIO RabbitMQ Client

This client provides a streamlined interface for interacting with RabbitMQ.

Instead of manually managing your channels, the client will do so for you. In addition, if you are disconnected remotely due to an error, it will attempt to automatically reconnect. Any non-connection related exception should leave you in a state where you can continue working with RabbitMQ, even if it disconnected the client as part of the exception.

Note

AMQ Methods vs Opinionated Methods

For the most part, the client directly implements the AMQ model combining class and method RPC calls as a function. For example, Basic.Ack is implemented as Client.basic_ack(). However, some methods, such as Client.consume(), Client.publish(), and Client.qos_prefetch() provide a higher-level and more opinionated implementation than their respected AMQ RPC methods.

Parameters
  • url (str) – The URL to connect to RabbitMQ with

  • locale (str) – The locale to specify for the RabbitMQ connection

  • product (str) – The project name to specify for the RabbitMQ connection

  • loop (AbstractEventLoop) – An optional IO Loop to specify, if unspecified, asyncio.get_running_loop() will be used to determine the IO Loop.

  • on_return (Callable) – An optional callback method to be invoked if the server returns a published method. Can also be set using the register_basic_return_callback() method.

Example UsageΒΆ
 client = Client(RABBITMQ_URL)
 await client.connect()
 await client.exchange_declare('test', 'topic')
 await client.close()
Parameters

ssl_context (typing.Optional[ssl.SSLContext]) –

async connect()[source]ΒΆ

Connect to the RabbitMQ Server

See also

aiorabbit.connect() for connecting as a context-manager that automatically closes when complete.

Example UsageΒΆ
 client = Client(RABBITMQ_URL)
 await client.connect()
Raises
Return type

None

async close()[source]ΒΆ

Close the client connection to the server

Return type

None

property is_connectedΒΆ

Indicates if the connection is available

Return type

bool

property is_closedΒΆ

Indicates if the connection is closed or closing

Return type

bool

property server_capabilitiesΒΆ

Contains the capabilities of the currently connected RabbitMQ Server.

Example return valueΒΆ
['authentication_failure_close',
 'basic.nack',
 'connection.blocked',
 'consumer_cancel_notify',
 'consumer_priorities',
 'direct_reply_to',
 'exchange_exchange_bindings',
 'per_consumer_qos',
 'publisher_confirms']
Return type

typing.List[str]

property server_propertiesΒΆ

Contains the negotiated properties for the currently connected RabbitMQ Server.

Return type

FieldTable

Example return valueΒΆ
{'capabilities': {'authentication_failure_close': True,
                  'basic.nack': True,
                  'connection.blocked': True,
                  'consumer_cancel_notify': True,
                  'consumer_priorities': True,
                  'direct_reply_to': True,
                  'exchange_exchange_bindings': True,
                  'per_consumer_qos': True,
                  'publisher_confirms': True},
 'cluster_name': 'rabbit@b6a4a6555767',
 'copyright': 'Copyright (c) 2007-2019 Pivotal Software, Inc.',
 'information': 'Licensed under the MPL 1.1. '
                'Website: https://rabbitmq.com',
 'platform': 'Erlang/OTP 22.2.8',
 'product': 'RabbitMQ',
 'version': '3.8.2'}
consume(queue='', no_local=False, no_ack=False, exclusive=False, arguments=None)[source]ΒΆ

Generator function that consumes from a queue, yielding a Message and automatically cancels when the generator is closed.

See also

PEP 525 for information on Async Generators and Client.basic_consume() for callback style consuming.

Parameters
  • queue (str) – Specifies the name of the queue to consume from

  • no_local (bool) – Do not deliver own messages

  • no_ack (bool) – No acknowledgement needed

  • exclusive (bool) – Request exclusive access

  • arguments (Arguments) – A set of arguments for the consume. The syntax and semantics of these arguments depends on the server implementation.

Return type

typing.AsyncGenerator[aiorabbit.message.Message, None]

Yields

aiorabbit.message.Message

Example UsageΒΆ
 consumer = self.client.consume(self.queue)
 async for msg in consumer:
     await self.client.basic_ack(msg.delivery_tag)
     if msg.body == b'stop':
         break
async publish(exchange='amq.direct', routing_key='', message_body=b'', mandatory=False, app_id=None, content_encoding=None, content_type=None, correlation_id=None, delivery_mode=None, expiration=None, headers=None, message_id=None, message_type=None, priority=None, reply_to=None, timestamp=None, user_id=None)[source]ΒΆ

Publish a message to RabbitMQ

message_body can either be str or bytes. If it is a str, it will be encoded to a bytes instance using UTF-8 encoding.

If publisher confirms are enabled, will return True or False indicating success or failure.

See also

Client.confirm_select() for enabling publisher confirmation of published messages.

Note

The immediate flag is not offered as it is not implemented in RabbitMQ as of this time. See basic / publish in the β€œMethods from the AMQP specification, version 0-9-1” table in RabbitMQ’s Compatibility and Conformance page for more information.

Parameters
Raises
  • TypeError – if an argument is of the wrong data type

  • ValueError – if the value of one an argument does not validate

  • aiorabbit.exceptions.NotFound – When publisher confirms are enabled and mandatory is set and the exchange that is being published to does not exist.

Return type

typing.Optional[bool]

async qos_prefetch(count=0, per_consumer=True)[source]ΒΆ

Specify the number of messages to pre-allocate for a consumer.

This method requests a specific quality of service. It uses Basic.QoS under the covers, but due to the redefinition of the global argument in RabbitMQ, along with the lack of prefetch_size, it is redefined here as qos_prefetch and is used for the count only.

The QoS can be specified for the current channel or individual consumers on the channel.

Parameters
  • count – Window in messages to pre-allocate for consumers

  • per_consumer – Apply QoS to new consumers when True or to the whole channel when False.

Return type

None

register_basic_return_callback(value)[source]ΒΆ

Register a callback that is invoked when RabbitMQ returns a published message. The callback can be a synchronous or asynchronous method and is invoked with the returned message as an instance of Message.

Parameters

value (Callable) – The method or function to invoke as a callback

Example UsageΒΆ
async def on_return(msg: aiorabbit.message.Message) -> None:
    self._logger.warning('RabbitMQ Returned a message: %r', msg)

 client = Client(RABBITMQ_URL)
 client.register_basic_return_callback(on_return)
 await client.connect()

 # ... publish messages that could return
Return type

None

async basic_qos()[source]ΒΆ

This method is not implemented, as RabbitMQ does not fully implement it and changes the of the semantic meaning of how it is used.

Use the Client.qos_prefetch() method instead as it implements the Basic.QoS behavior as it currently works in RabbitMQ.

See also

See the RabbitMQ site for more information on RabbitMQ’s implementation and changes to Basic.QoS.

Raises

NotImplementedError – when invoked

Return type

None

async basic_consume(queue='', no_local=False, no_ack=False, exclusive=False, arguments=None, callback=None, consumer_tag=None)[source]ΒΆ

Start a queue consumer

This method asks the server to start a β€œconsumer”, which is a transient request for messages from a specific queue. Consumers last as long as the channel they were declared on, or until the client cancels them.

This method is used for callback passing style usage. For each message, the callback method will be invoked, passing in an instance of Message.

The Client.consume method should be used for generator style consuming.

Parameters
  • queue (str) – Specifies the name of the queue to consume from

  • no_local (bool) – Do not deliver own messages

  • no_ack (bool) – No acknowledgement needed

  • exclusive (bool) – Request exclusive access

  • arguments (Arguments) – A set of arguments for the consume. The syntax and semantics of these arguments depends on the server implementation.

  • callback (Callable) – The method to invoke for each received message.

  • consumer_tag (typing.Optional[str]) – Specifies the identifier for the consumer. The consumer tag is local to a channel, so two clients can use the same consumer tags. If this field is empty the server will generate a unique tag.

Return type

str

Returns

the consumer tag value

async basic_cancel(consumer_tag='')[source]ΒΆ

End a queue consumer

This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the CancelOk reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (i.e. cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion. Note that as it is not a MUST for clients to accept this method from the server, it is advisable for the broker to be able to identify those clients that are capable of accepting the method, through some means of capability negotiation.

Parameters

consumer_tag (str) – Consumer tag

Return type

None

async basic_get(queue='', no_ack=False)[source]ΒΆ

Direct access to a queue

This method provides a direct access to the messages in a queue using a synchronous dialogue that is designed for specific types of application where synchronous functionality is more important than performance.

Parameters
  • queue (str) – Specifies the name of the queue to get a message from

  • no_ack (bool) – No acknowledgement needed

Return type

typing.Optional[aiorabbit.message.Message]

async basic_ack(delivery_tag, multiple=False)[source]ΒΆ

Acknowledge one or more messages

When sent by the client, this method acknowledges one or more messages delivered via the Basic.Deliver or Basic.GetOk methods. The acknowledgement can be for a single message or a set of messages up to and including a specific message.

Parameters
  • delivery_tag (int) – Server-assigned delivery tag

  • multiple (bool) – Acknowledge multiple messages

Return type

None

async basic_nack(delivery_tag, multiple=False, requeue=True)[source]ΒΆ

Reject one or more incoming messages

This method allows a client to reject one or more incoming messages. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.

Parameters
  • delivery_tag (int) – Server-assigned delivery tag

  • multiple (bool) – Reject multiple messages

  • requeue (bool) – Requeue the message

Return type

None

async basic_reject(delivery_tag, requeue=True)[source]ΒΆ

Reject an incoming message

This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.

Parameters
  • delivery_tag (int) – Server-assigned delivery tag

  • requeue (bool) – Requeue the message

Return type

None

async basic_publish()[source]ΒΆ

This method is not implemented and the more opinionated publish() method exists, implementing the Basic.Publish RPC.

Raises

NotImplementedError – when invoked

Return type

None

async basic_recover(requeue=False)[source]ΒΆ

Redeliver unacknowledged messages

This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered.

Parameters

requeue (bool) – Requeue the message

Raises

aiorabbit.exceptions.NotImplemented – when False is specified for requeue

Return type

None

async confirm_select()[source]ΒΆ

Enable Publisher Confirms

Warning

RabbitMQ will only indicate a publishing failure via publisher confirms when there is an internal error in RabbitMQ. They are not a mechanism for guaranteeing a message is routed. Usage of the mandatory flag when publishing will only guarantee that the message is routed into an exchange, but not that it is published into a queue.

Raises
Return type

None

async exchange_declare(exchange='', exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, arguments=None)[source]ΒΆ

Verify exchange exists, create if needed

This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.

Parameters
  • exchange (str) – Exchange name

  • exchange_type (str) – Exchange type

  • passive (bool) – Do not create exchange

  • durable (bool) – Request a durable exchange

  • auto_delete (bool) – Auto-delete when unused

  • internal (bool) – Create internal exchange

  • arguments (Arguments) – Arguments for declaration

Raises
Return type

None

async exchange_delete(exchange='', if_unused=False)[source]ΒΆ

Delete an exchange

This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are cancelled.

Parameters
  • exchange (str) – exchange name - Default: ''

  • if_unused (bool) – Delete only if unused - Default: False

Raises

ValueError – when an argument fails to validate

Return type

None

async exchange_bind(destination='', source='', routing_key='', arguments=None)[source]ΒΆ

Bind exchange to an exchange.

Parameters
  • destination (str) – Destination exchange name

  • source (str) – Source exchange name

  • routing_key (str) – Message routing key

  • arguments (Arguments) – Arguments for binding

Raises
Return type

None

async exchange_unbind(destination='', source='', routing_key='', arguments=None)[source]ΒΆ

Unbind an exchange from an exchange.

Parameters
  • destination (str) – Destination exchange name

  • source (str) – Source exchange name

  • routing_key (str) – Message routing key

  • arguments (Arguments) – Arguments for binding

Raises
  • TypeError – if an argument is of the wrong data type

  • ValueError – if an argument value does not validate

Return type

None

async queue_declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None)[source]ΒΆ

Declare queue, create if needed

This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.

Returns a tuple of message count, consumer count.

Parameters
  • queue (str) – Queue name

  • passive (bool) – Do not create queue

  • durable (bool) – Request a durable queue

  • exclusive (bool) – Request an exclusive queue

  • auto_delete (bool) – Auto-delete queue when unused

  • arguments (Arguments) – Arguments for declaration

Raises
Return type

typing.Tuple[int, int]

async queue_delete(queue='', if_unused=False, if_empty=False)[source]ΒΆ

Delete a queue

This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled.

Parameters
  • queue (str) – Specifies the name of the queue to delete

  • if_unused (bool) – Delete only if unused

  • if_empty (bool) – Delete only if empty

Return type

None

async queue_bind(queue='', exchange='', routing_key='', arguments=None)[source]ΒΆ

Bind queue to an exchange

This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and- forward queues are bound to a direct exchange and subscription queues are bound to a topic exchange.

Parameters
  • queue (str) – Specifies the name of the queue to bind

  • exchange (str) – Name of the exchange to bind to

  • routing_key (str) – Message routing key

  • arguments (Arguments) – Arguments of binding

Raises
  • TypeError – if an argument is of the wrong data type

  • ValueError – when an argument fails to validate

Return type

None

async queue_unbind(queue='', exchange='', routing_key='', arguments=None)[source]ΒΆ

Unbind a queue from an exchange

This method unbinds a queue from an exchange.

Parameters
  • queue (str) – Specifies the name of the queue to unbind

  • exchange (str) – Name of the exchange to unbind from

  • routing_key (str) – Message routing key

  • arguments (Arguments) – Arguments of binding

Raises
  • TypeError – if an argument is of the wrong data type

  • ValueError – when an argument fails to validate

Return type

None

async queue_purge(queue='')[source]ΒΆ

Purge a queue

This method removes all messages from a queue which are not awaiting acknowledgment.

Parameters

queue (str) – Specifies the name of the queue to purge

Return type

int

Returns

The quantity of messages purged

async tx_select()[source]ΒΆ

Select standard transaction mode

This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the tx_commit() or tx_rollback() methods.

Return type

None

async tx_commit()[source]ΒΆ

Commit the current transaction

This method commits all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a commit.

Raises

aiorabbit.exceptions.NoTransactionError – when invoked prior to invoking tx_select().

Return type

None

async tx_rollback()[source]ΒΆ

Abandon the current transaction

This method abandons all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a rollback. Note that unacked messages will not be automatically redelivered by rollback; if that is required an explicit recover call should be issued.

Raises

aiorabbit.exceptions.NoTransactionError – when invoked prior to invoking tx_select().

Return type

None