Quick start¶
Some useful examples.
Simple consumer¶
Connect to RabbitMQ, declare a queue, and consume messages using an async
iterator. Each message is automatically acknowledged when the process()
context manager exits successfully.
import asyncio
import logging
import aio_pika
async def main() -> None:
    """Consume messages from ``test_queue`` with an async iterator.

    Each message is handled inside ``message.process()``; the loop stops
    after the first message whose decoded body contains the queue name.
    """
    logging.basicConfig(level=logging.DEBUG)
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/",
    )

    queue_name = "test_queue"

    async with connection:
        # Creating channel
        channel = await connection.channel()

        # Will take no more than 10 messages in advance
        await channel.set_qos(prefetch_count=10)

        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    print(message.body)

                    # Stop consuming once a message mentions the queue name
                    if queue.name in message.body.decode():
                        break


if __name__ == "__main__":
    asyncio.run(main())
Simple publisher¶
Connect to RabbitMQ and publish a single message to a queue through the default exchange.
import asyncio
import aio_pika
async def main() -> None:
    """Publish one greeting to ``test_queue`` through the default exchange."""
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/",
    )

    async with connection:
        routing_key = "test_queue"

        channel = await connection.channel()

        await channel.default_exchange.publish(
            aio_pika.Message(body=f"Hello {routing_key}".encode()),
            routing_key=routing_key,
        )


if __name__ == "__main__":
    asyncio.run(main())
Asynchronous message processing¶
Consume messages using a callback function instead of an async iterator.
This allows multiple messages to be processed concurrently, controlled by
the prefetch_count setting.
import asyncio
import aio_pika
async def process_message(
    message: aio_pika.abc.AbstractIncomingMessage,
) -> None:
    """Consumer callback: print the message body, then simulate 1s of work.

    Everything runs inside ``message.process()``, so the sleep happens
    before that context manager exits.
    """
    async with message.process():
        payload = message.body
        print(payload)
        # Stand-in for real asynchronous work on the message
        await asyncio.sleep(1)
async def main() -> None:
    """Consume ``test_queue`` with a callback until the task is cancelled.

    The whole setup runs inside ``async with connection`` so the connection
    is closed even if creating the channel, setting QoS, or declaring the
    queue fails — not only when the final wait is interrupted.
    """
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/",
    )

    queue_name = "test_queue"

    async with connection:
        # Creating channel
        channel = await connection.channel()

        # Maximum message count which will be processing at the same time.
        await channel.set_qos(prefetch_count=100)

        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        await queue.consume(process_message)

        # Wait until terminated: this future is never resolved, so the
        # coroutine only leaves here via cancellation or an error.
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())
Working with RabbitMQ transactions¶
Publish messages atomically using AMQP transactions. Messages are only
delivered to the queue after the transaction is committed. Shows both
the context manager approach and manual select/commit/rollback usage.
import asyncio
import aio_pika
async def main() -> None:
    """Publish to ``test_queue`` inside AMQP transactions.

    Shows three variants on one channel: the transaction context manager,
    a manual ``select``/``commit``, and a manual ``select``/``rollback``.
    """
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/",
    )

    async with connection:
        routing_key = "test_queue"

        # Transactions conflict with `publisher_confirms`
        channel = await connection.channel(publisher_confirms=False)

        # Use transactions with async context manager
        async with channel.transaction():
            # Publishing messages but delivery will not be done
            # before committing this transaction
            for i in range(10):
                message = aio_pika.Message(body=f"Hello #{i}".encode())

                await channel.default_exchange.publish(
                    message,
                    routing_key=routing_key,
                )

        # Using transactions manually: commit
        tx = channel.transaction()

        # start transaction manually
        await tx.select()

        await channel.default_exchange.publish(
            aio_pika.Message(body=f"Hello {routing_key}".encode()),
            routing_key=routing_key,
        )

        await tx.commit()

        # Using transactions manually: rollback
        tx = channel.transaction()

        # start transaction manually
        await tx.select()

        await channel.default_exchange.publish(
            aio_pika.Message(body="Should be rejected".encode()),
            routing_key=routing_key,
        )

        # Discard the message published since select()
        await tx.rollback()


if __name__ == "__main__":
    asyncio.run(main())
