Callback-Based Consumer

The following example implements a callback-based consumer. This style of consumer could be useful as part of a larger application, where consuming is only one part of the main application flow. It performs similar high-level logic to the Async Generator Consumer.
 import asyncio
 import logging
 import os
 import random

 from aiorabbit import client, exceptions, message

 LOGGER = logging.getLogger(__name__)

 class Consumer:
     """Class that demonstrates a consumer application lifecycle"""

     def __init__(self, rabbitmq_url: str, queue_name: str):
         self.client = client.Client(rabbitmq_url)
         self.queue_name = queue_name
         self.shutdown = asyncio.Event()

     async def execute(self) -> None:
         """Performs the following steps:

         1. Connects to RabbitMQ and exits on authentication failure
         2. Ensures the queue to consume from exists
         3. Starts the consumer
         4. Blocks until ``self.shutdown`` is set
         5. Stops consuming
         6. Closes the connection

             await self.client.connect()
         except exceptions.AccessRefused as err:
             LOGGER.error('Failed to authenticate to RabbitMQ: %s', err)

         await self.client.queue_declare(self.queue_name)

         consumer_tag = await self.client.basic_consume(
             self.queue_name, callback=self.on_message)'Started consuming on queue %s with consumer tag %s',
                     self.queue_name, consumer_tag)

         await self.shutdown.wait()'Shutting down')

         await self.client.basic_cancel(consumer_tag)
         await self.client.close()

     async def on_message(self, msg: message.Message) -> None:
         """Receives the message from RabbitMQ and...

         - If the message body is ``stop``, ack it and set shutdown event
         - or ack with a 75% chance
         - or nack without requeue

         """'Received message published to %s: %r',
                     self.queue_name, msg.body)
         if msg.body == b'stop':
             await self.client.basic_ack(msg.delivery_tag)
         elif random.randint(1, 100) <= 75:
             await self.client.basic_ack(msg.delivery_tag)
             await self.client.basic_nack(msg.delivery_tag, requeue=False)

 async def main():
     await Consumer(os.environ.get('RABBITMQ_URL', ''), 'test-queue').execute()

 if __name__ == '__main__':

Run the code, open the RabbitMQ management UI in your browser, and publish a few messages to the queue. When you’ve sent enough, publish a message with the body of stop. You should see output similar to the following:

$ python3
INFO:aiorabbit.client:Connecting to amqp://guest:*****@localhost:32773/%2F
INFO:__main__:Started consuming on queue test-queue with consumer tag amq.ctag-4DSHNNZGxrf22bxS_i0uqA
INFO:__main__:Received message published to test-queue: b'example #1'
INFO:__main__:Received message published to test-queue: b'example #2'
INFO:__main__:Received message published to test-queue: b'stop'
INFO:__main__:Shutting down