Introduction

Warning

This is a beta version of the port from official tutorial. Please when you found an error create issue or pull request for me.

It is expected that you are familiar with the basics of asyncio. Anyway following examples work as written. You feel free to download them and test it as is without any changes (in case your RabbitMQ installation allows access for user “guest”).

Otherwise we recommend to read asyncio tutorial.

Note

Prerequisites

This tutorial assumes RabbitMQ is installed and running on localhost on standard port (5672). In case you use a different host, port or credentials, connections settings would require adjusting.

Where to get help

If you’re having trouble going through this tutorial you can contact us through the mailing list.

RabbitMQ is a message broker. The principal idea is pretty simple: it accepts and forwards messages. You can think about it as a post office: when you send mail to the post box you’re pretty sure that Mr. Postman will eventually deliver the mail to your recipient. Using this metaphor RabbitMQ is a post box, a post office and a postman.

The major difference between RabbitMQ and the post office is the fact that it doesn’t deal with paper, instead it accepts, stores and forwards binary blobs of data ‒ messages.

RabbitMQ, and messaging in general, uses some jargon.

  • Producing means nothing more than sending. A program that sends messages is a producer.

We’ll draw it like that, with “P”:

../_images/producer.svg
  • A queue is the name for a mailbox. It lives inside RabbitMQ. Although messages flow through RabbitMQ and your applications, they can be stored only inside a queue. A queue is not bound by any limits, it can store as many messages as you like ‒ it’s essentially an infinite buffer. Many producers can send messages that go to one queue, many consumers can try to receive data from one queue.

A queue will be drawn as like that, with its name above it:

../_images/queue.svg
  • Consuming has a similar meaning to receiving. A consumer is a program that mostly waits to receive messages.

On our drawings it’s shown with “C”:

../_images/consumer.svg

Note

Note that the producer, consumer, and broker do not have to reside on the same machine; indeed in most applications they don’t.

Hello World!

Note

Using the aio-pika async Python client

Our “Hello world” won’t be too complex ‒ let’s send a message, receive it and print it on the screen. To do so we need two programs: one that sends a message and one that receives and prints it.

Our overall design will look like:

../_images/python-one-overall.svg

Producer sends messages to the “hello” queue. The consumer receives messages from that queue.

Note

RabbitMQ libraries

RabbitMQ speaks AMQP 0.9.1, which is an open, general-purpose protocol for messaging. There are a number of clients for RabbitMQ in many different languages. In this tutorial series we’re going to use aio-pika. To install it you can use the pip package management tool.

Sending

../_images/sending.svg

Our first program send.py will send a single message to the queue. The first thing we need to do is to establish a connection with RabbitMQ server.


async def main() -> None:
    # Perform connection
    connection = await connect("amqp://guest:guest@localhost/")

    async with connection:
        # Creating a channel
        channel = await connection.channel()

We’re connected now, to a broker on the local machine - hence the localhost. If we wanted to connect to a broker on a different machine we’d simply specify its name or IP address here.

Next, before sending we need to make sure the recipient queue exists. If we send a message to non-existing location, RabbitMQ will just trash the message. Let’s create a queue to which the message will be delivered, let’s name it hello:

        # Declaring queue
        queue = await channel.declare_queue("hello")

At that point we’re ready to send a message. Our first message will just contain a string Hello World! and we want to send it to our hello queue.

In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. But let’s not get dragged down by the details ‒ you can read more about exchanges in the third part of this tutorial. All we need to know now is how to use a default exchange identified by an empty string. This exchange is special ‒ it allows us to specify exactly to which queue the message should go. The queue name needs to be specified in the routing_key parameter:

        # Sending the message
        await channel.default_exchange.publish(
            Message(b"Hello World!"),
            routing_key=queue.name,
        )

Before exiting the program we need to make sure the network buffers were flushed and our message was actually delivered to RabbitMQ. We can do it by gently closing the connection. In this example async context manager has been used.

    async with connection:
        # Creating a channel
        channel = await connection.channel()

Note

Sending doesn’t work!

If this is your first time using RabbitMQ and you don’t see the “Sent” message then you may be left scratching your head wondering what could be wrong. Maybe the broker was started without enough free disk space (by default it needs at least 1Gb free) and is therefore refusing to accept messages. Check the broker logfile to confirm and reduce the limit if necessary. The configuration file documentation will show you how to set disk_free_limit.

Receiving

../_images/receiving.svg

Our second program receive.py will receive messages from the queue and print them on the screen.

Again, first we need to connect to RabbitMQ server. The code responsible for connecting to Rabbit is the same as previously.