Get single message example¶
Fetch a single message from a queue using queue.get() instead of
continuous consumption. This is useful for polling or one-off retrieval,
with manual acknowledgement of the received message.
import asyncio
from typing import Optional
from aio_pika import Message, connect_robust
from aio_pika.abc import AbstractIncomingMessage
async def main() -> None:
    """Publish one message, fetch it back with ``queue.get()``, and clean up.

    All broker work runs inside ``async with connection`` so the connection
    is closed even when one of the intermediate calls raises.
    """
    connection = await connect_robust(
        "amqp://guest:guest@127.0.0.1/?name=aio-pika%20example",
    )

    queue_name = "test_queue"
    routing_key = "test_queue"

    async with connection:
        # Creating channel
        channel = await connection.channel()

        # Declaring exchange
        exchange = await channel.declare_exchange("direct", auto_delete=True)

        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        # Binding queue
        await queue.bind(exchange, routing_key)

        await exchange.publish(
            Message(
                bytes("Hello", "utf-8"),
                content_type="text/plain",
                headers={"foo": "bar"},
            ),
            routing_key,
        )

        # Receiving one message; with fail=False the result may be falsy,
        # which is handled below as "Queue empty".
        incoming_message: Optional[AbstractIncomingMessage] = await queue.get(
            timeout=5, fail=False
        )

        if incoming_message:
            # Confirm message
            await incoming_message.ack()
        else:
            print("Queue empty")

        await queue.unbind(exchange, routing_key)
        await queue.delete()


if __name__ == "__main__":
    asyncio.run(main())
Set logging level¶
Sometimes you want to see only your own debug logs, but calling
logging.basicConfig(level=logging.DEBUG) sets the debug log level for all
loggers, including all of aio_pika’s modules. If you want to set the logging
level for aio_pika independently, see the following example:
import logging
from aio_pika import logger
# Set the level of the logger object imported from aio_pika to ERROR,
# without touching the configuration of any other logger.
logger.setLevel(logging.ERROR)
External credentials example¶
Connect to RabbitMQ using TLS client certificates (x509) for authentication instead of username/password. Requires CA certificate, client certificate, and private key files.
import asyncio
import ssl
import aio_pika
from aio_pika.abc import SSLOptions
async def main() -> None:
    """Connect using TLS client-certificate files and publish one greeting.

    The connection is opened with an empty login and ``ssl=True``; the CA
    certificate, client certificate and key are given as file names.
    """
    # NOTE(review): the option is named ``no_verify_ssl`` but is given
    # ``ssl.CERT_REQUIRED`` — confirm how the library interprets this value.
    tls_options = SSLOptions(
        cafile="cacert.pem",
        certfile="cert.pem",
        keyfile="key.pem",
        no_verify_ssl=ssl.CERT_REQUIRED,
    )

    connection = await aio_pika.connect_robust(
        host="127.0.0.1",
        login="",
        ssl=True,
        ssl_options=tls_options,
        client_properties={"connection_name": "aio-pika external credentials"},
    )

    async with connection:
        routing_key = "test_queue"

        channel = await connection.channel()

        greeting = aio_pika.Message(
            body="Hello {}".format(routing_key).encode(),
        )
        await channel.default_exchange.publish(
            greeting,
            routing_key=routing_key,
        )


if __name__ == "__main__":
    asyncio.run(main())
Connection pooling¶
A single AMQP connection multiplexes multiple channels over one TCP socket, which is sufficient for most applications. However, a single connection has a finite throughput limited by its TCP link and the serialization of frames. Connection pooling may help when:
- You are publishing or consuming a very high volume of messages and a single TCP connection becomes saturated.
- You want to isolate groups of channels so that a blocked or slow connection does not affect other workloads.
- Your application runs many concurrent tasks that would contend for the same connection’s write lock.
In most cases you do not need connection pooling — start with a
single connect_robust() connection and only add pooling after
profiling shows that the connection is a bottleneck.
Use aio_pika.pool.Pool to manage a pool of connections and channels.
import asyncio
import aio_pika
from aio_pika.abc import AbstractRobustConnection
from aio_pika.pool import Pool
async def main() -> None:
    """Run one consumer and fifty publishers on pooled connections/channels.

    A pool of at most two connections feeds a pool of at most ten channels.
    The consumer's iterator loop has no exit condition in this code, so the
    final ``await`` on the consumer task does not return on its own.
    """
    queue_name = "pool_queue"

    async def get_connection() -> AbstractRobustConnection:
        """Factory used by the connection pool."""
        return await aio_pika.connect_robust("amqp://guest:guest@localhost/")

    connection_pool: Pool = Pool(get_connection, max_size=2)

    async def get_channel() -> aio_pika.Channel:
        """Factory used by the channel pool; borrows a pooled connection."""
        async with connection_pool.acquire() as connection:
            return await connection.channel()

    channel_pool: Pool = Pool(get_channel, max_size=10)

    async def consume() -> None:
        """Print and acknowledge every message arriving on the queue."""
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.set_qos(10)

            queue = await channel.declare_queue(
                queue_name,
                durable=False,
                auto_delete=False,
            )

            async with queue.iterator() as queue_iter:
                async for message in queue_iter:
                    print(message)
                    await message.ack()

    async def publish() -> None:
        """Publish one message whose body is the repr of the channel used."""
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            payload = ("Channel: %r" % channel).encode()
            await channel.default_exchange.publish(
                aio_pika.Message(payload),
                queue_name,
            )

    async with connection_pool, channel_pool:
        consumer_task = asyncio.create_task(consume())
        publisher_tasks = [
            asyncio.create_task(publish()) for _ in range(50)
        ]
        await asyncio.wait(publisher_tasks)
        await consumer_task


