API === .. module:: aioamqp :synopsis: public Jinja2 API Basics ------ There are two principal objects when using aioamqp: * The protocol object, used to begin a connection to aioamqp, * The channel object, used when creating a new channel to effectively use an AMQP channel. Starting a connection --------------------- Starting a connection to AMQP really mean instanciate a new asyncio Protocol subclass:: import asyncio import aioamqp @asyncio.coroutine def connect(): try: transport, protocol = yield from aioamqp.connect() # use default parameters except aioamqp.AmqpClosedConnection: print("closed connections") return print("connected !") yield from asyncio.sleep(1) print("close connection") yield from protocol.close() transport.close() asyncio.get_event_loop().run_until_complete(connect()) In this example, we just use the method "start_connection" to begin a communication with the server, which deals with credentials and connection tunning. Handling errors --------------- The connect() method has an extra 'on_error' kwarg option. This on_error is a callback or a coroutine function which is called with an exception as the argument:: import asyncio import aioamqp @asyncio.coroutine def error_callback(exception): print(exception) @asyncio.coroutine def connect(): try: transport, protocol = yield from aioamqp.connect( host='nonexistant.com', on_error=error_callback, ) except aioamqp.AmqpClosedConnection: print("closed connections") return asyncio.get_event_loop().run_until_complete(connect()) Publishing messages ------------------- A channel is the main object when you want to send message to an exchange, or to consume message from a queue:: channel = yield from protocol.channel() When you want to produce some content, you declare a queue then publish message into it:: queue = yield from channel.queue_declare("my_queue") yield from queue.publish("aioamqp hello", '', "my_queue") Note: we're pushing message to "my_queue" queue, through the default amqp exchange. Consuming messages ------------------ When consuming message, you connect to the same queue you previously created:: import asyncio import aioamqp @asyncio.coroutine def callback(body, envelope, properties): print(body) channel = yield from protocol.channel() yield from channel.basic_consume("my_queue", callback=callback) The ``basic_consume`` method tells the server to send us the messages, and will call ``callback`` with amqp response arguments. The ``consumer_tag`` is the id of your consumer, and the ``delivery_tag`` is the tag used if you want to acknowledge the message. In the callback: * the first ``body`` parameter is the message * the ``envelope`` is an instance of envelope.Envelope class which encapsulate a group of amqp parameter such as:: consumer_tag delivery_tag exchange_name routing_key is_redeliver * the ``properties`` are message properties, an instance of properties.Properties with the following members:: content_type content_encoding headers delivery_mode priority correlation_id reply_to expiration message_id timestamp type user_id app_id cluster_id Using exchanges --------------- You can bind an exchange to a queue:: channel = yield from protocol.channel() exchange = yield from channel.exchange_declare(exchange_name="my_exchange", type_name='fanout') yield from channel.queue_declare("my_queue") yield from channel.queue_bind("my_queue", "my_exchange")