Quick start

Some useful examples.

Simple consumer

Connect to RabbitMQ, declare a queue, and consume messages using an async iterator. Each message is automatically acknowledged when the process() context manager exits successfully.

import asyncio
import logging

import aio_pika


async def main() -> None:
    """Consume from ``test_queue`` until a message body mentions the queue name."""
    logging.basicConfig(level=logging.DEBUG)

    async with await aio_pika.connect_robust(
        "amqp://guest:[email protected]/",
    ) as connection:
        queue_name = "test_queue"

        # A channel multiplexes work over the single TCP connection.
        channel = await connection.channel()

        # Never hold more than 10 unacknowledged messages at once.
        await channel.set_qos(prefetch_count=10)

        # auto_delete: the broker drops the queue once it is no longer used.
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                # process() acks on success and rejects if the body raises.
                async with message.process():
                    print(message.body)

                    if queue.name in message.body.decode():
                        break


if __name__ == "__main__":
    asyncio.run(main())

Simple publisher

Connect to RabbitMQ and publish a single message to a queue through the default exchange.

import asyncio

import aio_pika


async def main() -> None:
    """Publish a single message to ``test_queue`` via the default exchange."""
    async with await aio_pika.connect_robust(
        "amqp://guest:[email protected]/",
    ) as connection:
        routing_key = "test_queue"
        channel = await connection.channel()

        # The default exchange routes by queue name, so routing_key
        # doubles as the destination queue.
        message = aio_pika.Message(body=f"Hello {routing_key}".encode())
        await channel.default_exchange.publish(message, routing_key=routing_key)


if __name__ == "__main__":
    asyncio.run(main())

Asynchronous message processing

Consume messages using a callback function instead of an async iterator. This allows multiple messages to be processed concurrently, controlled by the prefetch_count setting.

import asyncio

import aio_pika


async def process_message(
    message: aio_pika.abc.AbstractIncomingMessage,
) -> None:
    """Handle one delivery: print the body, then simulate a second of work."""
    async with message.process():  # ack on success, reject on exception
        print(message.body)
        await asyncio.sleep(1)


async def main() -> None:
    """Consume ``test_queue`` with a callback and run until cancelled."""
    connection = await aio_pika.connect_robust(
        "amqp://guest:[email protected]/",
    )

    queue_name = "test_queue"
    channel = await connection.channel()

    # Up to 100 deliveries may be in flight, i.e. processed concurrently.
    await channel.set_qos(prefetch_count=100)

    queue = await channel.declare_queue(queue_name, auto_delete=True)
    await queue.consume(process_message)

    try:
        # A bare Future never completes, so this keeps the loop alive
        # until the task is cancelled (e.g. Ctrl+C).
        await asyncio.Future()
    finally:
        await connection.close()


if __name__ == "__main__":
    asyncio.run(main())

Working with RabbitMQ transactions

Publish messages atomically using AMQP transactions. Messages are only delivered to the queue after the transaction is committed. Shows both the context manager approach and manual select/commit/rollback usage.

import asyncio

import aio_pika


async def main() -> None:
    """Demonstrate AMQP transactions: context manager, commit, and rollback."""
    async with await aio_pika.connect_robust(
        "amqp://guest:[email protected]/",
    ) as connection:
        routing_key = "test_queue"

        # Transactions conflict with publisher confirms, so disable confirms.
        channel = await connection.channel(publisher_confirms=False)

        # 1) Transaction as an async context manager: committed on normal
        #    exit, rolled back if the body raises. Nothing is delivered
        #    to the queue before the commit.
        async with channel.transaction():
            for number in range(10):
                await channel.default_exchange.publish(
                    aio_pika.Message(body=f"Hello #{number}".encode()),
                    routing_key=routing_key,
                )

        # 2) Manual select/commit.
        tx = channel.transaction()
        await tx.select()  # start the transaction

        await channel.default_exchange.publish(
            aio_pika.Message(body=f"Hello {routing_key}".encode()),
            routing_key=routing_key,
        )
        await tx.commit()

        # 3) Manual select/rollback: the message never reaches the queue.
        tx = channel.transaction()
        await tx.select()

        await channel.default_exchange.publish(
            aio_pika.Message(body=b"Should be rejected"),
            routing_key=routing_key,
        )
        await tx.rollback()


