Source code for aiorabbit
# coding: utf-8
import asyncio
import contextlib
import logging
import ssl
import typing
from aiorabbit import exceptions
from aiorabbit.__version__ import version
DEFAULT_LOCALE = 'en-US'
DEFAULT_PRODUCT = 'aiorabbit/{}'.format(version)
DEFAULT_URL = 'amqp://guest:guest@localhost'
LOGGER = logging.getLogger('aiorabbit')
[docs]@contextlib.asynccontextmanager
async def connect(url: str = DEFAULT_URL,
locale: str = DEFAULT_LOCALE,
product: str = DEFAULT_PRODUCT,
loop: typing.Optional[asyncio.AbstractEventLoop] = None,
on_return: typing.Optional[typing.Callable] = None,
ssl_context: typing.Optional[ssl.SSLContext] = None):
"""Asynchronous :ref:`context-manager <python:typecontextmanager>` that
connects to RabbitMQ, returning a connected
:class:`~aiorabbit.client.Client` as the target.
.. code-block:: python3
:caption: Example Usage
async with aiorabbit.connect(RABBITMQ_URL) as client:
await client.exchange_declare('test', 'topic')
:param url: The URL to connect to RabbitMQ with
:param locale: The locale for the connection, default `en-US`
:param product: The product name for the connection, default `aiorabbit`
:param loop: Optional :mod:`asyncio` event loop to use
:param on_return: An optional callback method to be invoked if the server
returns a published method. Can also be set using the
:meth:`~Client.register_basic_return_callback` method.
:param ssl_context: Optional :class:`ssl.SSLContext` for the connection
"""
from aiorabbit import client
rmq_client = client.Client(
url, locale, product, loop, on_return, ssl_context)
await rmq_client.connect()
try:
yield rmq_client
finally:
if not rmq_client.is_closed:
await rmq_client.close()
__all__ = [
'client',
'connect',
'DEFAULT_PRODUCT',
'DEFAULT_LOCALE',
'DEFAULT_URL',
'exceptions',
'message',
'types',
'version'
]