Work Queues: Distributing tasks among workers

The main purpose of this part of the tutorial is to acknowledge (ack) a message in RabbitMQ only once it has actually been processed by a worker.

new_task

This publisher creates a queue with the durable flag and publishes a message with the persistent property.

await channel.queue('task_queue', durable=True)

await channel.basic_publish(
    payload=message,
    exchange_name='',
    routing_key='task_queue',
    properties={
        'delivery_mode': 2,  # persistent: the message is written to disk
    },
)
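For context, here is how these two snippets might fit into a complete publisher. This is a minimal sketch, not part of the excerpt above: the connection boilerplate (aioamqp.connect and protocol.channel), the default guest credentials on localhost, and building the message from the command line are all assumptions.

import asyncio
import sys

import aioamqp


async def new_task():
    # Assumption: a local RabbitMQ broker with default credentials.
    transport, protocol = await aioamqp.connect()
    channel = await protocol.channel()

    message = ' '.join(sys.argv[1:]) or "Hello World!"

    # Durable queue: the queue itself survives a broker restart.
    await channel.queue('task_queue', durable=True)

    # delivery_mode=2: the message is persisted to disk as well.
    await channel.basic_publish(
        payload=message.encode(),
        exchange_name='',
        routing_key='task_queue',
        properties={
            'delivery_mode': 2,
        },
    )
    print(" [x] Sent %r" % message)

    await protocol.close()
    transport.close()


asyncio.get_event_loop().run_until_complete(new_task())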

worker

The purpose of this worker is to simulate a resource-consuming task that delays the processing of other messages.

The worker declares the queue with exactly the same arguments as the new_task producer.

await channel.queue('task_queue', durable=True)

Then the worker configures the QoS (quality of service), which specifies how messages are dispatched to it: with prefetch_count=1, RabbitMQ will not send a new message to this worker until it has acknowledged the previous one.

await channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)

Finally, we have to create a callback that acks the message to mark it as processed. Note: the code in the callback calls asyncio.sleep to simulate an asyncio-compatible task that takes time without blocking the event loop. To simulate a CPU-intensive task instead, you would block the event loop with time.sleep.

async def callback(channel, body, envelope, properties):
    print(" [x] Received %r" % body)
    # Simulate work: one second per dot in the message body.
    await asyncio.sleep(body.count(b'.'))
    print(" [x] Done")
    # Ack only now, so the broker can redeliver if the worker dies mid-task.
    await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
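Putting the pieces together, a complete worker might look like the sketch below. As with the publisher, the connection setup, basic_consume registration, and the run_forever loop are assumptions added for illustration; basic_consume attaches the callback to the queue, and the event loop is kept running so messages keep arriving.

import asyncio

import aioamqp


async def callback(channel, body, envelope, properties):
    print(" [x] Received %r" % body)
    # Each dot in the message body simulates one second of work.
    await asyncio.sleep(body.count(b'.'))
    print(" [x] Done")
    await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)


async def worker():
    # Assumption: a local RabbitMQ broker with default credentials.
    transport, protocol = await aioamqp.connect()
    channel = await protocol.channel()

    await channel.queue('task_queue', durable=True)
    # Only one unacked message is dispatched to this worker at a time.
    await channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
    await channel.basic_consume(callback, queue_name='task_queue')


loop = asyncio.get_event_loop()
loop.run_until_complete(worker())
# Keep the event loop alive so the consumer keeps processing messages.
loop.run_forever()

To see the effect, start two workers in separate shells and publish a few tasks whose bodies end in dots: messages with more dots take longer, and each idle worker picks up the next pending task instead of one worker accumulating a backlog. (The new_task and worker script names are taken from the section titles above.)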