Patterns and helpers¶
Note
Available since aio-pika>=1.7.0
aio-pika includes some useful patterns for creating distributed systems.
Master/Worker¶
Helper which implements Master/Worker pattern. This applicable for balancing tasks between multiple workers.
The master creates tasks:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import Master
async def main() -> None:
connection = await connect_robust(
"amqp://guest:[email protected]/?name=aio-pika%20master",
)
async with connection:
# Creating channel
channel = await connection.channel()
master = Master(channel)
# Creates tasks by proxy object
for task_id in range(1000):
await master.proxy.my_task_name(task_id=task_id)
# Or using create_task method
for task_id in range(1000):
await master.create_task(
"my_task_name", kwargs=dict(task_id=task_id),
)
if __name__ == "__main__":
asyncio.run(main())
Worker code:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import Master, NackMessage, RejectMessage
async def worker(*, task_id: int) -> None:
# If you want to reject message or send
# nack you might raise special exception
if task_id % 2 == 0:
raise RejectMessage(requeue=False)
if task_id % 2 == 1:
raise NackMessage(requeue=False)
print(task_id)
async def main() -> None:
connection = await connect_robust(
"amqp://guest:[email protected]/?name=aio-pika%20worker",
)
# Creating channel
channel = await connection.channel()
# Initializing Master with channel
master = Master(channel)
await master.create_worker("my_task_name", worker, auto_delete=True)
try:
await asyncio.Future()
finally:
await connection.close()
if __name__ == "__main__":
asyncio.run(main())
The one or multiple workers executes tasks.
RPC¶
Helper which implements Remote Procedure Call pattern. This applicable for balancing tasks between multiple workers.
The caller creates tasks and awaiting results:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import RPC
async def main() -> None:
connection = await connect_robust(
"amqp://guest:[email protected]/",
client_properties={"connection_name": "caller"},
)
async with connection:
# Creating channel
channel = await connection.channel()
rpc = await RPC.create(channel)
# Creates tasks by proxy object
for i in range(1000):
print(await rpc.proxy.multiply(x=100, y=i))
# Or using create_task method
for i in range(1000):
print(await rpc.call("multiply", kwargs=dict(x=100, y=i)))
if __name__ == "__main__":
asyncio.run(main())
One or multiple callees executing tasks:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import RPC
async def multiply(*, x: int, y: int) -> int:
return x * y
async def main() -> None:
connection = await connect_robust(
"amqp://guest:[email protected]/",
client_properties={"connection_name": "callee"},
)
# Creating channel
channel = await connection.channel()
rpc = await RPC.create(channel)
await rpc.register("multiply", multiply, auto_delete=True)
try:
await asyncio.Future()
finally:
await connection.close()
if __name__ == "__main__":
asyncio.run(main())
Extending¶
Both patterns serialization behaviour might be changed by inheritance and
redefinition of methods aio_pika.patterns.base.serialize()
and aio_pika.patterns.base.deserialize()
.
Following examples demonstrates it:
from typing import Any
import msgpack # type: ignore
from aio_pika.patterns import RPC, Master
class MsgpackRPC(RPC):
CONTENT_TYPE = "application/msgpack"
def serialize(self, data: Any) -> bytes:
return msgpack.dumps(data)
def deserialize(self, data: bytes) -> bytes:
return msgpack.loads(data)
class MsgpackMaster(Master):
CONTENT_TYPE = "application/msgpack"
def serialize(self, data: Any) -> bytes:
return msgpack.dumps(data)
def deserialize(self, data: bytes) -> bytes:
return msgpack.loads(data)