API

Basics

There are two principal objects when using aioamqp:

  • The protocol object, used to begin a connection to aioamqp,
  • The channel object, used when creating a new channel to effectively use an AMQP channel.

Starting a connection

Starting a connection to AMQP really mean instanciate a new asyncio Protocol subclass.

aioamqp.connect(host, port, login, password, virtualhost, ssl, login_method, insist, protocol_factory, verify_ssl, loop, kwargs) → Transport, AmqpProtocol

Convenient method to connect to an AMQP broker

Parameters:
  • host (str) – the host to connect to
  • port (int) – broker port
  • login (str) – login
  • password (str) – password
  • virtualhost (str) – AMQP virtualhost to use for this connection
  • ssl (bool) – create an SSL connection instead of a plain unencrypted one
  • verify_ssl (bool) – verify server’s SSL certificate (True by default)
  • login_method (str) – AMQP auth method
  • insist (bool) – insist on connecting to a server
  • protocol_factory (AmqpProtocol) – factory to use, if you need to subclass AmqpProtocol
  • loop (EventLopp) – set the event loop to use
  • kwargs (dict) – arguments to be given to the protocol_factory instance
import asyncio
import aioamqp

@asyncio.coroutine
def connect():
    try:
        transport, protocol = yield from aioamqp.connect()  # use default parameters
    except aioamqp.AmqpClosedConnection:
        print("closed connections")
        return

    print("connected !")
    yield from asyncio.sleep(1)

    print("close connection")
    yield from protocol.close()
    transport.close()

asyncio.get_event_loop().run_until_complete(connect())

In this example, we just use the method “start_connection” to begin a communication with the server, which deals with credentials and connection tunning.

If you’re not using the default event loop (e.g. because you’re using aioamqp from a different thread), call aioamqp.connect(loop=your_loop).

The AmqpProtocol uses the kwargs arguments to configure the connection to the AMQP Broker:

AmqpProtocol.__init__(self, *args, **kwargs):

The protocol to communicate with AMQP

Parameters:
  • channel_max (int) – specifies highest channel number that the server permits. Usable channel numbers are in the range 1..channel-max. Zero indicates no specified limit.
  • frame_max (int) – the largest frame size that the server proposes for the connection, including frame header and end-byte. The client can negotiate a lower value. Zero means that the server does not impose any specific limit but may reject very large frames if it cannot allocate resources for them.
  • heartbeat (int) – the delay, in seconds, of the connection heartbeat that the server wants. Zero means the server does not want a heartbeat.
  • loop (Asyncio.EventLoop) – specify the eventloop to use.
  • client_properties (dict) – configure the client to connect to the AMQP server.

Handling errors

The connect() method has an extra ‘on_error’ kwarg option. This on_error is a callback or a coroutine function which is called with an exception as the argument:

import asyncio
import socket
import aioamqp

@asyncio.coroutine
def error_callback(exception):
    print(exception)

@asyncio.coroutine
def connect():
    try:
        transport, protocol = yield from aioamqp.connect(
            host='nonexistant.com',
            on_error=error_callback,
            client_properties={
                'program_name': "test",
                'hostname' : socket.gethostname(),
            },

        )
    except aioamqp.AmqpClosedConnection:
        print("closed connections")
        return

asyncio.get_event_loop().run_until_complete(connect())

Publishing messages

A channel is the main object when you want to send message to an exchange, or to consume message from a queue:

channel = yield from protocol.channel()

When you want to produce some content, you declare a queue then publish message into it:

yield from channel.queue_declare("my_queue")
yield from channel.publish("aioamqp hello", '', "my_queue")

Note: we’re pushing message to “my_queue” queue, through the default amqp exchange.

Consuming messages

When consuming message, you connect to the same queue you previously created:

import asyncio
import aioamqp

@asyncio.coroutine
def callback(body, envelope, properties):
    print(body)

channel = yield from protocol.channel()
yield from channel.basic_consume(callback, queue_name="my_queue")

The basic_consume method tells the server to send us the messages, and will call callback with amqp response arguments.

The consumer_tag is the id of your consumer, and the delivery_tag is the tag used if you want to acknowledge the message.

In the callback:

  • the first body parameter is the message

  • the envelope is an instance of envelope.Envelope class which encapsulate a group of amqp parameter such as:

    consumer_tag
    delivery_tag
    exchange_name
    routing_key
    is_redeliver
    
  • the properties are message properties, an instance of properties.Properties with the following members:

    content_type
    content_encoding
    headers
    delivery_mode
    priority
    correlation_id
    reply_to
    expiration
    message_id
    timestamp
    type
    user_id
    app_id
    cluster_id
    

Queues

Queues are managed from the Channel object.

Channel.queue_declare(queue_name, passive, durable, exclusive, auto_delete, no_wait, arguments, timeout) → dict

Coroutine, creates or checks a queue on the broker