if __name__ == "__main__":
    asyncio.run(main())

Get single message example

Fetch a single message from a queue using queue.get() instead of continuous consumption. This is useful for polling or one-off retrieval, with manual acknowledgement of the received message.

import asyncio
from typing import Optional

from aio_pika import Message, connect_robust
from aio_pika.abc import AbstractIncomingMessage


async def main() -> None:
    """Publish one message, then fetch it back with ``queue.get()``."""
    connection = await connect_robust(
        "amqp://guest:[email protected]/?name=aio-pika%20example",
    )

    queue_name = "test_queue"
    routing_key = "test_queue"

    channel = await connection.channel()

    # A direct exchange routes on an exact routing-key match.
    exchange = await channel.declare_exchange("direct", auto_delete=True)
    queue = await channel.declare_queue(queue_name, auto_delete=True)
    await queue.bind(exchange, routing_key)

    await exchange.publish(
        Message(
            b"Hello",
            content_type="text/plain",
            headers={"foo": "bar"},
        ),
        routing_key,
    )

    # One-off retrieval: fail=False returns None on an empty queue
    # instead of raising.
    incoming_message: Optional[AbstractIncomingMessage] = await queue.get(
        timeout=5, fail=False
    )
    if incoming_message:
        # Manually acknowledge the received message.
        await incoming_message.ack()
    else:
        print("Queue empty")

    # Tidy up: unbind, drop the queue, and close the connection.
    await queue.unbind(exchange, routing_key)
    await queue.delete()
    await connection.close()


if __name__ == "__main__":
    asyncio.run(main())

Set logging level

Sometimes you want to see only your own debug logs, but calling logging.basicConfig(level=logging.DEBUG) sets the debug level for all loggers, including all of aio_pika’s modules. If you want to set the logging level independently, see the following example:

import logging

from aio_pika import logger


# Raise only aio-pika's own logger to ERROR; other loggers keep their levels.
logger.setLevel(logging.ERROR)

External credentials example

Connect to RabbitMQ using TLS client certificates (x509) for authentication instead of username/password. Requires CA certificate, client certificate, and private key files.

import asyncio
import ssl

import aio_pika
from aio_pika.abc import SSLOptions


async def main() -> None:
    """Authenticate with x509 client certificates and publish one message."""
    connection = await aio_pika.connect_robust(
        host="127.0.0.1",
        login="",  # empty login — presumably selects certificate (EXTERNAL) auth; confirm against aio-pika docs
        ssl=True,
        ssl_options=SSLOptions(
            cafile="cacert.pem",
            certfile="cert.pem",
            keyfile="key.pem",
            no_verify_ssl=ssl.CERT_REQUIRED,
        ),
        client_properties={"connection_name": "aio-pika external credentials"},
    )

    async with connection:
        routing_key = "test_queue"
        channel = await connection.channel()

        await channel.default_exchange.publish(
            aio_pika.Message(body=f"Hello {routing_key}".encode()),
            routing_key=routing_key,
        )


if __name__ == "__main__":
    asyncio.run(main())

Connection pooling

A single AMQP connection multiplexes multiple channels over one TCP socket, which is sufficient for most applications. However, a single connection has a finite throughput limited by its TCP link and the serialization of frames. Connection pooling may help when:

  • You are publishing or consuming a very high volume of messages and a single TCP connection becomes saturated.

  • You want to isolate groups of channels so that a blocked or slow connection does not affect other workloads.

  • Your application runs many concurrent tasks that would contend for the same connection’s write lock.

In most cases you do not need connection pooling — start with a single connect_robust() connection and only add pooling after profiling shows that the connection is a bottleneck.

Use aio_pika.pool.Pool to manage a pool of connections and channels.

import asyncio

import aio_pika
from aio_pika.abc import AbstractRobustConnection
from aio_pika.pool import Pool


