Source code for aio_pika.connection

import asyncio
from ssl import SSLContext
from types import TracebackType
from typing import (
    Any, Awaitable, Dict, Literal, Optional, Tuple, Type, TypeVar, Union
)


import aiormq.abc
from aiormq.connection import parse_int
from pamqp.common import FieldTable
from yarl import URL

from .abc import (
    AbstractChannel, AbstractConnection, ConnectionParameter, SSLOptions,
    TimeoutType, UnderlayConnection,
)
from .channel import Channel
from .exceptions import ConnectionClosed
from .log import get_logger
from .tools import CallbackCollection


log = get_logger(__name__)
T = TypeVar("T")


[docs]class Connection(AbstractConnection): """ Connection abstraction """ CHANNEL_CLASS: Type[Channel] = Channel PARAMETERS: Tuple[ConnectionParameter, ...] = ( ConnectionParameter( name="interleave", parser=parse_int, is_kwarg=True, ), ConnectionParameter( name="happy_eyeballs_delay", parser=float, is_kwarg=True, ), ) _closed: asyncio.Future @property def is_closed(self) -> bool: return self._closed.done() async def close( self, exc: Optional[aiormq.abc.ExceptionType] = ConnectionClosed, ) -> None: transport, self.transport = self.transport, None self._close_called = True if not transport: return await transport.close(exc) if not self._closed.done(): self._closed.set_result(True) def closed(self) -> Awaitable[Literal[True]]: return self._closed @classmethod def _parse_parameters(cls, kwargs: Dict[str, Any]) -> Dict[str, Any]: result = {} for parameter in cls.PARAMETERS: value = kwargs.get(parameter.name, parameter.default) if parameter.is_kwarg and value is None: # skip optional value continue result[parameter.name] = parameter.parse(value) return result def __init__( self, url: URL, loop: Optional[asyncio.AbstractEventLoop] = None, ssl_context: Optional[SSLContext] = None, **kwargs: Any, ): self.loop = loop or asyncio.get_event_loop() self.transport = None self._closed = self.loop.create_future() self._close_called = False self.url = URL(url) self.kwargs: Dict[str, Any] = self._parse_parameters( kwargs or dict(self.url.query), ) self.kwargs["context"] = ssl_context self.close_callbacks = CallbackCollection(self) self.connected: asyncio.Event = asyncio.Event() def __str__(self) -> str: url = self.url if url.password: url = url.with_password("******") return str(url) def __repr__(self) -> str: return f'<{self.__class__.__name__}: "{self}">' async def _on_connection_close(self, closing: asyncio.Future) -> None: try: exc = closing.exception() except asyncio.CancelledError as e: exc = e self.connected.clear() await self.close_callbacks(exc) async def _on_connected(self) -> None: self.connected.set()
[docs] async def connect(self, timeout: TimeoutType = None) -> None: """ Connect to AMQP server. This method should be called after :func:`aio_pika.connection.Connection.__init__` .. note:: This method is called by :func:`connect`. You shouldn't call it explicitly. """ self.transport = await UnderlayConnection.connect( self.url, self._on_connection_close, timeout=timeout, **self.kwargs, ) await self._on_connected()
[docs] def channel( self, channel_number: Optional[int] = None, publisher_confirms: bool = True, on_return_raises: bool = False, ) -> AbstractChannel: """ Coroutine which returns new instance of :class:`Channel`. Example: .. code-block:: python import aio_pika async def main(loop): connection = await aio_pika.connect( "amqp://guest:[email protected]/" ) channel1 = connection.channel() await channel1.close() # Creates channel with specific channel number channel42 = connection.channel(42) await channel42.close() # For working with transactions channel_no_confirms = await connection.channel( publisher_confirms=False ) await channel_no_confirms.close() Also available as an asynchronous context manager: .. code-block:: python import aio_pika async def main(loop): connection = await aio_pika.connect( "amqp://guest:[email protected]/" ) async with connection.channel() as channel: # channel is open and available # channel is now closed :param channel_number: specify the channel number explicit :param publisher_confirms: if `True` the :func:`aio_pika.Exchange.publish` method will be return :class:`bool` after publish is complete. Otherwise the :func:`aio_pika.Exchange.publish` method will be return :class:`None` :param on_return_raises: raise an :class:`aio_pika.exceptions.DeliveryError` when mandatory message will be returned """ if not self.transport: raise RuntimeError("Connection was not opened") log.debug("Creating AMQP channel for connection: %r", self) channel = self.CHANNEL_CLASS( connection=self, channel_number=channel_number, publisher_confirms=publisher_confirms, on_return_raises=on_return_raises, ) log.debug("Channel created: %r", channel) return channel
async def ready(self) -> None: await self.connected.wait() def __del__(self) -> None: if ( self.is_closed or self.loop.is_closed() ): return asyncio.ensure_future(self.close()) async def __aenter__(self) -> "Connection": return self async def __aexit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: await self.close() async def update_secret( self, new_secret: str, *, reason: str = "", timeout: TimeoutType = None, ) -> aiormq.spec.Connection.UpdateSecretOk: if self.transport is None: raise RuntimeError("Connection is not ready") result = await self.transport.connection.update_secret( new_secret=new_secret, reason=reason, timeout=timeout, ) self.url = self.url.with_password(new_secret) return result
def make_url( url: Union[str, URL, None] = None, *, host: str = "localhost", port: int = 5672, login: str = "guest", password: str = "guest", virtualhost: str = "/", ssl: bool = False, ssl_options: Optional[SSLOptions] = None, client_properties: Optional[FieldTable] = None, **kwargs: Any, ) -> URL: if url is not None: if not isinstance(url, URL): return URL(url) return url kw = kwargs kw.update(ssl_options or {}) kw.update(client_properties or {}) # sanitize keywords kw = {k: v for k, v in kw.items() if v is not None} return URL.build( scheme="amqps" if ssl else "amqp", host=host, port=port, user=login, password=password, # yarl >= 1.3.0 requires path beginning with slash path="/" + virtualhost, query=kw, )
[docs]async def connect( url: Union[str, URL, None] = None, *, host: str = "localhost", port: int = 5672, login: str = "guest", password: str = "guest", virtualhost: str = "/", ssl: bool = False, loop: Optional[asyncio.AbstractEventLoop] = None, ssl_options: Optional[SSLOptions] = None, ssl_context: Optional[SSLContext] = None, timeout: TimeoutType = None, client_properties: Optional[FieldTable] = None, connection_class: Type[AbstractConnection] = Connection, **kwargs: Any, ) -> AbstractConnection: """ Make connection to the broker. Example: .. code-block:: python import aio_pika async def main(): connection = await aio_pika.connect( "amqp://guest:[email protected]/" ) Connect to localhost with default credentials: .. code-block:: python import aio_pika async def main(): connection = await aio_pika.connect() .. note:: The available keys for ssl_options parameter are: * cert_reqs * certfile * keyfile * ssl_version For an information on what the ssl_options can be set to reference the `official Python documentation`_ . Set connection name for RabbitMQ admin panel: .. code-block:: python # As URL parameter method read_connection = await connect( "amqp://guest:guest@localhost/?name=Read%20connection" ) write_connection = await connect( client_properties={ 'connection_name': 'Write connection' } ) .. note: ``client_properties`` argument requires ``aiormq>=2.9`` URL string might be containing ssl parameters e.g. `amqps://user:pass@host//?ca_certs=ca.pem&certfile=crt.pem&keyfile=key.pem` :param client_properties: add custom client capability. :param url: RFC3986_ formatted broker address. When :class:`None` will be used keyword arguments. :param host: hostname of the broker :param port: broker port 5672 by default :param login: username string. `'guest'` by default. :param password: password string. `'guest'` by default. :param virtualhost: virtualhost parameter. `'/'` by default :param ssl: use SSL for connection. Should be used with addition kwargs. :param ssl_options: A dict of values for the SSL connection. :param timeout: connection timeout in seconds :param loop: Event loop (:func:`asyncio.get_event_loop()` when :class:`None`) :param ssl_context: ssl.SSLContext instance :param connection_class: Factory of a new connection :param kwargs: addition parameters which will be passed to the connection. :return: :class:`aio_pika.connection.Connection` .. _RFC3986: https://goo.gl/MzgYAs .. _official Python documentation: https://goo.gl/pty9xA """ connection: AbstractConnection = connection_class( make_url( url, host=host, port=port, login=login, password=password, virtualhost=virtualhost, ssl=ssl, ssl_options=ssl_options, client_properties=client_properties, **kwargs, ), loop=loop, ssl_context=ssl_context, **kwargs, ) await connection.connect(timeout=timeout) return connection
__all__ = ("Connection", "connect", "make_url")