Quick start¶
Some useful examples.
Simple consumer¶
import asyncio
import logging
import aio_pika
async def main() -> None:
    """Consume messages from ``test_queue`` and print each body.

    Connects to the broker on 127.0.0.1 with the guest account, declares
    the queue, and iterates over incoming messages.  The loop stops as soon
    as a message body contains the queue name.
    """
    logging.basicConfig(level=logging.DEBUG)
    # The URL must be a well-formed AMQP URI: user:password@host.
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/",
    )

    queue_name = "test_queue"

    async with connection:
        # Creating channel
        channel = await connection.channel()

        # Will take no more than 10 messages in advance
        await channel.set_qos(prefetch_count=10)

        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                # NOTE(review): process() presumably settles the message
                # when the block exits - confirm against the aio-pika docs.
                async with message.process():
                    print(message.body)

                    # A body mentioning the queue name is the stop signal.
                    if queue.name in message.body.decode():
                        break


if __name__ == "__main__":
    asyncio.run(main())
Simple publisher¶
import asyncio
import aio_pika
async def main() -> None:
    """Publish one ``Hello test_queue`` message via the default exchange.

    The routing key doubles as the destination queue name.
    """
    # The URL must be a well-formed AMQP URI: user:password@host.
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/",
    )

    async with connection:
        routing_key = "test_queue"

        channel = await connection.channel()

        await channel.default_exchange.publish(
            aio_pika.Message(body=f"Hello {routing_key}".encode()),
            routing_key=routing_key,
        )


if __name__ == "__main__":
    asyncio.run(main())
Asynchronous message processing¶
import asyncio
import aio_pika
async def process_message(
    message: aio_pika.abc.AbstractIncomingMessage,
) -> None:
    """Print the body of *message*, then pause for one second.

    All work happens inside the ``message.process()`` context.
    """
    processing = message.process()
    async with processing:
        payload = message.body
        print(payload)
        # Simulated work: keeps the message "in flight" for a second.
        await asyncio.sleep(1)
async def main() -> None:
    """Consume ``test_queue`` with a callback until the program is stopped.

    Registers ``process_message`` as the consumer callback and then waits
    forever.  The connection is closed on the way out, whether the wait is
    interrupted or the setup itself fails.
    """
    # The URL must be a well-formed AMQP URI: user:password@host.
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/",
    )

    queue_name = "test_queue"

    # The try block starts right after connecting so that a failure while
    # creating the channel or declaring the queue still closes the connection.
    try:
        # Creating channel
        channel = await connection.channel()

        # Maximum message count which will be processing at the same time.
        await channel.set_qos(prefetch_count=100)

        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        await queue.consume(process_message)

        # Wait until terminate: this future is never resolved.
        await asyncio.Future()
    finally:
        await connection.close()


if __name__ == "__main__":
    asyncio.run(main())
Working with RabbitMQ transactions¶
import asyncio
import aio_pika
async def main() -> None:
    """Demonstrate three ways of publishing inside a transaction.

    1. A transaction driven by ``async with``.
    2. A manually selected transaction that is committed.
    3. A manually selected transaction that is rolled back.
    """
    # The URL must be a well-formed AMQP URI: user:password@host.
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/",
    )

    async with connection:
        routing_key = "test_queue"

        # Transactions conflict with `publisher_confirms`
        channel = await connection.channel(publisher_confirms=False)

        # Use transactions with async context manager
        async with channel.transaction():
            # Publishing messages but delivery will not be done
            # before committing this transaction
            for i in range(10):
                message = aio_pika.Message(body=f"Hello #{i}".encode())

                await channel.default_exchange.publish(
                    message, routing_key=routing_key,
                )

        # Using transactions manually
        tx = channel.transaction()

        # start transaction manually
        await tx.select()

        await channel.default_exchange.publish(
            aio_pika.Message(body=f"Hello {routing_key}".encode()),
            routing_key=routing_key,
        )

        await tx.commit()

        # Using transactions manually
        tx = channel.transaction()

        # start transaction manually
        await tx.select()

        await channel.default_exchange.publish(
            aio_pika.Message(body="Should be rejected".encode()),
            routing_key=routing_key,
        )

        # Roll back instead of committing.
        await tx.rollback()


if __name__ == "__main__":
    asyncio.run(main())
Get single message example¶
import asyncio
from typing import Optional
from aio_pika import Message, connect_robust
from aio_pika.abc import AbstractIncomingMessage
async def main() -> None:
    """Publish one message and fetch it back with a single ``queue.get``.

    Declares an exchange named ``direct`` and the queue ``test_queue``,
    binds them, publishes ``Hello``, and then tries to fetch one message.
    The message is acknowledged if one arrives; otherwise ``Queue empty``
    is printed.  The binding and queue are removed afterwards, and the
    connection is closed even when one of the steps fails.
    """
    # The URL must be a well-formed AMQP URI: user:password@host.
    connection = await connect_robust(
        "amqp://guest:guest@127.0.0.1/?name=aio-pika%20example",
    )

    queue_name = "test_queue"
    routing_key = "test_queue"

    # try/finally guarantees the connection is closed on any failure below.
    try:
        # Creating channel
        channel = await connection.channel()

        # Declaring exchange
        exchange = await channel.declare_exchange("direct", auto_delete=True)

        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        # Binding queue
        await queue.bind(exchange, routing_key)

        await exchange.publish(
            Message(
                bytes("Hello", "utf-8"),
                content_type="text/plain",
                headers={"foo": "bar"},
            ),
            routing_key,
        )

        # Receiving one message.  The Optional annotation and the check
        # below reflect that, with fail=False, the result may be None.
        incoming_message: Optional[AbstractIncomingMessage] = await queue.get(
            timeout=5, fail=False
        )
        if incoming_message:
            # Confirm message
            await incoming_message.ack()
        else:
            print("Queue empty")

        await queue.unbind(exchange, routing_key)
        await queue.delete()
    finally:
        await connection.close()


