API Reference¶
- aio_pika.AMQPException¶
alias of AMQPError
- class aio_pika.Channel(connection: AbstractConnection, channel_number: int | None = None, publisher_confirms: bool = True, on_return_raises: bool = False)[source]¶
Channel abstraction
- Parameters:
connection – aio_pika.adapter.AsyncioConnection instance
loop – Event loop (asyncio.get_event_loop() when None)
future_store – aio_pika.common.FutureStore instance
publisher_confirms – False if you don’t need delivery confirmations (in pursuit of performance)
- async declare_exchange(name: str, type: ExchangeType | str = ExchangeType.DIRECT, *, durable: bool = False, auto_delete: bool = False, internal: bool = False, passive: bool = False, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) AbstractExchange [source]¶
Declare an exchange.
- Parameters:
name – string with exchange name or aio_pika.exchange.Exchange instance
type – Exchange type. Enum ExchangeType value or string. String values must be one of ‘fanout’, ‘direct’, ‘topic’, ‘headers’, ‘x-delayed-message’, ‘x-consistent-hash’.
durable – Durability (the exchange survives a broker restart)
auto_delete – Delete the exchange when all queues have finished using it.
internal – Do not send it to the broker, just create an object
passive – Do not fail when the entity was declared previously with other parameters. Raises aio_pika.exceptions.ChannelClosed when the exchange doesn’t exist.
arguments – additional arguments
timeout – execution timeout
- Returns:
aio_pika.exchange.Exchange instance
- async declare_queue(name: str | None = None, *, durable: bool = False, exclusive: bool = False, passive: bool = False, auto_delete: bool = False, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) AbstractQueue [source]¶
- Parameters:
name – queue name
durable – Durability (the queue survives a broker restart)
exclusive – Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed.
passive – Do not fail when the entity was declared previously with other parameters. Raises aio_pika.exceptions.ChannelClosed when the queue doesn’t exist.
auto_delete – Delete the queue when the channel is closed.
arguments – additional arguments
timeout – execution timeout
- Returns:
aio_pika.queue.Queue instance
- Raises:
aio_pika.exceptions.ChannelClosed instance
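The two declaration methods above are typically used together. The following is a minimal sketch, assuming an already opened channel object; the helper name setup_topology and its arguments are hypothetical and not part of aio-pika.

```python
from typing import Any


async def setup_topology(
    channel: Any, exchange_name: str, queue_name: str, binding_key: str
) -> Any:
    """Declare a durable topic exchange and a durable queue, then bind them."""
    # The exchange type may be given as a string instead of an ExchangeType value.
    exchange = await channel.declare_exchange(exchange_name, "topic", durable=True)
    queue = await channel.declare_queue(queue_name, durable=True)
    # Messages whose routing key matches binding_key are routed to the queue.
    await queue.bind(exchange, routing_key=binding_key)
    return queue
```

Declaring with durable=True on both sides keeps the exchange and the queue across a broker restart, as described in the parameter lists above.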
- async get_exchange(name: str, *, ensure: bool = True) AbstractExchange [source]¶
With ensure=True, it’s a shortcut for .declare_exchange(..., passive=True); otherwise, it returns an exchange instance without checking its existence.
When the exchange does not exist and ensure=True, aio_pika.exceptions.ChannelClosed is raised.
Use this method in a separate channel (or as soon as the channel is created). This is only a way to get an exchange without declaring a new one.
- Parameters:
name – exchange name
ensure – ensure that the exchange exists
- Returns:
aio_pika.exchange.Exchange instance
- Raises:
aio_pika.exceptions.ChannelClosed instance
- async get_queue(name: str, *, ensure: bool = True) AbstractQueue [source]¶
With ensure=True, it’s a shortcut for .declare_queue(..., passive=True); otherwise, it returns a queue instance without checking its existence.
When the queue does not exist and ensure=True, aio_pika.exceptions.ChannelClosed is raised.
Use this method in a separate channel (or as soon as the channel is created). This is only a way to get a queue without declaring a new one.
- Parameters:
name – queue name
ensure – ensure that the queue exists
- Returns:
aio_pika.queue.Queue instance
- Raises:
aio_pika.exceptions.ChannelClosed instance
- property is_closed: bool¶
Returns True when the channel has been closed from the broker side or after the close() method has been called.
- property is_initialized: bool¶
Returns True when the channel has been opened and is ready for interaction
- class aio_pika.Connection(url: URL, loop: AbstractEventLoop | None = None, ssl_context: SSLContext | None = None, **kwargs: Any)[source]¶
Connection abstraction
- channel(channel_number: int | None = None, publisher_confirms: bool = True, on_return_raises: bool = False) AbstractChannel [source]¶
Coroutine which returns a new instance of Channel.
Example:
    import aio_pika

    async def main(loop):
        connection = await aio_pika.connect(
            "amqp://guest:guest@127.0.0.1/"
        )

        channel1 = connection.channel()
        await channel1.close()

        # Creates channel with specific channel number
        channel42 = connection.channel(42)
        await channel42.close()

        # For working with transactions
        channel_no_confirms = await connection.channel(
            publisher_confirms=False
        )
        await channel_no_confirms.close()
Also available as an asynchronous context manager:
    import aio_pika

    async def main(loop):
        connection = await aio_pika.connect(
            "amqp://guest:guest@127.0.0.1/"
        )

        async with connection.channel() as channel:
            # channel is open and available
            pass

        # channel is now closed
- Parameters:
channel_number – specify the channel number explicitly
publisher_confirms – if True, the aio_pika.Exchange.publish() method will return bool after publish is complete. Otherwise the aio_pika.Exchange.publish() method will return None.
on_return_raises – raise an aio_pika.exceptions.DeliveryError when a mandatory message is returned
- class aio_pika.DeliveryMode(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
- class aio_pika.Exchange(channel: AbstractChannel, name: str, type: ExchangeType | str = ExchangeType.DIRECT, *, auto_delete: bool = False, durable: bool = False, internal: bool = False, passive: bool = False, arguments: Dict[str, FieldValue] | None = None)[source]¶
Exchange abstraction
- async bind(exchange: AbstractExchange | str, routing_key: str = '', *, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) BindOk [source]¶
A binding can also be a relationship between two exchanges. This can be simply read as: this exchange is interested in messages from another exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key.
    client = await connect()

    routing_key = 'simple_routing_key'
    src_exchange_name = "source_exchange"
    dest_exchange_name = "destination_exchange"

    channel = await client.channel()
    src_exchange = await channel.declare_exchange(
        src_exchange_name, auto_delete=True
    )
    dest_exchange = await channel.declare_exchange(
        dest_exchange_name, auto_delete=True
    )
    queue = await channel.declare_queue(auto_delete=True)

    await queue.bind(dest_exchange, routing_key)
    await dest_exchange.bind(src_exchange, routing_key)
- Parameters:
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Returns:
None
- async delete(if_unused: bool = False, timeout: float | int | None = None) DeleteOk [source]¶
Delete the exchange
- Parameters:
timeout – operation timeout
if_unused – perform deletion only when the exchange has no bindings.
- async publish(message: AbstractMessage, routing_key: str, *, mandatory: bool = True, immediate: bool = False, timeout: float | int | None = None) Ack | Nack | Reject | None [source]¶
Publish the message to the queue. aio-pika uses publisher confirms extension for message delivery.
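A minimal sketch of publishing a message, assuming an exchange obtained from an opened channel. The helper name publish_text is hypothetical; Message, DeliveryMode.PERSISTENT and Exchange.publish() are the aio-pika names used.

```python
from typing import Any


async def publish_text(exchange: Any, routing_key: str, text: str) -> Any:
    """Publish a persistent UTF-8 text message and return the publish result."""
    # Imported lazily so this module can be loaded without aio-pika installed.
    import aio_pika

    message = aio_pika.Message(
        body=text.encode("utf-8"),
        content_type="text/plain",
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
    )
    # With publisher confirms enabled on the channel, the awaited result is the
    # broker's confirmation; with publisher_confirms=False it is None.
    return await exchange.publish(message, routing_key=routing_key)
```

For a default exchange, the routing key would normally be the name of the target queue.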
- async unbind(exchange: AbstractExchange | str, routing_key: str = '', arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) UnbindOk [source]¶
Remove exchange-to-exchange binding for this Exchange instance
- Parameters:
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Returns:
None
- class aio_pika.ExchangeType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
- class aio_pika.IncomingMessage(message: DeliveredMessage, no_ack: bool = False)[source]¶
An incoming message looks like Message but has additional methods for message acknowledgement.
Depending on the acknowledgement mode used, RabbitMQ can consider a message to be successfully delivered either immediately after it is sent out (written to a TCP socket) or when an explicit (“manual”) client acknowledgement is received. Manually sent acknowledgements can be positive or negative and use one of the following protocol methods:
basic.ack is used for positive acknowledgements
basic.nack is used for negative acknowledgements (note: this is a RabbitMQ extension to AMQP 0-9-1)
basic.reject is used for negative acknowledgements but has one limitation compared to basic.nack
Positive acknowledgements simply instruct RabbitMQ to record a message as delivered. Negative acknowledgements with basic.reject have the same effect. The difference is primarily in the semantics: positive acknowledgements assume a message was successfully processed while their negative counterpart suggests that a delivery wasn’t processed but still should be deleted.
Create an instance of IncomingMessage
- async ack(multiple: bool = False) None [source]¶
Send basic.ack, which is used for positive acknowledgements.
Note
This method looks like a blocking-method, but actually it just sends bytes to the socket and doesn’t require any responses from the broker.
- Parameters:
multiple – If set to True, the message’s delivery tag is treated as “up to and including”, so that multiple messages can be acknowledged with a single method. If set to False, the ack refers to a single message.
- Returns:
None
- process(requeue: bool = False, reject_on_redelivered: bool = False, ignore_processed: bool = False) AbstractProcessContext [source]¶
Context manager for processing the message
>>> async def on_message_received(message: IncomingMessage):
...     async with message.process():
...         # When exception will be raised
...         # the message will be rejected
...         print(message.body)
Example with ignore_processed=True
>>> async def on_message_received(message: IncomingMessage):
...     async with message.process(ignore_processed=True):
...         # Now (with ignore_processed=True) you may reject
...         # (or ack) message manually too
...         if True:  # some reasonable condition here
...             await message.reject()
...         print(message.body)
- Parameters:
requeue – Requeue the message when an exception is raised.
reject_on_redelivered – When True, the message will be rejected only when it was redelivered.
ignore_processed – Do nothing if the message was already processed
- async reject(requeue: bool = False) None [source]¶
When requeue=True the message will be returned to queue. Otherwise, message will be dropped.
Note
This method looks like a blocking-method, but actually it just sends bytes to the socket and doesn’t require any responses from the broker.
- Parameters:
requeue – bool
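The acknowledgement methods above are usually driven through process(). The following is a minimal sketch of a consumer callback, assuming the message body is JSON; the function name handle_json and the results list are hypothetical.

```python
import json
from typing import Any, List


async def handle_json(message: Any, results: List[Any]) -> None:
    """Decode a JSON body inside process() so acknowledgement is automatic."""
    # process() acknowledges the message when the block exits normally and
    # rejects it when an exception escapes; requeue=False means a rejected
    # message is dropped rather than returned to the queue.
    async with message.process(requeue=False):
        results.append(json.loads(message.body))
```

Keeping the parsing inside the async with block means a malformed body leads to a rejection instead of an unacknowledged message.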
- class aio_pika.Message(body: bytes, *, headers: Dict[str, bool | bytes | bytearray | Decimal | List[bool | bytes | bytearray | Decimal | List[FieldValue] | Dict[str, FieldValue] | float | int | None | str | datetime] | Dict[str, bool | bytes | bytearray | Decimal | List[FieldValue] | Dict[str, FieldValue] | float | int | None | str | datetime] | float | int | None | str | datetime] | None = None, content_type: str | None = None, content_encoding: str | None = None, delivery_mode: DeliveryMode | int | None = None, priority: int | None = None, correlation_id: str | None = None, reply_to: str | None = None, expiration: int | datetime | float | timedelta | None = None, message_id: str | None = None, timestamp: int | datetime | float | timedelta | None = None, type: str | None = None, user_id: str | None = None, app_id: str | None = None)[source]¶
AMQP message abstraction
Creates a new instance of Message
- Parameters:
body – message body
headers – message headers
content_type – content type
content_encoding – content encoding
delivery_mode – delivery mode
priority – priority
correlation_id – correlation id
reply_to – reply to
expiration – expiration in seconds (or datetime or timedelta)
message_id – message id
timestamp – timestamp
type – type
user_id – user id
app_id – app id
- property locked: bool¶
is message locked
- Returns:
bool
- property properties: Properties¶
Build aiormq.spec.Basic.Properties object
- class aio_pika.Queue(channel: AbstractChannel, name: str | None, durable: bool, exclusive: bool, auto_delete: bool, arguments: Dict[str, FieldValue] | None, passive: bool = False)[source]¶
AMQP queue abstraction
- async bind(exchange: AbstractExchange | str, routing_key: str | None = None, *, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) BindOk [source]¶
A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key.
- Parameters:
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Raises:
asyncio.TimeoutError – when the binding timeout period has elapsed.
- Returns:
None
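Since bind() has a matching unbind(), a short-lived binding can be wrapped in a context manager. This is a sketch under the assumption that the queue and exchange are already declared; temporary_binding is a hypothetical helper, not an aio-pika API.

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def temporary_binding(
    queue: Any, exchange: Any, binding_key: str
) -> AsyncIterator[Any]:
    """Bind queue to exchange for the duration of the block, then unbind."""
    await queue.bind(exchange, routing_key=binding_key)
    try:
        yield queue
    finally:
        # unbind() is given the same exchange and key that bind() received.
        await queue.unbind(exchange, routing_key=binding_key)
```

It would be used as `async with temporary_binding(queue, exchange, "user.*"): ...`, with the binding removed even if the block raises.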
- async cancel(consumer_tag: str, timeout: float | int | None = None, nowait: bool = False) CancelOk [source]¶
This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (i.e. cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion.
- Parameters:
consumer_tag – consumer tag returned by consume()
timeout – execution timeout
nowait (bool) – Do not expect a Basic.CancelOk response
- Returns:
Basic.CancelOk when operation completed successfully
- async consume(callback: Callable[[AbstractIncomingMessage], Awaitable[Any]], no_ack: bool = False, exclusive: bool = False, arguments: Dict[str, FieldValue] | None = None, consumer_tag: str | None = None, timeout: float | int | None = None) str [source]¶
Start consuming the Queue.
- Parameters:
timeout – asyncio.TimeoutError will be raised when the Future was not finished after this time.
callback – Consuming callback. Should be a coroutine function.
no_ack – if True you don’t need to call aio_pika.message.IncomingMessage.ack()
exclusive – Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed.
arguments – additional arguments
consumer_tag – optional consumer tag
- Raises:
asyncio.TimeoutError – when the consuming timeout period has elapsed.
- Returns:
consumer tag (str)
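The consumer tag returned by consume() is what cancel() expects. A minimal sketch of consuming for a fixed period, assuming an already declared queue; consume_for is a hypothetical helper name.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def consume_for(
    queue: Any, callback: Callable[[Any], Awaitable[Any]], seconds: float
) -> str:
    """Consume from the queue for a fixed time, then cancel the consumer."""
    # consume() returns the consumer tag that cancel() needs later.
    consumer_tag = await queue.consume(callback, no_ack=False)
    try:
        await asyncio.sleep(seconds)
    finally:
        # Stop deliveries for this consumer; already delivered messages
        # are not affected.
        await queue.cancel(consumer_tag)
    return consumer_tag
```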
- async declare(timeout: float | int | None = None) DeclareOk [source]¶
Declare queue.
- Parameters:
timeout – execution timeout
- Returns:
None
- async delete(*, if_unused: bool = True, if_empty: bool = True, timeout: float | int | None = None) DeleteOk [source]¶
Delete the queue.
- Parameters:
if_unused – Perform delete only when unused
if_empty – Perform delete only when empty
timeout – execution timeout
- Returns:
None
- async get(*, no_ack: bool = False, fail: Literal[True] = True, timeout: TimeoutType = 5) IncomingMessage [source]¶
- async get(*, no_ack: bool = False, fail: Literal[False] = True, timeout: TimeoutType = 5) IncomingMessage | None
Get message from the queue.
- Parameters:
no_ack – if True you don’t need to call aio_pika.message.IncomingMessage.ack()
timeout – execution timeout
fail – when False, return None instead of raising aio_pika.exceptions.QueueEmpty.
- Returns:
aio_pika.message.IncomingMessage instance, or None when fail=False and the queue is empty
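With fail=False, get() can be polled until the queue is empty. The following is a minimal sketch, assuming an already declared queue; the helper name drain and its limit argument are hypothetical.

```python
from typing import Any, List


async def drain(queue: Any, limit: int = 100) -> List[bytes]:
    """Fetch up to `limit` messages, acknowledging each, until the queue is empty."""
    bodies: List[bytes] = []
    for _ in range(limit):
        # fail=False makes get() return None on an empty queue
        # instead of raising QueueEmpty.
        message = await queue.get(fail=False)
        if message is None:
            break
        bodies.append(message.body)
        await message.ack()
    return bodies
```

The limit guards against looping indefinitely when producers keep the queue filled.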
- iterator(**kwargs: Any) AbstractQueueIterator [source]¶
Returns an iterator for async for expression.
Full example:
    import aio_pika

    async def main():
        connection = await aio_pika.connect()

        async with connection:
            channel = await connection.channel()

            queue = await channel.declare_queue('test')

            async with queue.iterator() as q:
                async for message in q:
                    print(message.body)
When your program runs with run_forever, the iterator will be closed in the background. In this case the context manager for the iterator may be skipped and the queue may be used in the “async for” expression directly.
    import aio_pika

    async def main():
        connection = await aio_pika.connect()

        async with connection:
            channel = await connection.channel()

            queue = await channel.declare_queue('test')

            async for message in queue:
                print(message.body)
- Returns:
QueueIterator
- async purge(no_wait: bool = False, timeout: float | int | None = None) PurgeOk [source]¶
Purge all messages from the queue.
- Parameters:
no_wait – no wait response
timeout – execution timeout
- Returns:
None
- async unbind(exchange: AbstractExchange | str, routing_key: str | None = None, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) UnbindOk [source]¶
Remove binding from exchange for this Queue instance
- Parameters:
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Raises:
asyncio.TimeoutError – when the unbinding timeout period has elapsed.
- Returns:
None
- class aio_pika.RobustChannel(connection: AbstractConnection, channel_number: int | None = None, publisher_confirms: bool = True, on_return_raises: bool = False)[source]¶
Channel abstraction
- Parameters:
connection – aio_pika.adapter.AsyncioConnection instance
loop – Event loop (asyncio.get_event_loop() when None)
future_store – aio_pika.common.FutureStore instance
publisher_confirms – False if you don’t need delivery confirmations (in pursuit of performance)
- EXCHANGE_CLASS¶
alias of RobustExchange
- QUEUE_CLASS¶
alias of RobustQueue
- async declare_exchange(name: str, type: ExchangeType | str = ExchangeType.DIRECT, durable: bool = False, auto_delete: bool = False, internal: bool = False, passive: bool = False, arguments: Dict[str, Any] | None = None, timeout: float | int | None = None, robust: bool = True) AbstractRobustExchange [source]¶
Declare an exchange.
- Parameters:
name – string with exchange name or aio_pika.exchange.Exchange instance
type – Exchange type. Enum ExchangeType value or string. String values must be one of ‘fanout’, ‘direct’, ‘topic’, ‘headers’, ‘x-delayed-message’, ‘x-consistent-hash’.
durable – Durability (the exchange survives a broker restart)
auto_delete – Delete the exchange when all queues have finished using it.
internal – Do not send it to the broker, just create an object
passive – Do not fail when the entity was declared previously with other parameters. Raises aio_pika.exceptions.ChannelClosed when the exchange doesn’t exist.
arguments – additional arguments
timeout – execution timeout
- Returns:
aio_pika.exchange.Exchange instance
- async declare_queue(name: str | None = None, *, durable: bool = False, exclusive: bool = False, passive: bool = False, auto_delete: bool = False, arguments: Dict[str, Any] | None = None, timeout: float | int | None = None, robust: bool = True) AbstractRobustQueue [source]¶
- Parameters:
name – queue name
durable – Durability (the queue survives a broker restart)
exclusive – Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed.
passive – Do not fail when the entity was declared previously with other parameters. Raises aio_pika.exceptions.ChannelClosed when the queue doesn’t exist.
auto_delete – Delete the queue when the channel is closed.
arguments – additional arguments
timeout – execution timeout
- Returns:
aio_pika.queue.Queue instance
- Raises:
aio_pika.exceptions.ChannelClosed instance
- class aio_pika.RobustConnection(url: URL, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]¶
Robust connection
- CHANNEL_CLASS¶
alias of RobustChannel
- channel(channel_number: int | None = None, publisher_confirms: bool = True, on_return_raises: bool = False) AbstractRobustChannel [source]¶
Coroutine which returns a new instance of Channel.
Example:
    import aio_pika

    async def main(loop):
        connection = await aio_pika.connect(
            "amqp://guest:guest@127.0.0.1/"
        )

        channel1 = connection.channel()
        await channel1.close()

        # Creates channel with specific channel number
        channel42 = connection.channel(42)
        await channel42.close()

        # For working with transactions
        channel_no_confirms = await connection.channel(
            publisher_confirms=False
        )
        await channel_no_confirms.close()
Also available as an asynchronous context manager:
    import aio_pika

    async def main(loop):
        connection = await aio_pika.connect(
            "amqp://guest:guest@127.0.0.1/"
        )

        async with connection.channel() as channel:
            # channel is open and available
            pass

        # channel is now closed
- Parameters:
channel_number – specify the channel number explicitly
publisher_confirms – if True, the aio_pika.Exchange.publish() method will return bool after publish is complete. Otherwise the aio_pika.Exchange.publish() method will return None.
on_return_raises – raise an aio_pika.exceptions.DeliveryError when a mandatory message is returned
- class aio_pika.RobustExchange(channel: AbstractChannel, name: str, type: ExchangeType | str = ExchangeType.DIRECT, *, auto_delete: bool = False, durable: bool = False, internal: bool = False, passive: bool = False, arguments: Dict[str, FieldValue] | None = None)[source]¶
Exchange abstraction
- async bind(exchange: AbstractExchange | str, routing_key: str = '', *, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None, robust: bool = True) BindOk [source]¶
A binding can also be a relationship between two exchanges. This can be simply read as: this exchange is interested in messages from another exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key.
    client = await connect()

    routing_key = 'simple_routing_key'
    src_exchange_name = "source_exchange"
    dest_exchange_name = "destination_exchange"

    channel = await client.channel()
    src_exchange = await channel.declare_exchange(
        src_exchange_name, auto_delete=True
    )
    dest_exchange = await channel.declare_exchange(
        dest_exchange_name, auto_delete=True
    )
    queue = await channel.declare_queue(auto_delete=True)

    await queue.bind(dest_exchange, routing_key)
    await dest_exchange.bind(src_exchange, routing_key)
- Parameters:
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Returns:
None
- async unbind(exchange: AbstractExchange | str, routing_key: str = '', arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) UnbindOk [source]¶
Remove exchange-to-exchange binding for this Exchange instance
- Parameters:
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Returns:
None
- class aio_pika.RobustQueue(channel: AbstractChannel, name: str | None, durable: bool = False, exclusive: bool = False, auto_delete: bool = False, arguments: Dict[str, FieldValue] | None = None, passive: bool = False)[source]¶
- async bind(exchange: AbstractExchange | str, routing_key: str | None = None, *, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None, robust: bool = True) BindOk [source]¶
A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key.
- Parameters:
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Raises:
asyncio.TimeoutError – when the binding timeout period has elapsed.
- Returns:
None
- async cancel(consumer_tag: str, timeout: float | int | None = None, nowait: bool = False) CancelOk [source]¶
This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (i.e. cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion.
- Parameters:
consumer_tag – consumer tag returned by consume()
timeout – execution timeout
nowait (bool) – Do not expect a Basic.CancelOk response
- Returns:
Basic.CancelOk when operation completed successfully
- async consume(callback: Callable[[AbstractIncomingMessage], Awaitable[Any]], no_ack: bool = False, exclusive: bool = False, arguments: Dict[str, FieldValue] | None = None, consumer_tag: str | None = None, timeout: float | int | None = None, robust: bool = True) str [source]¶
Start consuming the Queue.
- Parameters:
timeout – asyncio.TimeoutError will be raised when the Future was not finished after this time.
callback – Consuming callback. Should be a coroutine function.
no_ack – if True you don’t need to call aio_pika.message.IncomingMessage.ack()
exclusive – Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed.
arguments – additional arguments
consumer_tag – optional consumer tag
- Raises:
asyncio.TimeoutError – when the consuming timeout period has elapsed.
- Returns:
consumer tag (str)
- iterator(**kwargs: Any) AbstractQueueIterator [source]¶
Returns an iterator for async for expression.
Full example:
    import aio_pika

    async def main():
        connection = await aio_pika.connect()

        async with connection:
            channel = await connection.channel()

            queue = await channel.declare_queue('test')

            async with queue.iterator() as q:
                async for message in q:
                    print(message.body)
When your program runs with run_forever, the iterator will be closed in the background. In this case the context manager for the iterator may be skipped and the queue may be used in the “async for” expression directly.
    import aio_pika

    async def main():
        connection = await aio_pika.connect()

        async with connection:
            channel = await connection.channel()

            queue = await channel.declare_queue('test')

            async for message in queue:
                print(message.body)
- Returns:
QueueIterator
- async unbind(exchange: AbstractExchange | str, routing_key: str | None = None, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) UnbindOk [source]¶
Remove binding from exchange for this Queue instance
- Parameters:
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Raises:
asyncio.TimeoutError – when the unbinding timeout period has elapsed.
- Returns:
None
- async aio_pika.connect(url: str | ~yarl.URL | None = None, *, host: str = 'localhost', port: int = 5672, login: str = 'guest', password: str = 'guest', virtualhost: str = '/', ssl: bool = False, loop: ~asyncio.events.AbstractEventLoop | None = None, ssl_options: ~aio_pika.abc.SSLOptions | None = None, ssl_context: ~ssl.SSLContext | None = None, timeout: float | int | None = None, client_properties: ~typing.Dict[str, FieldValue] | None = None, connection_class: ~typing.Type[~aio_pika.abc.AbstractConnection] = <class 'aio_pika.connection.Connection'>, **kwargs: ~typing.Any) AbstractConnection [source]¶
Make connection to the broker.
Example:
    import aio_pika

    async def main():
        connection = await aio_pika.connect(
            "amqp://guest:guest@127.0.0.1/"
        )
Connect to localhost with default credentials:
    import aio_pika

    async def main():
        connection = await aio_pika.connect()
Note
- The available keys for ssl_options parameter are:
cert_reqs
certfile
keyfile
ssl_version
For information on what ssl_options can be set to, refer to the official Python documentation.
Set connection name for RabbitMQ admin panel:
    # As URL parameter method
    read_connection = await connect(
        "amqp://guest:guest@localhost/?name=Read%20connection"
    )

    # keyword method
    write_connection = await connect(
        client_properties={
            'connection_name': 'Write connection'
        }
    )
URL string might contain SSL parameters, e.g. amqps://user:pass@host//?ca_certs=ca.pem&certfile=crt.pem&keyfile=key.pem
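Query parameters such as name or the SSL file paths have to be percent-encoded when placed in the URL. A small sketch using only the standard library; with_query is a hypothetical helper, not part of aio-pika.

```python
from urllib.parse import quote, urlencode


def with_query(base_url: str, **params: str) -> str:
    """Append query parameters (e.g. name, certfile) to a broker URL."""
    # quote_via=quote encodes spaces as %20 rather than '+'.
    return f"{base_url}?{urlencode(params, quote_via=quote)}"


read_url = with_query("amqp://guest:guest@localhost/", name="Read connection")
```

The resulting string can be passed as the url argument of connect().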
- Parameters:
client_properties – add custom client capability.
url – RFC3986 formatted broker address. When None, the keyword arguments will be used.
host – hostname of the broker
port – broker port, 5672 by default
login – username string. ‘guest’ by default.
password – password string. ‘guest’ by default.
virtualhost – virtualhost parameter. ‘/’ by default
ssl – use SSL for connection. Should be used with additional kwargs.
ssl_options – A dict of values for the SSL connection.
timeout – connection timeout in seconds
loop – Event loop (asyncio.get_event_loop() when None)
ssl_context – ssl.SSLContext instance
connection_class – Factory of a new connection
kwargs – additional parameters which will be passed to the connection.
- Returns:
- async aio_pika.connect_robust(url: str | ~yarl.URL | None = None, *, host: str = 'localhost', port: int = 5672, login: str = 'guest', password: str = 'guest', virtualhost: str = '/', ssl: bool = False, loop: ~asyncio.events.AbstractEventLoop | None = None, ssl_options: ~aio_pika.abc.SSLOptions | None = None, ssl_context: ~ssl.SSLContext | None = None, timeout: float | int | None = None, client_properties: ~typing.Dict[str, FieldValue] | None = None, connection_class: ~typing.Type[~aio_pika.abc.AbstractRobustConnection] = <class 'aio_pika.robust_connection.RobustConnection'>, **kwargs: ~typing.Any) AbstractRobustConnection [source]¶
Make connection to the broker.
Example:
    import aio_pika

    async def main():
        connection = await aio_pika.connect(
            "amqp://guest:guest@127.0.0.1/"
        )
Connect to localhost with default credentials:
    import aio_pika

    async def main():
        connection = await aio_pika.connect()
Note
- The available keys for ssl_options parameter are:
cert_reqs
certfile
keyfile
ssl_version
For information on what ssl_options can be set to, refer to the official Python documentation.
Set connection name for RabbitMQ admin panel:
    # As URL parameter method
    read_connection = await connect(
        "amqp://guest:guest@localhost/?name=Read%20connection"
    )

    # keyword method
    write_connection = await connect(
        client_properties={
            'connection_name': 'Write connection'
        }
    )
URL string might contain ssl parameters e.g. amqps://user:pass@host//?ca_certs=ca.pem&certfile=crt.pem&keyfile=key.pem
- Parameters:
client_properties – add custom client capability.
url – RFC3986 formatted broker address. When None, the keyword arguments will be used.
host – hostname of the broker
port – broker port, 5672 by default
login – username string. ‘guest’ by default.
password – password string. ‘guest’ by default.
virtualhost – virtualhost parameter. ‘/’ by default
ssl – use SSL for connection. Should be used with additional kwargs.
ssl_options – A dict of values for the SSL connection.
timeout – connection timeout in seconds
loop – Event loop (asyncio.get_event_loop() when None)
ssl_context – ssl.SSLContext instance
connection_class – Factory of a new connection
kwargs – additional parameters which will be passed to the connection.
- Returns:
- aio_pika.patterns.base¶
alias of the aio_pika.patterns.base module
- class aio_pika.patterns.Master(channel: AbstractChannel, requeue: bool = True, reject_on_redelivered: bool = False)[source]¶
Implements Master/Worker pattern. Usage example:
worker.py
    master = Master(channel)
    worker = await master.create_worker('test_worker', lambda x: print(x))
master.py
    master = Master(channel)
    await master.proxy.test_worker('foo')
Creates a new Master instance.
- Parameters:
channel – Initialized instance of aio_pika.Channel
- async create_task(channel_name: str, kwargs: Mapping[str, Any] = mappingproxy({}), **message_kwargs: Any) Ack | Nack | Reject | None [source]¶
Creates a new task for the worker
- async create_worker(queue_name: str, func: Callable[[...], Awaitable[T]], **kwargs: Any) Worker [source]¶
Creates a new Worker instance.
- class aio_pika.patterns.Worker(queue: AbstractQueue, consumer_tag: str, loop: AbstractEventLoop)[source]¶
- class aio_pika.patterns.RPC(channel: AbstractChannel, host_exceptions: bool = False)[source]¶
Remote Procedure Call helper.
Create an instance
rpc = await RPC.create(channel, host_exceptions=False)
Registering python function
    # RPC instance passes only keyword arguments
    def multiply(*, x, y):
        return x * y

    await rpc.register("multiply", multiply)
Call function through proxy
assert await rpc.proxy.multiply(x=2, y=3) == 6
Call function explicitly
assert await rpc.call('multiply', dict(x=2, y=3)) == 6
Show exceptions on remote side
rpc = await RPC.create(channel, host_exceptions=True)
- async call(method_name: str, kwargs: Dict[str, Any] | None = None, *, expiration: int | None = None, priority: int = 5, delivery_mode: DeliveryMode = DeliveryMode.NOT_PERSISTENT) Any [source]¶
Call a remote method and await the result.
- Parameters:
method_name – Name of method
kwargs – Method kwargs
expiration – If not None, messages which stay in the queue longer will be returned and asyncio.TimeoutError will be raised.
priority – Message priority
delivery_mode – Call message delivery mode
- Raises:
asyncio.TimeoutError – when message expired
CancelledError – when called RPC.cancel()
RuntimeError – internal error
- async classmethod create(channel: AbstractChannel, **kwargs: Any) RPC [source]¶
Creates a new instance of aio_pika.patterns.RPC. You should use this method instead of __init__(), because create() returns a coroutine and performs asynchronous initialization.
- Parameters:
channel – initialized instance of aio_pika.Channel
- Returns:
- async execute(func: Callable[[...], Awaitable[T]], payload: Dict[str, Any]) T [source]¶
Executes the RPC call. May be overridden.
- async register(method_name: str, func: Callable[[...], Awaitable[T]], **kwargs: Any) Any [source]¶
Creates a queue whose name equals the method_name argument, then subscribes to this queue.
- Parameters:
method_name – Method name
func – target function. Function MUST accept only keyword arguments.
kwargs – arguments which will be passed to queue_declare
- Raises:
RuntimeError – Function already registered in this RPC instance or method_name already used.