async def main() -> None:
    """Run one pooled consumer plus 50 pooled publishers on ``pool_queue``."""

    async def get_connection() -> AbstractRobustConnection:
        return await aio_pika.connect_robust("amqp://guest:guest@localhost/")

    # At most two TCP connections are ever open.
    connection_pool: Pool = Pool(get_connection, max_size=2)

    async def get_channel() -> aio_pika.Channel:
        async with connection_pool.acquire() as connection:
            return await connection.channel()

    # Channels are cheap; pool more of them than connections.
    channel_pool: Pool = Pool(get_channel, max_size=10)
    queue_name = "pool_queue"

    async def consume() -> None:
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.set_qos(10)

            queue = await channel.declare_queue(
                queue_name,
                durable=False,
                auto_delete=False,
            )

            async with queue.iterator() as queue_iter:
                async for message in queue_iter:
                    print(message)
                    await message.ack()

    async def publish() -> None:
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.default_exchange.publish(
                aio_pika.Message(f"Channel: {channel!r}".encode()),
                queue_name,
            )

    async with connection_pool, channel_pool:
        consumer_task = asyncio.create_task(consume())
        await asyncio.wait([asyncio.create_task(publish()) for _ in range(50)])
        await consumer_task


if __name__ == "__main__":
    asyncio.run(main())

FastAPI example

Integrate aio-pika with a FastAPI application. The connection is established during startup via the lifespan context manager and stored in app.state for use across request handlers.

Note

A single robust connection is sufficient for most use cases. AMQP multiplexes work over channels within one connection. Consider connection pooling only if you have measured that a single connection is a bottleneck.

from contextlib import asynccontextmanager
from typing import AsyncGenerator

