Channel
- class amqpstorm.Channel(channel_id, connection, rpc_timeout)
RabbitMQ Channel.
e.g.
channel = connection.channel()
- property basic
RabbitMQ Basic Operations.
e.g.
message = channel.basic.get(queue='hello_world')
- Return type:
amqpstorm.basic.Basic
- property exchange
RabbitMQ Exchange Operations.
e.g.
channel.exchange.declare(exchange='hello_world')
- Return type:
amqpstorm.exchange.Exchange
- property queue
RabbitMQ Queue Operations.
e.g.
channel.queue.declare(queue='hello_world')
- Return type:
amqpstorm.queue.Queue
- property tx
RabbitMQ Tx Operations.
e.g.
channel.tx.commit()
- Return type:
amqpstorm.tx.Tx
- build_inbound_messages(break_on_empty=False, to_tuple=False, auto_decode=True, message_impl=None)
Build messages in the inbound queue.
- Parameters:
break_on_empty (bool) –
Whether to break the loop once there are no more messages in the inbound queue.
This does not guarantee that the queue is empty when the loop breaks: if messages are consumed faster than RabbitMQ delivers them, the loop may break prematurely.
to_tuple (bool) – Should incoming messages be converted to a tuple before delivery.
auto_decode (bool) – Auto-decode strings when possible.
message_impl (class) – Optional message class to use, derived from BaseMessage, for created messages. Defaults to Message.
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type:
generator
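The generator form of consuming can be sketched as follows. This is a minimal illustration, not the library's own example: `drain_queue` and its `limit` parameter are hypothetical names, `channel` is assumed to be an open amqpstorm channel, and each yielded message is assumed to expose `body` and `ack()`. As noted for `break_on_empty`, the loop may end before the broker has delivered everything.

```python
def drain_queue(channel, queue_name, limit=100):
    """Collect up to `limit` message bodies buffered for `queue_name`."""
    # Register a consumer without a callback so deliveries are placed in
    # the channel's inbound queue, where build_inbound_messages finds them.
    consumer_tag = channel.basic.consume(queue=queue_name, no_ack=False)
    bodies = []
    # break_on_empty=True ends the loop once the inbound queue is empty.
    for message in channel.build_inbound_messages(break_on_empty=True):
        bodies.append(message.body)
        message.ack()  # assumed helper on the delivered message object
        if len(bodies) >= limit:
            break
    # Stop further deliveries for this consumer.
    channel.basic.cancel(consumer_tag)
    return bodies
```

With a live broker this would be called as `drain_queue(connection.channel(), 'hello_world')`.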
- close(reply_code=200, reply_text='')
Close Channel.
- Parameters:
reply_code (int) – Close reply code (e.g. 200)
reply_text (str) – Close reply text
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns:
- check_for_errors()
Check connection and channel for errors.
- Raises:
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns:
- check_for_exceptions()
Check channel for exceptions.
- Raises:
AMQPChannelError – Raises if the channel encountered an error.
- Returns:
- confirm_deliveries()
Set the channel to confirm that each message has been successfully delivered.
- Raises:
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns:
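As a rough sketch of how delivery confirmation might be used: `basic.publish` is documented as returning `bool` or `None`, and the code below assumes (this is an assumption, not something stated here) that `None` means confirmations were not enabled on the channel. The helper name `publish_with_confirm` is hypothetical.

```python
def publish_with_confirm(channel, body, routing_key, exchange=""):
    """Publish one message and report whether the broker confirmed it."""
    result = channel.basic.publish(
        body=body, routing_key=routing_key, exchange=exchange
    )
    # Assumption: a None result means confirm_deliveries() was never called
    # on this channel, so no confirmation was requested.
    if result is None:
        raise RuntimeError("call channel.confirm_deliveries() first")
    return result
```

Typical use would be to call `channel.confirm_deliveries()` once after opening the channel, then call `publish_with_confirm(channel, 'Hello', 'hello_world')` for each message.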
- process_data_events(to_tuple=False, auto_decode=True)
Consume inbound messages.
- Parameters:
to_tuple (bool) – Should incoming messages be converted to a tuple before delivery.
auto_decode (bool) – Auto-decode strings when possible.
- Raises:
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns:
- start_consuming(to_tuple=False, auto_decode=True)
Start consuming messages.
- Parameters:
to_tuple (bool) – Should incoming messages be converted to a tuple before delivery.
auto_decode (bool) – Auto-decode strings when possible.
- Raises:
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns:
- stop_consuming()
Stop consuming messages.
- Raises:
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns:
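The callback style of consuming, combining `basic.consume`, `start_consuming` and `stop_consuming`, might look like the sketch below. `consume_n`, `handler` and `max_messages` are hypothetical names introduced for illustration; the callback is assumed to receive a message object with `body` and `ack()`, and calling `stop_consuming()` from inside the callback is assumed to be safe.

```python
def consume_n(channel, queue_name, handler, max_messages):
    """Consume until `max_messages` messages have been handled."""
    seen = 0

    def on_message(message):
        nonlocal seen
        handler(message.body)
        message.ack()  # acknowledge only after the handler succeeded
        seen += 1
        if seen >= max_messages:
            # Cancels the consumer, which lets start_consuming return.
            channel.stop_consuming()

    channel.basic.consume(callback=on_message, queue=queue_name, no_ack=False)
    channel.start_consuming()  # blocks while consumers are active
    return seen
```

For example, `consume_n(channel, 'hello_world', print, 10)` would print ten message bodies and then return.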
Channel.Basic
- class amqpstorm.basic.Basic(channel, max_frame_size=None)
RabbitMQ Basic Operations.
- qos(prefetch_count=0, prefetch_size=0, global_=False)
Specify quality of service.
- Parameters:
prefetch_count (int) – Prefetch window in messages
prefetch_size (int/long) – Prefetch window in octets
global_ (bool) – Apply to entire connection
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type:
dict
- get(queue='', no_ack=False, to_dict=False, auto_decode=True, message_impl=None)
Fetch a single message.
- Parameters:
queue (str) – Queue name
no_ack (bool) – No acknowledgement needed
to_dict (bool) – Should incoming messages be converted to a dictionary before delivery.
auto_decode (bool) – Auto-decode strings when possible.
message_impl (class) – Message implementation based on BaseMessage
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns:
Returns a single message, as long as there is a message in the queue. If no message is available, returns None.
- Return type:
amqpstorm.Message,dict,None
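Because `get` returns `None` when the queue is empty, callers need to check the result before touching the message. A minimal sketch, assuming the returned message exposes `body` and `ack()`; `get_body_or_default` is a hypothetical helper name:

```python
def get_body_or_default(channel, queue_name, default=None):
    """Fetch one message body, or `default` if the queue is empty."""
    message = channel.basic.get(queue=queue_name, no_ack=False)
    if message is None:
        # Nothing was waiting in the queue.
        return default
    body = message.body
    message.ack()  # no_ack=False, so the message must be acknowledged
    return body
```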
- recover(requeue=False)
Redeliver unacknowledged messages.
- Parameters:
requeue (bool) – Re-queue the messages
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type:
dict
- consume(callback=None, queue='', consumer_tag='', exclusive=False, no_ack=False, no_local=False, arguments=None)
Start a queue consumer.
- Parameters:
callback (Callable) – Message callback
queue (str) – Queue name
consumer_tag (str) – Consumer tag
no_local (bool) – Do not deliver own messages
no_ack (bool) – No acknowledgement needed
exclusive (bool) – Request exclusive access
arguments (dict) – Consume key/value arguments
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns:
Consumer tag
- Return type:
str
- cancel(consumer_tag='')
Cancel a queue consumer.
- Parameters:
consumer_tag (str) – Consumer tag
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type:
dict
- publish(body, routing_key, exchange='', properties=None, mandatory=False, immediate=False)
Publish a Message.
- Parameters:
body (bytes,str,unicode) – Message payload
routing_key (str) – Message routing key
exchange (str) – The exchange to publish the message to
properties (dict) – Message properties
mandatory (bool) – Require that the message be routed to a queue
immediate (bool) – Request immediate delivery
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type:
bool,None
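The `properties` argument takes a dictionary of message properties. The sketch below assumes the standard AMQP property names `content_type` and `delivery_mode` (2 meaning persistent) are accepted as keys; `publish_json` is a hypothetical helper, not part of the library.

```python
import json


def publish_json(channel, payload, routing_key, exchange=""):
    """Serialize `payload` to JSON and publish it as a persistent message."""
    # Assumed property keys, following the AMQP basic properties.
    properties = {
        "content_type": "application/json",
        "delivery_mode": 2,  # persistent
    }
    return channel.basic.publish(
        body=json.dumps(payload),
        routing_key=routing_key,
        exchange=exchange,
        properties=properties,
    )
```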
- ack(delivery_tag=0, multiple=False)
Acknowledge Message.
- Parameters:
delivery_tag (int/long) – Server-assigned delivery tag
multiple (bool) – Acknowledge multiple messages
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns:
- nack(delivery_tag=0, multiple=False, requeue=True)
Negative Acknowledgement.
- Parameters:
delivery_tag (int/long) – Server-assigned delivery tag
multiple (bool) – Negative acknowledge multiple messages
requeue (bool) – Re-queue the message
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns:
- reject(delivery_tag=0, requeue=True)
Reject Message.
- Parameters:
delivery_tag (int/long) – Server-assigned delivery tag
requeue (bool) – Re-queue the message
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Returns:
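One way to combine `ack` and `nack` is to acknowledge a delivery only when processing succeeds. The following is a sketch under that assumption; `settle` and `handler` are hypothetical names, and `delivery_tag` is whatever tag the server assigned to the delivery being settled.

```python
def settle(channel, delivery_tag, handler, body):
    """Run `handler(body)`, then ack on success or nack on failure."""
    try:
        handler(body)
    except Exception:
        # requeue=False asks the broker not to redeliver the message;
        # pass requeue=True to retry it instead.
        channel.basic.nack(delivery_tag=delivery_tag, requeue=False)
        return False
    channel.basic.ack(delivery_tag=delivery_tag)
    return True
```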
Channel.Exchange
- class amqpstorm.exchange.Exchange(channel)
RabbitMQ Exchange Operations.
- declare(exchange='', exchange_type='direct', passive=False, durable=False, auto_delete=False, arguments=None)
Declare an Exchange.
- Parameters:
exchange (str) – Exchange name
exchange_type (str) – Exchange type
passive (bool) – Do not create
durable (bool) – Durable exchange
auto_delete (bool) – Automatically delete when not in use
arguments (dict) – Exchange key/value arguments
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type:
dict
- delete(exchange='', if_unused=False)
Delete an Exchange.
- Parameters:
exchange (str) – Exchange name
if_unused (bool) – Delete only if unused
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type:
dict
- bind(destination='', source='', routing_key='', arguments=None)
Bind an Exchange.
- Parameters:
destination (str) – Exchange name
source (str) – Exchange to bind to
routing_key (str) – The routing key to use
arguments (dict) – Bind key/value arguments
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type:
dict
- unbind(destination='', source='', routing_key='', arguments=None)
Unbind an Exchange.
- Parameters:
destination (str) – Exchange name
source (str) – Exchange to unbind from
routing_key (str) – The routing key used
arguments (dict) – Unbind key/value arguments
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type:
dict
Channel.Queue
- class amqpstorm.queue.Queue(channel)
RabbitMQ Queue Operations.
- declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None)
Declare a Queue.
- Parameters:
queue (str) – Queue name
passive (bool) – Do not create
durable (bool) – Durable queue
exclusive (bool) – Request exclusive access
auto_delete (bool) – Automatically delete when not in use
arguments (dict) – Queue key/value arguments
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type:
dict
- delete(queue='', if_unused=False, if_empty=False)
Delete a Queue.
- Parameters:
queue (str) – Queue name
if_unused (bool) – Delete only if unused
if_empty (bool) – Delete only if empty
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type:
dict
- purge(queue)
Purge a Queue.
- Parameters:
queue (str) – Queue name
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type:
dict
- bind(queue='', exchange='', routing_key='', arguments=None)
Bind a Queue.
- Parameters:
queue (str) – Queue name
exchange (str) – Exchange name
routing_key (str) – The routing key to use
arguments (dict) – Bind key/value arguments
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type:
dict
- unbind(queue='', exchange='', routing_key='', arguments=None)
Unbind a Queue.
- Parameters:
queue (str) – Queue name
exchange (str) – Exchange name
routing_key (str) – The routing key used
arguments (dict) – Unbind key/value arguments
- Raises:
AMQPInvalidArgument – Invalid Parameters
AMQPChannelError – Raises if the channel encountered an error.
AMQPConnectionError – Raises if the connection encountered an error.
- Return type:
dict
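The exchange and queue operations are typically used together to set up routing before publishing or consuming. A minimal sketch of that sequence, using only the parameters documented above; `declare_topology` is a hypothetical helper and `channel` is assumed to be an open channel:

```python
def declare_topology(channel, exchange, queue, routing_key):
    """Declare a durable direct exchange and queue, then bind them."""
    channel.exchange.declare(
        exchange=exchange, exchange_type="direct", durable=True
    )
    channel.queue.declare(queue=queue, durable=True)
    # Messages published to `exchange` with `routing_key` now reach `queue`.
    channel.queue.bind(queue=queue, exchange=exchange, routing_key=routing_key)
```

Declaring the exchange and the queue before binding matters, since a bind refers to both by name.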
Channel.Tx
- class amqpstorm.tx.Tx(channel)
RabbitMQ Transactions.
Server local transactions, in which the server buffers published messages until the client commits (or rolls back) them.
- select()
Enable standard transaction mode.
This enables transaction mode on the channel, meaning that messages are held in the remote server's buffer until either commit or rollback is called.
- Returns:
- commit()
Commit the current transaction.
Commit all messages published during the current transaction session to the remote server.
A new transaction session starts as soon as the command has been executed.
- Returns:
- rollback()
Abandon the current transaction.
Roll back all messages published to the remote server during the current transaction session.
Note that all messages published during this transaction session will be lost, and will have to be published again.
A new transaction session starts as soon as the command has been executed.
- Returns:
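The select, commit and rollback calls can be combined so that a batch of messages is published all-or-nothing. This is a sketch only: `publish_in_transaction` is a hypothetical helper, and it assumes the channel is still usable when the rollback is attempted.

```python
def publish_in_transaction(channel, bodies, routing_key, exchange=""):
    """Publish every body in one transaction; roll back if any publish fails."""
    channel.tx.select()  # enable transaction mode on the channel
    try:
        for body in bodies:
            channel.basic.publish(
                body=body, routing_key=routing_key, exchange=exchange
            )
        channel.tx.commit()
    except Exception:
        # Discard everything published since the transaction started.
        channel.tx.rollback()
        raise
    return len(bodies)
```

After a rollback the messages are lost on the server side, so the caller would need to publish them again.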