Source code for aiorabbit.message

# coding: utf-8
import datetime
import typing

from pamqp import body, commands, header

METHODS = typing.Union[commands.Basic.Deliver,
                       commands.Basic.GetOk,
                       commands.Basic.Return]


[docs]class Message: """Represents a message received from RabbitMQ :param method: **For internal use only** """ def __init__(self, method: METHODS) -> None: self.method = method self.header: typing.Optional[header.ContentHeader] = None self.body_frames: typing.List[body.ContentBody] = []
[docs] def __bytes__(self) -> bytes: """Return the message body if the instance is accessed using ``bytes(Message)`` """ return self.body
[docs] def __len__(self) -> int: """Return the length of the message body""" return len(self.body)
@property def consumer_tag(self) -> typing.Optional[str]: """If the delivered via ``Basic.Deliver``, this provides the consumer tag it was delivered to. """ if isinstance(self.method, commands.Basic.Deliver): return self.method.consumer_tag @property def delivery_tag(self) -> typing.Optional[int]: """If the message was delivered via ``Basic.Deliver``, or retrieved via ``Basic.Get``, this will indicate the delivery tag value, which is used when acknowledging, negative-acknowledging, or rejecting the message. """ if isinstance(self.method, (commands.Basic.Deliver, commands.Basic.GetOk)): return self.method.delivery_tag @property def exchange(self) -> str: """Provides the exchange the message was published to""" return self.method.exchange @property def routing_key(self) -> str: """Provides the routing key the message was published with""" return self.method.routing_key @property def message_count(self) -> typing.Optional[int]: """Provides the ``message_count`` if the message was retrieved using ``Basic.Get`` """ if isinstance(self.method, commands.Basic.GetOk): return self.method.message_count @property def redelivered(self) -> typing.Optional[bool]: """Indicates if the message was redelivered. Will return ``None`` if the message was returned via ``Basic.Return``. """ if isinstance(self.method, (commands.Basic.Deliver, commands.Basic.GetOk)): return self.method.redelivered @property def reply_code(self) -> typing.Optional[int]: """If the message was returned via ``Basic.Return``, indicates the the error code from RabbitMQ. """ if isinstance(self.method, commands.Basic.Return): return self.method.reply_code @property def reply_text(self) -> typing.Optional[str]: """If the message was returned via ``Basic.Return``, indicates the reason why. """ if isinstance(self.method, commands.Basic.Return): return self.method.reply_text @property def app_id(self) -> typing.Optional[str]: """Provides the ``app_id`` property value if it is set.""" return self.header.properties.app_id @property def content_encoding(self) -> typing.Optional[str]: """Provides the ``content_encoding`` property value if it is set.""" return self.header.properties.content_encoding @property def content_type(self) -> typing.Optional[str]: """Provides the ``content_type`` property value if it is set.""" return self.header.properties.content_type @property def correlation_id(self) -> typing.Optional[str]: """Provides the ``correlation_id`` property value if it is set.""" return self.header.properties.correlation_id @property def delivery_mode(self) -> typing.Optional[int]: """Provides the ``delivery_mode`` property value if it is set.""" return self.header.properties.delivery_mode @property def expiration(self) -> typing.Optional[str]: """Provides the ``expiration`` property value if it is set.""" return self.header.properties.expiration @property def headers(self) -> typing.Optional[typing.Dict[str, typing.Any]]: """Provides the ``headers`` property value if it is set.""" return self.header.properties.headers @property def message_id(self) -> typing.Optional[str]: """Provides the ``message_id`` property value if it is set.""" return self.header.properties.message_id @property def message_type(self) -> typing.Optional[str]: """Provides the ``message_type`` property value if it is set.""" return self.header.properties.message_type @property def priority(self) -> typing.Optional[int]: """Provides the ``priority`` property value if it is set.""" return self.header.properties.priority @property def reply_to(self) -> typing.Optional[str]: """Provides the ``reply_to`` property value if it is set.""" return self.header.properties.reply_to @property def timestamp(self) -> typing.Optional[datetime.datetime]: """Provides the ``timestamp`` property value if it is set.""" return self.header.properties.timestamp @property def user_id(self) -> typing.Optional[str]: """Provides the ``user_id`` property value if it is set.""" return self.header.properties.user_id @property def body(self) -> bytes: """Provides the message body""" return b''.join([b.value for b in self.body_frames]) @property def is_complete(self): # Used when receiving frames from RabbitMQ return len(self) == self.header.body_size