Parameters:
  • queue_name (str) – the queue to receive message from
  • passive (bool) – if set, the server will reply with Declare-Ok if the queue already exists with the same name, and raise an error if not. Checks for the same parameter as well.
  • durable (bool) – if set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts.
  • exclusive (bool) – request exclusive consumer access, meaning only this consumer can access the queue
  • no_wait (bool) – if set, the server will not respond to the method
  • arguments (dict) – AMQP arguments to be passed when creating the queue.
  • timeout (int) – wait for the server to respond after timeout

Here is an example to create a randomly named queue with special arguments x-max-priority:

result = yield from channel.queue_declare(
    queue_name='', durable=True, arguments={'x-max-priority': 4}
)
Channel.queue_delete(queue_name, if_unused, if_empty, no_wait, timeout)

Coroutine, delete a queue on the broker

Parameters:
  • queue_name (str) – the queue to receive message from
  • if_unused (bool) – the queue is deleted if it has no consumers. Raise if not.
  • if_empty (bool) – the queue is deleted if it has no messages. Raise if not.
  • no_wait (bool) – if set, the server will not respond to the method
  • arguments (dict) – AMQP arguments to be passed when creating the queue.
  • timeout (int) – wait for the server to respond after timeout
Channel.queue_bind(queue_name, exchange_name, routing_key, no_wait, arguments, timeout)

Coroutine, bind a queue to an exchange

Parameters:
  • queue_name (str) – the queue to receive message from.
  • exchange_name (str) – the exchange to bind the queue to.
  • routing_key (str) – the routing_key to route message.
  • no_wait (bool) – if set, the server will not respond to the method
  • arguments (dict) – AMQP arguments to be passed when creating the queue.
  • timeout (int) – wait for the server to respond after timeout

This simple example creates a queue, an exchange and bind them together.

channel = yield from protocol.channel()
yield from channel.queue_declare(queue_name='queue')
yield from channel.exchange_declare(exchange_name='exchange')

yield from channel.queue_bind('queue', 'exchange', routing_key='')
Channel.queue_unbind(queue_name, exchange_name, routing_key, arguments, timeout)

Coroutine, unbind a queue and an exchange.

Parameters:
  • queue_name (str) – the queue to receive message from.
  • exchange_name (str) – the exchange to bind the queue to.
  • no_wait (bool) – if set, the server will not respond to the method
  • arguments (dict) – AMQP arguments to be passed when creating the queue.
  • timeout (int) – wait for the server to respond after timeout
PARAM STR ROUTING_KEY:
 

THE ROUTING_KEY TO ROUTE MESSAGE.

Channel.queue_purge(queue_name, no_wait, timeout)

Coroutine, purge a queue

Parameters:queue_name (str) – the queue to receive message from.

Exchanges

Exchanges are used to correctly route message to queue: a publisher publishes a message into an exchanges, which routes the message to the corresponding queue.

Channel.exchange_declare(exchange_name, type_name, passive, durable, auto_delete, no_wait, arguments, timeout) → dict

Coroutine, creates or checks an exchange on the broker

Parameters:
  • exchange_name (str) – the exchange to receive message from
  • type_name (str) – the exchange type (fanout, direct, topics ...)
  • passive (bool) – if set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not. Checks for the same parameter as well.
  • durable (bool) – if set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts.
  • auto_delete (bool) – if set, the exchange is deleted when all queues have finished using it.
  • no_wait (bool) – if set, the server will not respond to the method
  • arguments (dict) – AMQP arguments to be passed when creating the exchange.
  • timeout (int) – wait for the server to respond after timeout

Note: the internal flag is deprecated and not used in this library.

channel = yield from protocol.channel()
yield from channel.exchange_declare(exchange_name='exchange', auto_delete=True)
Channel.exchange_delete(exchange_name, if_unused, no_wait, timeout)

Coroutine, delete a exchange on the broker

Parameters:
  • exchange_name (str) – the exchange to receive message from
  • if_unused (bool) – the exchange is deleted if it has no consumers. Raise if not.
  • no_wait (bool) – if set, the server will not respond to the method
  • arguments (dict) – AMQP arguments to be passed when creating the exchange.
  • timeout (int) – wait for the server to respond after timeout
Channel.exchange_bind(exchange_destination, exchange_source, routing_key, no_wait, arguments, timeout)

Coroutine, binds two exchanges together

Parameters:
  • exchange_destination (str) – specifies the name of the destination exchange to bind
  • exchange_source (str) – specified the name of the source exchange to bind.
  • exchange_destination – specifies the name of the destination exchange to bind
  • no_wait (bool) – if set, the server will not respond to the method
  • arguments (dict) – AMQP arguments to be passed when creating the exchange.
  • timeout (int) – wait for the server to respond after timeout
Channel.exchange_unbind(exchange_destination, exchange_source, routing_key, no_wait, arguments, timeout)
Coroutine, unbind an exchange from an exchange.
Parameters:
  • exchange_destination (str) – specifies the name of the destination exchange to bind
  • exchange_source (str) – specified the name of the source exchange to bind.
  • exchange_destination – specifies the name of the destination exchange to bind
  • no_wait (bool) – if set, the server will not respond to the method
  • arguments (dict) – AMQP arguments to be passed when creating the exchange.
  • timeout (int) – wait for the server to respond after timeout