Source code for aio_pika.exchange

from typing import Optional, Union

import aiormq
from pamqp.common import Arguments

from .abc import (
    AbstractChannel, AbstractExchange, AbstractMessage, ExchangeParamType,
    ExchangeType, TimeoutType, get_exchange_name,
)
from .log import get_logger


# Module-level logger, obtained from the package's ``get_logger`` helper with
# this module's ``__name__``. The Exchange methods below use it for their
# debug messages (bind / unbind / publish) and the info message on delete.
log = get_logger(__name__)


class Exchange(AbstractExchange):
    """Exchange abstraction.

    Keeps the declaration parameters of one exchange together with the
    channel it belongs to, and forwards declare / bind / unbind / publish /
    delete requests to the underlay channel returned by
    ``self.channel.get_underlay_channel()``.
    """

    channel: AbstractChannel

    def __init__(
        self,
        channel: AbstractChannel,
        name: str,
        type: Union[ExchangeType, str] = ExchangeType.DIRECT,
        *,
        auto_delete: bool = False,
        durable: bool = False,
        internal: bool = False,
        passive: bool = False,
        arguments: Arguments = None,
    ):
        """Store the exchange parameters; nothing is sent to the broker here.

        :param channel: channel this exchange operates on
        :param name: exchange name
        :param type: exchange type, either an :class:`ExchangeType` member
            or its plain string value
        :param auto_delete: ``auto_delete`` flag passed on declaration
        :param durable: ``durable`` flag passed on declaration
        :param internal: ``internal`` flag passed on declaration; also makes
            :meth:`publish` refuse to send messages
        :param passive: ``passive`` flag passed on declaration
        :param arguments: additional declaration arguments; a falsy value is
            stored as an empty dict
        """
        # Normalise to the plain string form: enum members contribute their
        # ``value``, anything else is kept exactly as given.
        if isinstance(type, ExchangeType):
            type_name = type.value
        else:
            type_name = type
        self._type = type_name

        self.channel = channel
        self.name = name
        self.auto_delete = auto_delete
        self.durable = durable
        self.internal = internal
        self.passive = passive
        self.arguments = arguments or {}

    def __str__(self) -> str:
        """Return the exchange name."""
        return self.name

    def __repr__(self) -> str:
        """Return a debug representation with name, flags and arguments."""
        class_name = self.__class__.__name__
        return (
            f"<{class_name}({self}):"
            f" auto_delete={self.auto_delete},"
            f" durable={self.durable},"
            f" arguments={self.arguments!r})>"
        )

    async def declare(
        self,
        timeout: TimeoutType = None,
    ) -> aiormq.spec.Exchange.DeclareOk:
        """Declare the exchange using the parameters stored on this instance.

        :param timeout: execution timeout
        :return: the result of the underlay channel's ``exchange_declare``
        """
        underlay = await self.channel.get_underlay_channel()
        declare_ok = await underlay.exchange_declare(
            self.name,
            exchange_type=self._type,
            durable=self.durable,
            auto_delete=self.auto_delete,
            internal=self.internal,
            passive=self.passive,
            arguments=self.arguments,
            timeout=timeout,
        )
        return declare_ok

    async def bind(
        self,
        exchange: ExchangeParamType,
        routing_key: str = "",
        *,
        arguments: Arguments = None,
        timeout: TimeoutType = None,
    ) -> aiormq.spec.Exchange.BindOk:
        """Bind this exchange (the destination) to a source exchange.

        A binding can also be a relationship between two exchanges.
        This can be simply read as: this exchange is interested in messages
        from another exchange.

        Bindings can take an extra routing_key parameter. To avoid the
        confusion with a basic_publish parameter we're going to call it a
        binding key.

        .. code-block:: python

            client = await connect()

            routing_key = 'simple_routing_key'
            src_exchange_name = "source_exchange"
            dest_exchange_name = "destination_exchange"

            channel = await client.channel()
            src_exchange = await channel.declare_exchange(
                src_exchange_name, auto_delete=True
            )
            dest_exchange = await channel.declare_exchange(
                dest_exchange_name, auto_delete=True
            )
            queue = await channel.declare_queue(auto_delete=True)

            await queue.bind(dest_exchange, routing_key)
            await dest_exchange.bind(src_exchange, routing_key)

        :param exchange: source exchange; its name is resolved with
            ``get_exchange_name``
        :param routing_key: routing (binding) key
        :param arguments: additional arguments
        :param timeout: execution timeout
        :return: the result of the underlay channel's ``exchange_bind``
        """
        log.debug(
            "Binding exchange %r to exchange %r, routing_key=%r, arguments=%r",
            self,
            exchange,
            routing_key,
            arguments,
        )

        underlay = await self.channel.get_underlay_channel()
        source_name = get_exchange_name(exchange)
        bind_ok = await underlay.exchange_bind(
            arguments=arguments,
            destination=self.name,
            routing_key=routing_key,
            source=source_name,
            timeout=timeout,
        )
        return bind_ok

    async def unbind(
        self,
        exchange: ExchangeParamType,
        routing_key: str = "",
        arguments: Arguments = None,
        timeout: TimeoutType = None,
    ) -> aiormq.spec.Exchange.UnbindOk:
        """Remove an exchange-to-exchange binding of this exchange.

        This exchange is the destination of the binding being removed.

        :param exchange: source exchange; its name is resolved with
            ``get_exchange_name``
        :param routing_key: routing (binding) key
        :param arguments: additional arguments
        :param timeout: execution timeout
        :return: the result of the underlay channel's ``exchange_unbind``
        """
        log.debug(
            "Unbinding exchange %r from exchange %r, "
            "routing_key=%r, arguments=%r",
            self,
            exchange,
            routing_key,
            arguments,
        )

        underlay = await self.channel.get_underlay_channel()
        source_name = get_exchange_name(exchange)
        unbind_ok = await underlay.exchange_unbind(
            arguments=arguments,
            destination=self.name,
            routing_key=routing_key,
            source=source_name,
            timeout=timeout,
        )
        return unbind_ok

    async def publish(
        self,
        message: AbstractMessage,
        routing_key: str,
        *,
        mandatory: bool = True,
        immediate: bool = False,
        timeout: TimeoutType = None,
    ) -> Optional[aiormq.abc.ConfirmationFrameType]:
        """Publish the message through this exchange.

        `aio-pika` uses `publisher confirms`_ extension for message
        delivery.

        .. _publisher confirms: https://www.rabbitmq.com/confirms.html

        :param message: message whose ``body`` and ``properties`` are sent
        :param routing_key: routing key of the message
        :param mandatory: ``mandatory`` flag passed to ``basic_publish``
        :param immediate: ``immediate`` flag passed to ``basic_publish``
        :param timeout: execution timeout
        :return: the result of the underlay channel's ``basic_publish``
        :raises ValueError: when this exchange is marked ``internal``
        :raises aiormq.exceptions.ChannelInvalidStateError: when the channel
            reports ``is_closed``
        """
        log.debug(
            "Publishing message with routing key %r via exchange %r: %r",
            routing_key,
            self,
            message,
        )

        if self.internal:
            # Caught on the client side to prevent channel closure
            raise ValueError(
                f"Can not publish to internal exchange: '{self.name}'!",
            )

        if self.channel.is_closed:
            raise aiormq.exceptions.ChannelInvalidStateError(
                "%r closed" % (self.channel,),
            )

        underlay = await self.channel.get_underlay_channel()
        confirmation = await underlay.basic_publish(
            exchange=self.name,
            routing_key=routing_key,
            body=message.body,
            properties=message.properties,
            mandatory=mandatory,
            immediate=immediate,
            timeout=timeout,
        )
        return confirmation

    async def delete(
        self,
        if_unused: bool = False,
        timeout: TimeoutType = None,
    ) -> aiormq.spec.Exchange.DeleteOk:
        """Delete the exchange.

        :param if_unused: ``if_unused`` flag passed to ``exchange_delete``
        :param timeout: operation timeout
        :return: the result of the underlay channel's ``exchange_delete``
        """
        log.info("Deleting %r", self)

        underlay = await self.channel.get_underlay_channel()
        delete_ok = await underlay.exchange_delete(
            self.name,
            if_unused=if_unused,
            timeout=timeout,
        )

        # Drop the channel reference once the delete call has returned, so
        # that this instance can no longer reach the channel afterwards.
        del self.channel
        return delete_ok
# Names exported by a star-import of this module: the Exchange class defined
# here, plus ExchangeType and ExchangeParamType, which this module imports
# from ``.abc`` and re-exports.
__all__ = ("Exchange", "ExchangeType", "ExchangeParamType")