Channel
- class amqpstorm.Channel(channel_id, connection, rpc_timeout)[source]
RabbitMQ Channel.
e.g.
channel = connection.channel()
- property basic
RabbitMQ Basic Operations.
e.g.
message = channel.basic.get(queue='hello_world')
- Return type
- property exchange
RabbitMQ Exchange Operations.
e.g.
channel.exchange.declare(exchange='hello_world')
- Return type
- property queue
RabbitMQ Queue Operations.
e.g.
channel.queue.declare(queue='hello_world')
- Return type
- property tx
RabbitMQ Tx Operations.
e.g.
channel.tx.commit()
- Return type
- build_inbound_messages(break_on_empty=False, to_tuple=False, auto_decode=True, message_impl=None)[source]
Build messages in the inbound queue.
- Parameters
break_on_empty (bool) –
Should we break the loop when there are no more messages in our inbound queue.
This does not guarantee that the queue is emptied before the loop is broken, as messages may be consumed faster then they are being delivered by RabbitMQ, causing the loop to be broken prematurely.
to_tuple (bool) – Should incoming messages be converted to a tuple before delivery.
auto_decode (bool) – Auto-decode strings when possible.
message_impl (class) – Optional message class to use, derived from BaseMessage, for created messages. Defaults to Message.
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type
generator
- close(reply_code=200, reply_text='')[source]
Close Channel.
- Parameters
reply_code (int) – Close reply code (e.g. 200)
reply_text (str) – Close reply text
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns
- check_for_errors()[source]
Check connection and channel for errors.
- Raises
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns
- check_for_exceptions()[source]
Check channel for exceptions.
- Raises
AMQPChannelError – Raises if the channel encountered an error.
- Returns
- confirm_deliveries()[source]
Set the channel to confirm that each message has been successfully delivered.
- Raises
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns
- process_data_events(to_tuple=False, auto_decode=True)[source]
Consume inbound messages.
- Parameters
to_tuple (bool) – Should incoming messages be converted to a tuple before delivery.
auto_decode (bool) – Auto-decode strings when possible.
- Raises
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns
- start_consuming(to_tuple=False, auto_decode=True)[source]
Start consuming messages.
- Parameters
to_tuple (bool) – Should incoming messages be converted to a tuple before delivery.
auto_decode (bool) – Auto-decode strings when possible.
- Raises
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns
- stop_consuming()[source]
Stop consuming messages.
- Raises
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns
Channel.Basic
- class amqpstorm.basic.Basic(channel, max_frame_size=None)[source]
RabbitMQ Basic Operations.
- qos(prefetch_count=0, prefetch_size=0, global_=False)[source]
Specify quality of service.
- Parameters
prefetch_count (int) – Prefetch window in messages
prefetch_size (int/long) – Prefetch window in octets
global (bool) – Apply to entire connection
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type
dict
- get(queue='', no_ack=False, to_dict=False, auto_decode=True, message_impl=None)[source]
Fetch a single message.
- Parameters
queue (str) – Queue name
no_ack (bool) – No acknowledgement needed
to_dict (bool) – Should incoming messages be converted to a dictionary before delivery.
auto_decode (bool) – Auto-decode strings when possible.
message_impl (class) – Message implementation based on BaseMessage
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns
Returns a single message, as long as there is a message in the queue. If no message is available, returns None.
- Return type
amqpstorm.Message,dict,None
- recover(requeue=False)[source]
Redeliver unacknowledged messages.
- Parameters
requeue (bool) – Re-queue the messages
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type
dict
- consume(callback=None, queue='', consumer_tag='', exclusive=False, no_ack=False, no_local=False, arguments=None)[source]
Start a queue consumer.
- Parameters
callback (Callable) – Message callback
queue (str) – Queue name
consumer_tag (str) – Consumer tag
no_local (bool) – Do not deliver own messages
no_ack (bool) – No acknowledgement needed
exclusive (bool) – Request exclusive access
arguments (dict) – Consume key/value arguments
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns
Consumer tag
- Return type
str
- cancel(consumer_tag='')[source]
Cancel a queue consumer.
- Parameters
consumer_tag (str) – Consumer tag
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type
dict
- publish(body, routing_key, exchange='', properties=None, mandatory=False, immediate=False)[source]
Publish a Message.
- Parameters
body (bytes,str,unicode) – Message payload
routing_key (str) – Message routing key
exchange (str) – The exchange to publish the message to
properties (dict) – Message properties
mandatory (bool) – Requires the message is published
immediate (bool) – Request immediate delivery
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type
bool,None
- ack(delivery_tag=0, multiple=False)[source]
Acknowledge Message.
- Parameters
delivery_tag (int/long) – Server-assigned delivery tag
multiple (bool) – Acknowledge multiple messages
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns
- nack(delivery_tag=0, multiple=False, requeue=True)[source]
Negative Acknowledgement.
- Parameters
delivery_tag (int/long) – Server-assigned delivery tag
multiple (bool) – Negative acknowledge multiple messages
requeue (bool) – Re-queue the message
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns
- reject(delivery_tag=0, requeue=True)[source]
Reject Message.
- Parameters
delivery_tag (int/long) – Server-assigned delivery tag
requeue (bool) – Re-queue the message
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns
Channel.Exchange
- class amqpstorm.exchange.Exchange(channel)[source]
RabbitMQ Exchange Operations.
- declare(exchange='', exchange_type='direct', passive=False, durable=False, auto_delete=False, arguments=None)[source]
Declare an Exchange.
- Parameters
exchange (str) – Exchange name
exchange_type (str) – Exchange type
passive (bool) – Do not create
durable (bool) – Durable exchange
auto_delete (bool) – Automatically delete when not in use
arguments (dict) – Exchange key/value arguments
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type
dict
- delete(exchange='', if_unused=False)[source]
Delete an Exchange.
- Parameters
exchange (str) – Exchange name
if_unused (bool) – Delete only if unused
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type
dict
- bind(destination='', source='', routing_key='', arguments=None)[source]
Bind an Exchange.
- Parameters
destination (str) – Exchange name
source (str) – Exchange to bind to
routing_key (str) – The routing key to use
arguments (dict) – Bind key/value arguments
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type
dict
- unbind(destination='', source='', routing_key='', arguments=None)[source]
Unbind an Exchange.
- Parameters
destination (str) – Exchange name
source (str) – Exchange to unbind from
routing_key (str) – The routing key used
arguments (dict) – Unbind key/value arguments
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type
dict
Channel.Queue
- class amqpstorm.queue.Queue(channel)[source]
RabbitMQ Queue Operations.
- declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None)[source]
Declare a Queue.
- Parameters
queue (str) – Queue name
passive (bool) – Do not create
durable (bool) – Durable queue
exclusive (bool) – Request exclusive access
auto_delete (bool) – Automatically delete when not in use
arguments (dict) – Queue key/value arguments
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type
dict
- delete(queue='', if_unused=False, if_empty=False)[source]
Delete a Queue.
- Parameters
queue (str) – Queue name
if_unused (bool) – Delete only if unused
if_empty (bool) – Delete only if empty
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type
dict
- purge(queue)[source]
Purge a Queue.
- Parameters
queue (str) – Queue name
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type
dict
- bind(queue='', exchange='', routing_key='', arguments=None)[source]
Bind a Queue.
- Parameters
queue (str) – Queue name
exchange (str) – Exchange name
routing_key (str) – The routing key to use
arguments (dict) – Bind key/value arguments
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type
dict
- unbind(queue='', exchange='', routing_key='', arguments=None)[source]
Unbind a Queue.
- Parameters
queue (str) – Queue name
exchange (str) – Exchange name
routing_key (str) – The routing key used
arguments (dict) – Unbind key/value arguments
- Raises
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type
dict
Channel.Tx
- class amqpstorm.tx.Tx(channel)[source]
RabbitMQ Transactions.
Server local transactions, in which the server will buffer published messages until the client commits (or rollback) the messages.
- select()[source]
Enable standard transaction mode.
This will enable transaction mode on the channel. Meaning that messages will be kept in the remote server buffer until such a time that either commit or rollback is called.
- Returns
- commit()[source]
Commit the current transaction.
Commit all messages published during the current transaction session to the remote server.
A new transaction session starts as soon as the command has been executed.
- Returns
- rollback()[source]
Abandon the current transaction.
Rollback all messages published during the current transaction session to the remote server.
Note that all messages published during this transaction session will be lost, and will have to be published again.
A new transaction session starts as soon as the command has been executed.
- Returns