The next step, just like before, is to make sure that the queue exists. Creating a queue using queue_declare is idempotent ‒ we can run the command as many times as we like, and only one will be created.

    connection = await connect("amqp://guest:guest@localhost/")
    async with connection:
        # Creating a channel
        channel = await connection.channel()

        # Declaring queue
        queue = await channel.declare_queue("hello")

You may ask why we declare the queue again ‒ we have already declared it in our previous code. We could avoid that if we were sure that the queue already exists. For example if send.py program was run before. But we’re not yet sure which program to run first. In such cases it’s a good practice to repeat declaring the queue in both programs.

Note

Listing queues

You may wish to see what queues RabbitMQ has and how many messages are in them. You can do it (as a privileged user) using the rabbitmqctl tool:

$ sudo rabbitmqctl list_queues
Listing queues ...
hello    0
...done.
(omit sudo on Windows)

Receiving messages from the queue is simple. It works by subscribing a callback function to a queue or using simple get.

Whenever we receive a message, this callback function is called by the aio-pika library. In our case this function will print on the screen the contents of the message.

async def on_message(message: AbstractIncomingMessage) -> None:
    """
    on_message doesn't necessarily have to be defined as async.
    Here it is to show that it's possible.
    """
    print(" [x] Received message %r" % message)
    print("Message body is: %r" % message.body)

    print("Before sleep!")
    await asyncio.sleep(5)  # Represents async I/O operations
    print("After sleep!")

Next, we need to tell RabbitMQ that this particular callback function should receive messages from our hello queue:

async def main() -> None:
    # Perform connection
    connection = await connect("amqp://guest:guest@localhost/")
    async with connection:
        # Creating a channel
        channel = await connection.channel()

        # Declaring queue
        queue = await channel.declare_queue("hello")

        # Start listening the queue with name 'hello'
        await queue.consume(on_message, no_ack=True)

        print(" [*] Waiting for messages. To exit press CTRL+C")
        await asyncio.Future()

The no_ack parameter will be described later on.

Putting it all together

Full code for send.py:

import asyncio

from aio_pika import Message, connect


async def main() -> None:
    # Perform connection
    connection = await connect("amqp://guest:guest@localhost/")

    async with connection:
        # Creating a channel
        channel = await connection.channel()

        # Declaring queue
        queue = await channel.declare_queue("hello")

        # Sending the message
        await channel.default_exchange.publish(
            Message(b"Hello World!"),
            routing_key=queue.name,
        )

        print(" [x] Sent 'Hello World!'")


if __name__ == "__main__":
    asyncio.run(main())

Full receive.py code:

import asyncio

from aio_pika import connect
from aio_pika.abc import AbstractIncomingMessage


async def on_message(message: AbstractIncomingMessage) -> None:
    """
    on_message doesn't necessarily have to be defined as async.
    Here it is to show that it's possible.
    """
    print(" [x] Received message %r" % message)
    print("Message body is: %r" % message.body)

    print("Before sleep!")
    await asyncio.sleep(5)  # Represents async I/O operations
    print("After sleep!")


async def main() -> None:
    # Perform connection
    connection = await connect("amqp://guest:guest@localhost/")
    async with connection:
        # Creating a channel
        channel = await connection.channel()

        # Declaring queue
        queue = await channel.declare_queue("hello")

        # Start listening the queue with name 'hello'
        await queue.consume(on_message, no_ack=True)

        print(" [*] Waiting for messages. To exit press CTRL+C")
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())

Now we can try out our programs in a terminal. First, let’s send a message using our send.py program:

$ python send.py
[x] Sent 'Hello World!'

The producer program send.py will stop after every run. Let’s receive it:

$ python receive.py
[x] Received message IncomingMessage:{
 "app_id": null,
 "body_size": 12,
 "cluster_id": null,
 "consumer_tag": "ctag1.11fa33f5f4fa41f6a6488648181656e0",
 "content_encoding": null,
 "content_type": null,
 "correlation_id": "b'None'",
 "delivery_mode": 1,
 "delivery_tag": 1,
 "exchange": "",
 "expiration": null,
 "headers": null,
 "message_id": null,
 "priority": null,
 "redelivered": false,
 "reply_to": null,
 "routing_key": "hello",
 "synchronous": false,
 "timestamp": null,
 "type": "None",
 "user_id": null
}
Message body is: b'Hello World!'

Hurray! We were able to send our first message through RabbitMQ. As you might have noticed, the receive.py program doesn’t exit. It will stay ready to receive further messages, and may be interrupted with Ctrl-C.

Try to run send.py again in a new terminal.

We’ve learned how to send and receive a message from a named queue. It’s time to move on to part 2 and build a simple work queue.

Note

This material was adopted from official tutorial on rabbitmq.org.