if __name__ == "__main__":
    asyncio.run(main())
FastAPI example¶
Integrate aio-pika with a FastAPI application. The connection is
established during startup via the lifespan context manager and stored
in app.state for use across request handlers.
Note
A single robust connection is sufficient for most use cases. AMQP multiplexes work over channels within one connection. Consider connection pooling only if you have measured that a single connection is a bottleneck.
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import aio_pika
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Open the AMQP connection and a channel before the application runs.

    Both objects are stored on ``app.state``; the connection is closed when
    the context manager is exited.
    """
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@localhost/",
    )
    app.state.connection = connection
    app.state.channel = await connection.channel()

    try:
        yield
    finally:
        await app.state.connection.close()


app = FastAPI(lifespan=lifespan)
@app.post("/publish")
async def publish_message(message: str) -> dict:
    """Publish ``message`` (encoded to bytes) with routing key ``test_queue``."""
    channel = app.state.channel
    outgoing = aio_pika.Message(body=message.encode())
    await channel.default_exchange.publish(
        outgoing,
        routing_key="test_queue",
    )
    return {"status": "ok"}
@app.get("/consume")
async def consume_message() -> dict:
    """Fetch at most one message from ``test_queue`` and return its body.

    Returns ``{"body": None}`` when ``queue.get`` yields nothing.
    """
    queue = await app.state.channel.declare_queue(
        "test_queue",
        auto_delete=True,
    )
    message = await queue.get(timeout=5, fail=False)

    if not message:
        return {"body": None}

    await message.ack()
    return {"body": message.body.decode()}
Starlette example¶
Integrate aio-pika with a Starlette application. The connection is
managed through startup/shutdown event handlers and stored in
app.state for use across request handlers.
Note
A single robust connection is sufficient for most use cases. AMQP multiplexes work over channels within one connection. Consider connection pooling only if you have measured that a single connection is a bottleneck.
import aio_pika
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
async def publish_message(request: Request) -> JSONResponse:
    """Publish the raw request body with routing key ``test_queue``."""
    payload = await request.body()
    exchange = request.app.state.channel.default_exchange
    await exchange.publish(
        aio_pika.Message(body=payload),
        routing_key="test_queue",
    )
    return JSONResponse({"status": "ok"})
async def consume_message(request: Request) -> JSONResponse:
    """Fetch at most one message from ``test_queue`` and return its body.

    Responds with ``{"body": None}`` when ``queue.get`` yields nothing.
    """
    channel = request.app.state.channel
    queue = await channel.declare_queue("test_queue", auto_delete=True)
    message = await queue.get(timeout=5, fail=False)

    if not message:
        return JSONResponse({"body": None})

    await message.ack()
    return JSONResponse({"body": message.body.decode()})
async def on_startup() -> None:
    """Open the AMQP connection and a channel, storing both on ``app.state``."""
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@localhost/",
    )
    app.state.connection = connection
    app.state.channel = await connection.channel()
async def on_shutdown() -> None:
    """Close the AMQP connection stored on ``app.state``."""
    connection = app.state.connection
    await connection.close()
# Application object: routes /publish (POST only) to publish_message and
# /consume (no explicit methods given) to consume_message, and registers the
# on_startup / on_shutdown callbacks that open and close the AMQP connection.
app = Starlette(
routes=[
Route("/publish", publish_message, methods=["POST"]),
Route("/consume", consume_message),
],
on_startup=[on_startup],
on_shutdown=[on_shutdown],
)
aiohttp example¶
Integrate aio-pika with an aiohttp web application. The connection is
created on application startup and stored in the app dict for access
from request handlers.
Note
A single robust connection is sufficient for most use cases. AMQP multiplexes work over channels within one connection. Consider connection pooling only if you have measured that a single connection is a bottleneck.
import aio_pika
from aiohttp import web
async def publish_message(request: web.Request) -> web.Response:
    """Publish the raw request body with routing key ``test_queue``."""
    payload = await request.read()
    exchange = request.app["channel"].default_exchange
    await exchange.publish(
        aio_pika.Message(body=payload),
        routing_key="test_queue",
    )
    return web.json_response({"status": "ok"})
async def consume_message(request: web.Request) -> web.Response:
    """Fetch at most one message from ``test_queue`` and return its body.

    Responds with ``{"body": None}`` when ``queue.get`` yields nothing.
    """
    channel = request.app["channel"]
    queue = await channel.declare_queue("test_queue", auto_delete=True)
    message = await queue.get(timeout=5, fail=False)

    if not message:
        return web.json_response({"body": None})

    await message.ack()
    return web.json_response({"body": message.body.decode()})
async def on_startup(app: web.Application) -> None:
    """Open the AMQP connection and a channel, storing both in ``app``."""
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@localhost/",
    )
    app["connection"] = connection
    app["channel"] = await connection.channel()
async def on_shutdown(app: web.Application) -> None:
    """Close the AMQP connection stored under ``app["connection"]``."""
    connection = app["connection"]
    await connection.close()
app = web.Application()

# Lifecycle callbacks that open and close the AMQP connection
app.on_startup.append(on_startup)
app.on_shutdown.append(on_shutdown)

# HTTP routes
app.router.add_post("/publish", publish_message)
app.router.add_get("/consume", consume_message)


if __name__ == "__main__":
    web.run_app(app)
Tornado example¶
Integrate aio-pika with a Tornado web application. The publisher handler sends messages on POST requests, while the subscriber handler waits for incoming messages and returns them as HTTP responses.
Note
A single robust connection is sufficient for most use cases. AMQP multiplexes work over channels within one connection. Consider connection pooling only if you have measured that a single connection is a bottleneck.
import asyncio
import tornado.ioloop
import tornado.web
from aio_pika import Message, connect_robust
class Base:
    """Holder for the queue shared between the AMQP consumer and handlers."""

    # Declared here without a value; make_app() assigns the actual queue.
    QUEUE: asyncio.Queue
class SubscriberHandler(tornado.web.RequestHandler, Base):
    """Handler that answers GET with the body of the next queued message."""

    async def get(self) -> None:
        """Wait for one item on ``QUEUE`` and finish the response with its body."""
        incoming = await self.QUEUE.get()
        await self.finish(incoming.body)
class PublisherHandler(tornado.web.RequestHandler):
    """Handler that publishes the POST body with routing key ``test``."""

    async def post(self) -> None:
        """Publish the request body on a fresh channel, then reply ``OK``.

        The channel is opened per request and closed in ``finally``, so it
        is released whether or not publishing succeeds.
        """
        amqp_connection = self.application.settings["amqp_connection"]
        channel = await amqp_connection.channel()

        try:
            outgoing = Message(body=self.request.body)
            await channel.default_exchange.publish(
                outgoing,
                routing_key="test",
            )
        finally:
            await channel.close()

        await self.finish("OK")
async def make_app() -> tornado.web.Application:
    """Connect to the broker, start consuming ``test``, and build the app.

    Consumed messages are put on ``Base.QUEUE`` (consumption uses
    ``no_ack=True``); the connection is passed to the application as the
    ``amqp_connection`` setting.
    """
    amqp_connection = await connect_robust()

    channel = await amqp_connection.channel()
    queue = await channel.declare_queue("test", auto_delete=True)

    inbox = asyncio.Queue()
    Base.QUEUE = inbox

    # Each delivered message is handed to the asyncio queue's put()
    await queue.consume(inbox.put, no_ack=True)

    handlers = [
        (r"/publish", PublisherHandler),
        (r"/subscribe", SubscriberHandler),
    ]
    return tornado.web.Application(
        handlers,
        amqp_connection=amqp_connection,
    )
async def main() -> None:
    """Build the application, listen on port 8888, and wait indefinitely."""
    application = await make_app()
    application.listen(8888)

    # This future is never resolved; awaiting it keeps the coroutine alive.
    never_done = asyncio.Future()
    await never_done


if __name__ == "__main__":
    asyncio.run(main())