if __name__ == "__main__":
    asyncio.run(main())
Set logging level¶
Sometimes you want to see only your own debug logs, but calling logging.basicConfig(level=logging.DEBUG) sets the debug log level for all loggers, including all of aio_pika’s modules. If you want to set the logging level independently, see the following example:
import logging
from aio_pika import logger
# Raise the threshold of the logger exported by aio_pika to ERROR, leaving
# the levels of other loggers untouched.
logger.setLevel(logging.ERROR)
Tornado example¶
import asyncio
import tornado.ioloop
import tornado.web
from aio_pika import Message, connect_robust
class Base:
    """Holder for the queue shared between the consumer and the handlers."""

    # Declared here without a value; make_app() assigns the actual
    # asyncio.Queue instance.
    QUEUE: asyncio.Queue
class SubscriberHandler(tornado.web.RequestHandler, Base):
    """HTTP handler that replies with the body of the next queued message."""

    async def get(self) -> None:
        """Wait for one item on the shared queue and send its body."""
        incoming = await self.QUEUE.get()
        payload = incoming.body
        await self.finish(payload)
class PublisherHandler(tornado.web.RequestHandler):
    """HTTP handler that publishes the request body to the ``test`` key."""

    async def post(self) -> None:
        """Publish the POST body on a short-lived channel, then reply OK."""
        amqp = self.application.settings["amqp_connection"]
        amqp_channel = await amqp.channel()

        try:
            outgoing = Message(body=self.request.body)
            default_exchange = amqp_channel.default_exchange
            await default_exchange.publish(outgoing, routing_key="test")
        finally:
            # The channel is per-request, so close it whatever happened.
            await amqp_channel.close()

        await self.finish("OK")
async def make_app() -> tornado.web.Application:
    """Connect to the broker, start consuming ``test``, and build the app.

    Incoming messages are put on ``Base.QUEUE``.  The connection is stored
    in the application settings under ``amqp_connection``.
    """
    amqp_connection = await connect_robust()

    consumer_channel = await amqp_connection.channel()
    test_queue = await consumer_channel.declare_queue("test", auto_delete=True)

    # Create the shared queue first: its bound `put` is the consumer callback.
    Base.QUEUE = asyncio.Queue()
    await test_queue.consume(Base.QUEUE.put, no_ack=True)

    routes = [
        (r"/publish", PublisherHandler),
        (r"/subscribe", SubscriberHandler),
    ]
    return tornado.web.Application(routes, amqp_connection=amqp_connection)
async def main() -> None:
    """Build the application, listen on port 8888, and wait forever."""
    application = await make_app()
    application.listen(8888)

    # This future is never resolved, so the coroutine waits indefinitely.
    forever = asyncio.Future()
    await forever


if __name__ == "__main__":
    asyncio.run(main())
External credentials example¶
import asyncio
import ssl
import aio_pika
from aio_pika.abc import SSLOptions
async def main() -> None:
    """Connect over TLS with certificate files and publish one message.

    The login is left empty and the certificate files are passed through
    ``SSLOptions``.
    """
    tls_options = SSLOptions(
        cafile="cacert.pem",
        certfile="cert.pem",
        keyfile="key.pem",
        no_verify_ssl=ssl.CERT_REQUIRED,
    )
    properties = {"connection_name": "aio-pika external credentials"}

    connection = await aio_pika.connect_robust(
        host="127.0.0.1",
        login="",
        ssl=True,
        ssl_options=tls_options,
        client_properties=properties,
    )

    async with connection:
        routing_key = "test_queue"

        channel = await connection.channel()

        payload = "Hello {}".format(routing_key).encode()
        await channel.default_exchange.publish(
            aio_pika.Message(body=payload),
            routing_key=routing_key,
        )


if __name__ == "__main__":
    asyncio.run(main())
Connection pooling¶
import asyncio
import aio_pika
from aio_pika.abc import AbstractRobustConnection
from aio_pika.pool import Pool
async def main() -> None:
    """Run one consumer and fifty publishers over pooled channels.

    Channels come from a pool of at most 10, and each channel is opened on
    a connection taken from a pool of at most 2.
    """
    queue_name = "pool_queue"

    async def get_connection() -> AbstractRobustConnection:
        """Factory for the connection pool."""
        return await aio_pika.connect_robust("amqp://guest:guest@localhost/")

    connection_pool: Pool = Pool(get_connection, max_size=2)

    async def get_channel() -> aio_pika.Channel:
        """Factory for the channel pool: open a channel on a pooled connection."""
        async with connection_pool.acquire() as pooled_connection:
            new_channel = await pooled_connection.channel()
            return new_channel

    channel_pool: Pool = Pool(get_channel, max_size=10)

    async def consume() -> None:
        """Print and acknowledge every message arriving on the queue."""
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.set_qos(10)

            queue = await channel.declare_queue(
                queue_name, durable=False, auto_delete=False,
            )

            async with queue.iterator() as incoming:
                async for message in incoming:
                    print(message)
                    await message.ack()

    async def publish() -> None:
        """Publish one message that names the channel it was sent on."""
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            body = ("Channel: %r" % channel).encode()
            await channel.default_exchange.publish(
                aio_pika.Message(body),
                queue_name,
            )

    async with connection_pool, channel_pool:
        # Start the consumer first, then wait for the publishers.
        task = asyncio.create_task(consume())
        publishers = [asyncio.create_task(publish()) for _ in range(50)]
        await asyncio.wait(publishers)
        await task


if __name__ == "__main__":
    asyncio.run(main())