import aio_pika
from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Open the AMQP connection and channel at startup; close at shutdown."""
    app.state.connection = await aio_pika.connect_robust(
        "amqp://guest:guest@localhost/",
    )
    app.state.channel = await app.state.connection.channel()
    try:
        yield  # application serves requests while suspended here
    finally:
        # Closing the connection also closes its channels.
        await app.state.connection.close()


# The lifespan context manager above owns the AMQP connection for the app's lifetime.
app = FastAPI(lifespan=lifespan)


@app.post("/publish")
async def publish_message(message: str) -> dict:
    await app.state.channel.default_exchange.publish(
        aio_pika.Message(body=message.encode()),
        routing_key="test_queue",
    )
    return {"status": "ok"}


@app.get("/consume")
async def consume_message() -> dict:
    queue = await app.state.channel.declare_queue(
        "test_queue",
        auto_delete=True,
    )
    message = await queue.get(timeout=5, fail=False)

    if message:
        await message.ack()
        return {"body": message.body.decode()}

    return {"body": None}

Starlette example

Integrate aio-pika with a Starlette application. The connection is managed through startup/shutdown event handlers and stored in app.state for use across request handlers.

Note

A single robust connection is sufficient for most use cases. AMQP multiplexes work over channels within one connection. Consider connection pooling only if you have measured that a single connection is a bottleneck.

import aio_pika
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route


async def publish_message(request: Request) -> JSONResponse:
    """POST handler: publish the raw request body to ``test_queue``."""
    channel = request.app.state.channel
    payload = await request.body()
    await channel.default_exchange.publish(
        aio_pika.Message(body=payload),
        routing_key="test_queue",
    )
    return JSONResponse({"status": "ok"})


async def consume_message(request: Request) -> JSONResponse:
    """GET handler: fetch one message from ``test_queue``; body is None when empty."""
    channel = request.app.state.channel
    queue = await channel.declare_queue("test_queue", auto_delete=True)
    # fail=False: an empty queue yields None instead of an exception.
    message = await queue.get(timeout=5, fail=False)

    if not message:
        return JSONResponse({"body": None})

    await message.ack()
    return JSONResponse({"body": message.body.decode()})


async def on_startup() -> None:
    """Create the shared AMQP connection and channel before serving requests."""
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@localhost/",
    )
    app.state.connection = connection
    app.state.channel = await connection.channel()


async def on_shutdown() -> None:
    # Closing the connection also closes the channel opened on it.
    await app.state.connection.close()


# Route table plus lifecycle hooks; handlers read the shared
# connection/channel from app.state.
app = Starlette(
    routes=[
        Route("/publish", publish_message, methods=["POST"]),
        Route("/consume", consume_message),
    ],
    on_startup=[on_startup],
    on_shutdown=[on_shutdown],
)

aiohttp example

Integrate aio-pika with an aiohttp web application. The connection is created on application startup and stored in the app dict for access from request handlers.

Note

A single robust connection is sufficient for most use cases. AMQP multiplexes work over channels within one connection. Consider connection pooling only if you have measured that a single connection is a bottleneck.

import aio_pika
from aiohttp import web


async def publish_message(request: web.Request) -> web.Response:
    """POST handler: publish the raw request body to ``test_queue``."""
    channel = request.app["channel"]
    payload = await request.read()
    await channel.default_exchange.publish(
        aio_pika.Message(body=payload),
        routing_key="test_queue",
    )
    return web.json_response({"status": "ok"})


async def consume_message(request: web.Request) -> web.Response:
    """GET handler: fetch one message from ``test_queue``; body is None when empty."""
    channel = request.app["channel"]
    queue = await channel.declare_queue("test_queue", auto_delete=True)
    # fail=False: an empty queue yields None instead of an exception.
    message = await queue.get(timeout=5, fail=False)

    if not message:
        return web.json_response({"body": None})

    await message.ack()
    return web.json_response({"body": message.body.decode()})


async def on_startup(app: web.Application) -> None:
    """Create the shared AMQP connection and channel before serving requests."""
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@localhost/",
    )
    app["connection"] = connection
    app["channel"] = await connection.channel()


async def on_shutdown(app: web.Application) -> None:
    # Closing the connection also closes the channel opened on it.
    await app["connection"].close()


# Wire routes and lifecycle hooks; handlers read the shared
# connection/channel from the app dict.
app = web.Application()
app.router.add_post("/publish", publish_message)
app.router.add_get("/consume", consume_message)
app.on_startup.append(on_startup)
app.on_shutdown.append(on_shutdown)

if __name__ == "__main__":
    # Runs the aiohttp event loop until interrupted.
    web.run_app(app)

Tornado example

Integrate aio-pika with a Tornado web application. The publisher handler sends messages on POST requests, while the subscriber handler waits for incoming messages and returns them as HTTP responses.

Note

A single robust connection is sufficient for most use cases. AMQP multiplexes work over channels within one connection. Consider connection pooling only if you have measured that a single connection is a bottleneck.

import asyncio

import tornado.ioloop
import tornado.web

from aio_pika import Message, connect_robust


class Base:
    # Shared in-memory buffer: consumed AMQP messages land here (filled in
    # make_app()) and SubscriberHandler.get() pops them one at a time.
    QUEUE: asyncio.Queue


class SubscriberHandler(tornado.web.RequestHandler, Base):
    async def get(self) -> None:
        """Wait for the next consumed AMQP message and return its body."""
        incoming = await self.QUEUE.get()
        await self.finish(incoming.body)


class PublisherHandler(tornado.web.RequestHandler):
    async def post(self) -> None:
        """Publish the request body to the ``test`` queue on a throwaway channel."""
        connection = self.application.settings["amqp_connection"]
        channel = await connection.channel()
        try:
            message = Message(body=self.request.body)
            await channel.default_exchange.publish(message, routing_key="test")
        finally:
            # Always release the per-request channel.
            await channel.close()
        await self.finish("OK")


async def make_app() -> tornado.web.Application:
    """Connect to AMQP, route consumed messages into Base.QUEUE, build the app."""
    amqp_connection = await connect_robust()

    channel = await amqp_connection.channel()
    queue = await channel.declare_queue("test", auto_delete=True)

    # Every consumed message is pushed into the shared asyncio.Queue;
    # no_ack=True means the broker does not wait for acknowledgements.
    Base.QUEUE = asyncio.Queue()
    await queue.consume(Base.QUEUE.put, no_ack=True)

    routes = [
        (r"/publish", PublisherHandler),
        (r"/subscribe", SubscriberHandler),
    ]
    return tornado.web.Application(routes, amqp_connection=amqp_connection)


async def main() -> None:
    """Start the HTTP server on port 8888 and run until interrupted."""
    application = await make_app()
    application.listen(8888)
    # A bare Future never completes, keeping the event loop running.
    await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())