Client API
The Client class provides all of the methods required for interfacing with RabbitMQ.
- class aiorabbit.client.Client(url='amqp://guest:guest@localhost', locale='en-US', product='aiorabbit/1.0.1', loop=None, on_return=None, ssl_context=None)
AsyncIO RabbitMQ Client
This client provides a streamlined interface for interacting with RabbitMQ.
Instead of manually managing your channels, the client will do so for you. In addition, if you are disconnected remotely due to an error, it will attempt to automatically reconnect. Any non-connection related exception should leave you in a state where you can continue working with RabbitMQ, even if it disconnected the client as part of the exception.
Note
AMQ Methods vs Opinionated Methods
For the most part, the client directly implements the AMQ model, combining class and method RPC calls as a function. For example, Basic.Ack is implemented as Client.basic_ack(). However, some methods, such as Client.consume(), Client.publish(), and Client.qos_prefetch(), provide a higher-level and more opinionated implementation than their respective AMQ RPC methods.
- Parameters
url (str) – The URL to connect to RabbitMQ with
locale (str) – The locale to specify for the RabbitMQ connection
product (str) – The project name to specify for the RabbitMQ connection
loop (AbstractEventLoop) – An optional IO Loop to specify. If unspecified, asyncio.get_running_loop() will be used to determine the IO Loop.
on_return (Callable) – An optional callback method to be invoked if the server returns a published message. Can also be set using the register_basic_return_callback() method.
Example Usage
    client = Client(RABBITMQ_URL)
    await client.connect()
    await client.exchange_declare('test', 'topic')
    await client.close()
- Parameters
ssl_context (typing.Optional[ssl.SSLContext])
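The ssl_context parameter carries no description on this page. As a hedged sketch, the snippet below builds a verifying ssl.SSLContext with the standard library that could be passed to the constructor. The helper name build_tls_context, the amqps:// URL scheme, the host name, and port 5671 are illustrative assumptions, not taken from this page.

```python
import ssl
from typing import Optional


def build_tls_context(ca_file: Optional[str] = None) -> ssl.SSLContext:
    """Return a client-side TLS context that verifies the server certificate.

    build_tls_context is a hypothetical helper, not part of aiorabbit.
    """
    # create_default_context() turns on certificate and hostname verification.
    context = ssl.create_default_context(cafile=ca_file)
    # Refuse protocol versions older than TLS 1.2.
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


# Hypothetical usage (URL scheme, host and port are assumptions):
#
#     client = Client('amqps://guest:guest@rabbit.example.com:5671',
#                     ssl_context=build_tls_context())
#     await client.connect()
```

Passing a CA bundle through ca_file is only needed when the broker certificate is not signed by an authority in the system trust store.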
- async connect()
Connect to the RabbitMQ Server
See also
aiorabbit.connect() for connecting as a context-manager that automatically closes when complete.
Example Usage
    client = Client(RABBITMQ_URL)
    await client.connect()
- Raises
asyncio.TimeoutError – on connection timeout
OSError – when a networking error occurs
aiorabbit.exceptions.AccessRefused – when authentication or authorization fails
aiorabbit.exceptions.ClientNegotiationException – when the client fails to negotiate with the server
- Return type
None
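The exceptions listed for connect() fall into two groups: transient ones (timeouts, network errors) and ones that retrying will not fix (refused access, failed negotiation). A minimal sketch of a retry wrapper built on that distinction follows. The function name connect_with_retry, its parameters, and the linear back-off are assumptions made for illustration; the client argument is anything with an awaitable connect() method.

```python
import asyncio
from typing import Any


async def connect_with_retry(client: Any, attempts: int = 3,
                             delay: float = 1.0) -> int:
    """Call client.connect(), retrying on timeouts and network errors.

    Returns the number of attempts used. Any other exception, such as an
    authentication failure, propagates on the first occurrence.
    """
    attempt = 1
    while True:
        try:
            await client.connect()
            return attempt
        except (asyncio.TimeoutError, OSError):
            if attempt >= attempts:
                raise  # out of attempts: surface the last error
        # Linear back-off before the next attempt (an arbitrary choice).
        await asyncio.sleep(delay * attempt)
        attempt += 1
```

A caller would use it as `await connect_with_retry(Client(RABBITMQ_URL))` in place of a bare `await client.connect()`.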
- property server_capabilities
Contains the capabilities of the currently connected RabbitMQ Server.
Example return value
    ['authentication_failure_close',
     'basic.nack',
     'connection.blocked',
     'consumer_cancel_notify',
     'consumer_priorities',
     'direct_reply_to',
     'exchange_exchange_bindings',
     'per_consumer_qos',
     'publisher_confirms']
- Return type
- property server_properties
Contains the negotiated properties for the currently connected RabbitMQ Server.
- Return type
FieldTable
Example return value
    {'capabilities': {'authentication_failure_close': True,
                      'basic.nack': True,
                      'connection.blocked': True,
                      'consumer_cancel_notify': True,
                      'consumer_priorities': True,
                      'direct_reply_to': True,
                      'exchange_exchange_bindings': True,
                      'per_consumer_qos': True,
                      'publisher_confirms': True},
     'cluster_name': 'rabbit@b6a4a6555767',
     'copyright': 'Copyright (c) 2007-2019 Pivotal Software, Inc.',
     'information': 'Licensed under the MPL 1.1. '
                    'Website: https://rabbitmq.com',
     'platform': 'Erlang/OTP 22.2.8',
     'product': 'RabbitMQ',
     'version': '3.8.2'}
- consume(queue='', no_local=False, no_ack=False, exclusive=False, arguments=None)
Generator function that consumes from a queue, yielding a Message, and automatically cancels when the generator is closed.
See also
PEP 525 for information on Async Generators and Client.basic_consume() for callback style consuming.
- Parameters
queue (str) – Specifies the name of the queue to consume from
no_local (bool) – Do not deliver own messages
no_ack (bool) – No acknowledgement needed
exclusive (bool) – Request exclusive access
arguments (Arguments) – A set of arguments for the consume. The syntax and semantics of these arguments depend on the server implementation.
- Return type
- Yields
Example Usage
    consumer = self.client.consume(self.queue)
    async for msg in consumer:
        await self.client.basic_ack(msg.delivery_tag)
        if msg.body == b'stop':
            break
- async publish(exchange='amq.direct', routing_key='', message_body=b'', mandatory=False, app_id=None, content_encoding=None, content_type=None, correlation_id=None, delivery_mode=None, expiration=None, headers=None, message_id=None, message_type=None, priority=None, reply_to=None, timestamp=None, user_id=None)
Publish a message to RabbitMQ
message_body can either be str or bytes. If it is a str, it will be encoded to a bytes instance using UTF-8 encoding.
If publisher confirms are enabled, the method returns True or False to indicate success or failure.
See also
Client.confirm_select() for enabling publisher confirmation of published messages.
Note
The immediate flag is not offered as it is not implemented in RabbitMQ as of this time. See basic / publish in the "Methods from the AMQP specification, version 0-9-1" table in RabbitMQ's Compatibility and Conformance page for more information.
- Parameters
exchange (str) – The exchange to publish to. Default: amq.direct
routing_key (str) – The routing key to publish with. Default: ''
message_body (typing.Union[bytes, str]) – The message body to publish. Default: b''
mandatory (bool) – Indicate mandatory routing. Default: False
app_id (typing.Optional[str]) – Creating application id
content_type (typing.Optional[str]) – MIME content type
content_encoding (typing.Optional[str]) – MIME content encoding
correlation_id (typing.Optional[str]) – Application correlation identifier
delivery_mode (typing.Optional[int]) – Non-persistent (1) or persistent (2)
expiration (typing.Optional[str]) – Message expiration specification
headers (typing.Optional[FieldTable]) – Message header field table
message_id (typing.Optional[str]) – Application message identifier
message_type (typing.Optional[str]) – Message type name
priority (typing.Optional[int]) – Message priority, 0 to 9
reply_to (typing.Optional[str]) – Address to reply to
timestamp (typing.Optional[datetime.datetime]) – Message timestamp
user_id (typing.Optional[str]) – Creating user id
- Raises
TypeError – if an argument is of the wrong data type
ValueError – if the value of an argument does not validate
aiorabbit.exceptions.NotFound – when publisher confirms are enabled, mandatory is set, and the exchange that is being published to does not exist
- Return type
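As an illustration of how the message properties combine, the following sketch publishes a JSON document as a persistent message. The helper name publish_json is hypothetical, and the Optional[bool] return annotation is an assumption: this page only states that True or False is returned when publisher confirms are enabled.

```python
import json
from typing import Any, Optional


async def publish_json(client: Any, exchange: str, routing_key: str,
                       payload: dict) -> Optional[bool]:
    """Serialize payload to JSON and publish it as a persistent message."""
    # json.dumps() returns a str; publish() encodes str bodies as UTF-8.
    body = json.dumps(payload, sort_keys=True)
    return await client.publish(
        exchange,
        routing_key,
        body,
        content_type='application/json',
        delivery_mode=2,  # 2 = persistent, 1 = non-persistent
    )
```

A call such as `await publish_json(client, 'events', 'user.created', {'id': 7})` sends the body `{"id": 7}`.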
- async qos_prefetch(count=0, per_consumer=True)
Specify the number of messages to pre-allocate for a consumer.
This method requests a specific quality of service. It uses Basic.QoS under the covers, but due to the redefinition of the global argument in RabbitMQ, along with the lack of prefetch_size, it is redefined here as qos_prefetch and is used for the count only.
The QoS can be specified for the current channel or individual consumers on the channel.
- Parameters
count – Window in messages to pre-allocate for consumers
per_consumer – Apply QoS to new consumers when True or to the whole channel when False.
- Return type
None
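Since per_consumer=True applies the limit to new consumers, the prefetch window has to be set before the consumer is started. A minimal sketch of that ordering follows; the helper name limited_consumer and the default of 10 are assumptions chosen for illustration.

```python
from typing import Any


async def limited_consumer(client: Any, queue: str, prefetch: int = 10) -> Any:
    """Set a per-consumer prefetch window, then open a consumer on queue.

    Returns the async generator produced by client.consume().
    """
    # The limit only affects consumers started after this call.
    await client.qos_prefetch(prefetch, per_consumer=True)
    # no_ack stays False: RabbitMQ applies prefetch limits only to
    # deliveries that must be acknowledged.
    return client.consume(queue, no_ack=False)
```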
- register_basic_return_callback(value)
Register a callback that is invoked when RabbitMQ returns a published message. The callback can be a synchronous or asynchronous method and is invoked with the returned message as an instance of Message.
- Parameters
value (Callable) – The method or function to invoke as a callback
Example Usage
    async def on_return(msg: aiorabbit.message.Message) -> None:
        self._logger.warning('RabbitMQ Returned a message: %r', msg)

    client = Client(RABBITMQ_URL)
    client.register_basic_return_callback(on_return)
    await client.connect()

    # ... publish messages that could return
- Return type
None
- async basic_qos()
This method is not implemented, as RabbitMQ does not fully implement it and changes the semantic meaning of how it is used.
Use the Client.qos_prefetch() method instead as it implements the Basic.QoS behavior as it currently works in RabbitMQ.
See also
See the RabbitMQ site for more information on RabbitMQ's implementation and changes to Basic.QoS.
- Raises
NotImplementedError – when invoked
- Return type
None
- async basic_consume(queue='', no_local=False, no_ack=False, exclusive=False, arguments=None, callback=None, consumer_tag=None)
Start a queue consumer
This method asks the server to start a "consumer", which is a transient request for messages from a specific queue. Consumers last as long as the channel they were declared on, or until the client cancels them.
This method is used for callback passing style usage. For each message, the callback method will be invoked, passing in an instance of Message.
The Client.consume method should be used for generator style consuming.
- Parameters
queue (str) – Specifies the name of the queue to consume from
no_local (bool) – Do not deliver own messages
no_ack (bool) – No acknowledgement needed
exclusive (bool) – Request exclusive access
arguments (Arguments) – A set of arguments for the consume. The syntax and semantics of these arguments depend on the server implementation.
callback (Callable) – The method to invoke for each received message.
consumer_tag (typing.Optional[str]) – Specifies the identifier for the consumer. The consumer tag is local to a channel, so two clients can use the same consumer tags. If this field is empty the server will generate a unique tag.
- Return type
- Returns
the consumer tag value
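The callback style can be sketched as follows: a synchronous callback collects messages, the returned consumer tag is later passed to basic_cancel(), and a single basic_ack() with multiple=True settles everything received. The function name collect_messages is hypothetical. The sketch assumes the callback runs on the event loop thread and that all deliveries arrive on the same channel, neither of which this page states.

```python
import asyncio
from typing import Any, List


async def collect_messages(client: Any, queue: str, limit: int) -> List[Any]:
    """Receive at least `limit` messages via a callback, then cancel."""
    if limit < 1:
        raise ValueError('limit must be at least 1')
    received: List[Any] = []
    done = asyncio.Event()

    def on_message(msg: Any) -> None:
        # Invoked once per delivery with a Message instance.
        received.append(msg)
        if len(received) >= limit:
            done.set()

    # basic_consume() returns the consumer tag that basic_cancel() needs.
    consumer_tag = await client.basic_consume(queue, callback=on_message)
    try:
        await done.wait()
    finally:
        await client.basic_cancel(consumer_tag)
    # One ack with multiple=True covers every delivery up to and
    # including the last one received.
    await client.basic_ack(received[-1].delivery_tag, multiple=True)
    return received
```

More than `limit` messages can be returned, because deliveries may still arrive between reaching the limit and the cancel taking effect.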
- async basic_cancel(consumer_tag='')
End a queue consumer
This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the CancelOk reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (i.e. cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion. Note that as it is not a MUST for clients to accept this method from the server, it is advisable for the broker to be able to identify those clients that are capable of accepting the method, through some means of capability negotiation.
- Parameters
consumer_tag (str) – Consumer tag
- Return type
None
- async basic_get(queue='', no_ack=False)
Direct access to a queue
This method provides a direct access to the messages in a queue using a synchronous dialogue that is designed for specific types of application where synchronous functionality is more important than performance.
- Parameters
- Return type
- async basic_ack(delivery_tag, multiple=False)
Acknowledge one or more messages
When sent by the client, this method acknowledges one or more messages delivered via the Basic.Deliver or Basic.GetOk methods. The acknowledgement can be for a single message or a set of messages up to and including a specific message.
- async basic_nack(delivery_tag, multiple=False, requeue=True)
Reject one or more incoming messages
This method allows a client to reject one or more incoming messages. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.
- async basic_reject(delivery_tag, requeue=True)
Reject an incoming message
This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.
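To show how the acknowledgement methods fit together, here is a hedged sketch that acknowledges a message when processing succeeds and negatively acknowledges it without requeueing when processing fails. The function name settle and the handler argument are illustrative assumptions; whether to requeue a failed message is a policy choice for the application.

```python
from typing import Any, Callable


async def settle(client: Any, msg: Any,
                 handler: Callable[[bytes], None]) -> bool:
    """Run handler on the message body, then ack or nack the delivery."""
    try:
        handler(msg.body)
    except Exception:
        # requeue=False keeps a message that always fails from being
        # returned to its queue and redelivered in a loop.
        await client.basic_nack(msg.delivery_tag, requeue=False)
        return False
    await client.basic_ack(msg.delivery_tag)
    return True
```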
- async basic_publish()
This method is not implemented. Use the more opinionated publish() method, which implements the Basic.Publish RPC.
- Raises
NotImplementedError – when invoked
- Return type
None
- async basic_recover(requeue=False)
Redeliver unacknowledged messages
This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered.
- Parameters
requeue (bool) – Requeue the message
- Raises
aiorabbit.exceptions.NotImplemented – when False is specified for requeue
- Return type
None
- async confirm_select()
Enable Publisher Confirms
Warning
RabbitMQ will only indicate a publishing failure via publisher confirms when there is an internal error in RabbitMQ. They are not a mechanism for guaranteeing a message is routed. Usage of the mandatory flag when publishing will only guarantee that the message is routed into an exchange, but not that it is published into a queue.
- Raises
RuntimeError – if publisher confirms are already enabled
aiorabbit.exceptions.NotImplemented – if publisher confirms are not available on the RabbitMQ server
- Return type
None
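Because confirm_select() raises RuntimeError when confirms are already enabled, a caller needs to enable them exactly once. One possible way to do that is a small wrapper that remembers whether it has already made the call. The class name ConfirmedPublisher is hypothetical and not part of aiorabbit.

```python
from typing import Any, Union


class ConfirmedPublisher:
    """Enable publisher confirms on first use, then publish with them."""

    def __init__(self, client: Any) -> None:
        self._client = client
        self._confirms_enabled = False

    async def publish(self, exchange: str, routing_key: str,
                      body: Union[bytes, str]) -> bool:
        if not self._confirms_enabled:
            # Only called once: a second call would raise RuntimeError.
            await self._client.confirm_select()
            self._confirms_enabled = True
        result = await self._client.publish(
            exchange, routing_key, body, mandatory=True)
        # True means the broker confirmed the publish. Per the warning
        # above, it does not prove the message reached a queue.
        return result is True
```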
- async exchange_declare(exchange='', exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, arguments=None)
Verify exchange exists, create if needed
This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.
- Parameters
exchange (str) – Exchange name
exchange_type (str) – Exchange type
passive (bool) – Do not create exchange
durable (bool) – Request a durable exchange
auto_delete (bool) – Auto-delete when unused
internal (bool) – Create internal exchange
arguments (Arguments) – Arguments for declaration
- Raises
TypeError – if an argument is of the wrong data type
aiorabbit.exceptions.NotFound – if the sent command is invalid due to an argument value
aiorabbit.exceptions.CommandInvalid – when an exchange type or other parameter is invalid
- Return type
None
- async exchange_delete(exchange='', if_unused=False)
Delete an exchange
This method deletes an exchange. When an exchange is deleted, all queue bindings on the exchange are cancelled.
- Parameters
- Raises
ValueError – when an argument fails to validate
- Return type
None
- async exchange_bind(destination='', source='', routing_key='', arguments=None)
Bind exchange to an exchange.
- Parameters
- Raises
TypeError – if an argument is of the wrong data type
aiorabbit.exceptions.NotFound – if one of the specified exchanges does not exist
- Return type
None
- async exchange_unbind(destination='', source='', routing_key='', arguments=None)
Unbind an exchange from an exchange.
- Parameters
- Raises
TypeError – if an argument is of the wrong data type
ValueError – if an argument value does not validate
- Return type
None
- async queue_declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None)
Declare queue, create if needed
This method creates or checks a queue. When creating a new queue, the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.
Returns a tuple of message count, consumer count.
- Parameters
- Raises
TypeError – if an argument is of the wrong data type
ValueError – when an argument fails to validate
aiorabbit.exceptions.ResourceLocked – when a queue is already declared and exclusive is requested
aiorabbit.exceptions.PreconditionFailed – when a queue is redeclared with a different definition than it currently has
- Return type
- async queue_delete(queue='', if_unused=False, if_empty=False)
Delete a queue
This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled.
- async queue_bind(queue='', exchange='', routing_key='', arguments=None)
Bind queue to an exchange
This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and-forward queues are bound to a direct exchange and subscription queues are bound to a topic exchange.
- Parameters
- Raises
TypeError – if an argument is of the wrong data type
ValueError – when an argument fails to validate
- Return type
None
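The declare and bind methods are typically used together when an application starts. A minimal sketch, assuming a topic exchange and a durable queue, is shown below. The function name declare_bound_queue and the choice of durable=True are illustrative, not prescribed by this page.

```python
from typing import Any, Tuple


async def declare_bound_queue(client: Any, exchange: str, queue: str,
                              routing_key: str) -> Tuple[int, int]:
    """Declare a topic exchange and a durable queue, then bind them.

    Returns the (message count, consumer count) tuple from queue_declare().
    """
    await client.exchange_declare(exchange, 'topic', durable=True)
    message_count, consumer_count = await client.queue_declare(
        queue, durable=True)
    # Until this binding exists the queue receives nothing from the exchange.
    await client.queue_bind(queue, exchange, routing_key)
    return message_count, consumer_count
```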
- async queue_unbind(queue='', exchange='', routing_key='', arguments=None)
Unbind a queue from an exchange
This method unbinds a queue from an exchange.
- Parameters
- Raises
TypeError – if an argument is of the wrong data type
ValueError – when an argument fails to validate
- Return type
None
- async queue_purge(queue='')
Purge a queue
This method removes all messages from a queue which are not awaiting acknowledgment.
- async tx_select()
Select standard transaction mode
This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the tx_commit() or tx_rollback() methods.
- Return type
None
- async tx_commit()
Commit the current transaction
This method commits all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a commit.
- Raises
aiorabbit.exceptions.NoTransactionError – when invoked prior to invoking tx_select().
- Return type
None
- async tx_rollback()
Abandon the current transaction
This method abandons all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a rollback. Note that unacked messages will not be automatically redelivered by rollback; if that is required an explicit recover call should be issued.
- Raises
aiorabbit.exceptions.NoTransactionError – when invoked prior to invoking tx_select().
- Return type
None
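The three transaction methods can be combined into an all-or-nothing publish. The sketch below is one way to do it; the function name publish_in_transaction is hypothetical. It assumes the channel is still in transaction mode when the rollback runs. If a failure reset the channel, tx_rollback() may itself raise, and that case is not handled here.

```python
from typing import Any, Iterable, Tuple


async def publish_in_transaction(
        client: Any, exchange: str,
        messages: Iterable[Tuple[str, bytes]]) -> None:
    """Publish every (routing_key, body) pair, or none of them."""
    # tx_select() must run before tx_commit() or tx_rollback().
    await client.tx_select()
    try:
        for routing_key, body in messages:
            await client.publish(exchange, routing_key, body)
        await client.tx_commit()
    except Exception:
        # Abandon everything published since the transaction started.
        await client.tx_rollback()
        raise
```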