Channel

class amqpstorm.Channel(channel_id, connection, rpc_timeout)

RabbitMQ Channel.

e.g.

channel = connection.channel()
property basic

RabbitMQ Basic Operations.

e.g.

message = channel.basic.get(queue='hello_world')
Return type:

amqpstorm.basic.Basic

property exchange

RabbitMQ Exchange Operations.

e.g.

channel.exchange.declare(exchange='hello_world')
Return type:

amqpstorm.exchange.Exchange

property queue

RabbitMQ Queue Operations.

e.g.

channel.queue.declare(queue='hello_world')
Return type:

amqpstorm.queue.Queue

property tx

RabbitMQ Tx Operations.

e.g.

channel.tx.commit()
Return type:

amqpstorm.tx.Tx

build_inbound_messages(break_on_empty=False, to_tuple=False, auto_decode=True, message_impl=None)

Build messages in the inbound queue.

Parameters:
  • break_on_empty (bool) –

    Should we break the loop when there are no more messages in our inbound queue.

    This does not guarantee that the queue is emptied before the loop is broken, as messages may be consumed faster than they are delivered by RabbitMQ, causing the loop to be broken prematurely.

  • to_tuple (bool) – Should incoming messages be converted to a tuple before delivery.

  • auto_decode (bool) – Auto-decode strings when possible.

  • message_impl (class) – Optional message class to use, derived from BaseMessage, for created messages. Defaults to Message.

Raises:
Return type:

generator
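
As an illustration of break_on_empty, the following is a minimal sketch that drains whatever is currently buffered on a channel. The helper name drain_inbound is hypothetical, and the body attribute and ack() method on the yielded message objects are assumptions about amqpstorm.Message, which this section does not document. It also assumes a consumer has already been started on the channel.

```python
def drain_inbound(channel, limit=None):
    """Collect the bodies of messages currently buffered on the channel.

    `channel` is expected to behave like amqpstorm.Channel. The `body`
    attribute and `ack()` method on each message are assumptions.
    """
    bodies = []
    # break_on_empty=True ends the generator once the inbound queue is empty.
    for message in channel.build_inbound_messages(break_on_empty=True):
        bodies.append(message.body)
        message.ack()
        if limit is not None and len(bodies) >= limit:
            break
    return bodies
```

Because the loop may end before RabbitMQ has delivered everything, a caller that needs every message would likely call such a helper repeatedly rather than once.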

close(reply_code=200, reply_text='')

Close Channel.

Parameters:
  • reply_code (int) – Close reply code (e.g. 200)

  • reply_text (str) – Close reply text

Raises:
Returns:

check_for_errors()

Check connection and channel for errors.

Raises:
Returns:

check_for_exceptions()

Check channel for exceptions.

Raises:

AMQPChannelError – Raised if the channel encountered an error.

Returns:

confirm_deliveries()

Set the channel to confirm that each message has been successfully delivered.

Raises:
Returns:
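
A minimal sketch of how delivery confirmation might be combined with basic.publish follows. The helper name publish_with_confirms is hypothetical, and the sketch assumes that publish returns a truthy value for a confirmed message once confirms are enabled, which is suggested by its documented bool,None return type but not stated outright here.

```python
def publish_with_confirms(channel, bodies, routing_key, exchange=''):
    """Publish each body and return the ones the server did not confirm.

    `channel` is expected to behave like amqpstorm.Channel. Treating the
    return value of publish() as a confirmation flag is an assumption.
    """
    # Enable confirms once per channel, before publishing.
    channel.confirm_deliveries()
    unconfirmed = []
    for body in bodies:
        confirmed = channel.basic.publish(body=body,
                                          routing_key=routing_key,
                                          exchange=exchange)
        if not confirmed:
            unconfirmed.append(body)
    return unconfirmed
```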

process_data_events(to_tuple=False, auto_decode=True)

Consume inbound messages.

Parameters:
  • to_tuple (bool) – Should incoming messages be converted to a tuple before delivery.

  • auto_decode (bool) – Auto-decode strings when possible.

Raises:
Returns:

start_consuming(to_tuple=False, auto_decode=True)

Start consuming messages.

Parameters:
  • to_tuple (bool) – Should incoming messages be converted to a tuple before delivery.

  • auto_decode (bool) – Auto-decode strings when possible.

Raises:
Returns:

stop_consuming()

Stop consuming messages.

Raises:
Returns:

Channel.Basic

class amqpstorm.basic.Basic(channel, max_frame_size=None)

RabbitMQ Basic Operations.

qos(prefetch_count=0, prefetch_size=0, global_=False)

Specify quality of service.

Parameters:
  • prefetch_count (int) – Prefetch window in messages

  • prefetch_size (int/long) – Prefetch window in octets

  • global_ (bool) – Apply to entire connection

Raises:
Return type:

dict

get(queue='', no_ack=False, to_dict=False, auto_decode=True, message_impl=None)

Fetch a single message.

Parameters:
  • queue (str) – Queue name

  • no_ack (bool) – No acknowledgement needed

  • to_dict (bool) – Should incoming messages be converted to a dictionary before delivery.

  • auto_decode (bool) – Auto-decode strings when possible.

  • message_impl (class) – Message implementation based on BaseMessage

Raises:
Returns:

Returns a single message, as long as there is a message in the queue. If no message is available, returns None.

Return type:

amqpstorm.Message,dict,None
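
Since get returns None when the queue is empty, callers need to handle that case before touching the message. The sketch below shows one way to do so. The helper name fetch_one is hypothetical, and the body and delivery_tag attributes on the returned message are assumptions about amqpstorm.Message.

```python
def fetch_one(channel, queue):
    """Fetch, acknowledge and return the body of a single message.

    Returns None when no message is available. `channel` is expected to
    behave like amqpstorm.Channel; `body` and `delivery_tag` on the
    message are assumptions.
    """
    message = channel.basic.get(queue=queue, no_ack=False)
    if message is None:
        return None
    body = message.body
    # no_ack=False means the message must be acknowledged explicitly.
    channel.basic.ack(delivery_tag=message.delivery_tag)
    return body
```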

recover(requeue=False)

Redeliver unacknowledged messages.

Parameters:

requeue (bool) – Re-queue the messages

Raises:
Return type:

dict

consume(callback=None, queue='', consumer_tag='', exclusive=False, no_ack=False, no_local=False, arguments=None)

Start a queue consumer.

Parameters:
  • callback (Callable) – Message callback

  • queue (str) – Queue name

  • consumer_tag (str) – Consumer tag

  • no_local (bool) – Do not deliver own messages

  • no_ack (bool) – No acknowledgement needed

  • exclusive (bool) – Request exclusive access

  • arguments (dict) – Consume key/value arguments

Raises:
Returns:

Consumer tag

Return type:

str
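
The following sketch ties qos, consume and start_consuming together into a simple worker. The names run_worker and handler are hypothetical. It assumes the callback receives a single message object (the default when to_tuple is False) and that the message exposes body and ack(), neither of which is documented in this section.

```python
def run_worker(channel, queue, handler, prefetch=10):
    """Consume from `queue`, passing each message body to `handler`.

    `channel` is expected to behave like amqpstorm.Channel. Returns the
    consumer tag once start_consuming() returns.
    """
    def on_message(message):
        # Assumed callback shape: one message object per delivery.
        handler(message.body)
        message.ack()

    # Limit the number of unacknowledged messages delivered at once.
    channel.basic.qos(prefetch_count=prefetch)
    consumer_tag = channel.basic.consume(callback=on_message,
                                         queue=queue,
                                         no_ack=False)
    # Blocks while the channel is consuming.
    channel.start_consuming()
    return consumer_tag
```

The returned tag could later be passed to cancel to stop this particular consumer.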

cancel(consumer_tag='')

Cancel a queue consumer.

Parameters:

consumer_tag (str) – Consumer tag

Raises:
Return type:

dict

publish(body, routing_key, exchange='', properties=None, mandatory=False, immediate=False)

Publish a Message.

Parameters:
  • body (bytes,str,unicode) – Message payload

  • routing_key (str) – Message routing key

  • exchange (str) – The exchange to publish the message to

  • properties (dict) – Message properties

  • mandatory (bool) – Require that the message can be routed to a queue

  • immediate (bool) – Request immediate delivery

Raises:
Return type:

bool,None
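
A minimal sketch of publishing a JSON payload with message properties follows. The helper name publish_json is hypothetical, and the property keys content_type and delivery_mode are assumed to follow the standard AMQP basic property names. This section does not list which keys the properties dict accepts.

```python
import json


def publish_json(channel, payload, routing_key, exchange=''):
    """Serialize `payload` to JSON and publish it as a persistent message.

    `channel` is expected to behave like amqpstorm.Channel. The property
    keys below are assumptions based on the AMQP basic properties.
    """
    properties = {
        'content_type': 'application/json',
        'delivery_mode': 2,  # 2 marks the message as persistent in AMQP
    }
    return channel.basic.publish(body=json.dumps(payload),
                                 routing_key=routing_key,
                                 exchange=exchange,
                                 properties=properties)
```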

ack(delivery_tag=0, multiple=False)

Acknowledge Message.

Parameters:
  • delivery_tag (int/long) – Server-assigned delivery tag

  • multiple (bool) – Acknowledge multiple messages

Raises:
Returns:

nack(delivery_tag=0, multiple=False, requeue=True)

Negative Acknowledgement.

Parameters:
  • delivery_tag (int/long) – Server-assigned delivery tag

  • multiple (bool) – Negative acknowledge multiple messages

  • requeue (bool) – Re-queue the message

Raises:
Returns:

reject(delivery_tag=0, requeue=True)

Reject Message.

Parameters:
  • delivery_tag (int/long) – Server-assigned delivery tag

  • requeue (bool) – Re-queue the message

Raises:
Returns:
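
To show how ack, nack and reject relate, the sketch below maps a processing outcome to one of the three calls. The helper name settle and the outcome labels 'ok', 'retry' and 'drop' are hypothetical and chosen only for illustration.

```python
def settle(channel, delivery_tag, outcome):
    """Acknowledge, requeue or discard a delivery based on `outcome`.

    `channel` is expected to behave like amqpstorm.Channel. The outcome
    labels are illustrative, not part of the library.
    """
    if outcome == 'ok':
        channel.basic.ack(delivery_tag=delivery_tag)
    elif outcome == 'retry':
        # Hand the message back to the queue for another attempt.
        channel.basic.nack(delivery_tag=delivery_tag, requeue=True)
    elif outcome == 'drop':
        # Refuse the message without re-queueing it.
        channel.basic.reject(delivery_tag=delivery_tag, requeue=False)
    else:
        raise ValueError('unknown outcome: %r' % (outcome,))
```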

Channel.Exchange

class amqpstorm.exchange.Exchange(channel)

RabbitMQ Exchange Operations.

declare(exchange='', exchange_type='direct', passive=False, durable=False, auto_delete=False, arguments=None)

Declare an Exchange.

Parameters:
  • exchange (str) – Exchange name

  • exchange_type (str) – Exchange type

  • passive (bool) – Do not create

  • durable (bool) – Durable exchange

  • auto_delete (bool) – Automatically delete when not in use

  • arguments (dict) – Exchange key/value arguments

Raises:
Return type:

dict

delete(exchange='', if_unused=False)

Delete an Exchange.

Parameters:
  • exchange (str) – Exchange name

  • if_unused (bool) – Delete only if unused

Raises:
Return type:

dict

bind(destination='', source='', routing_key='', arguments=None)

Bind an Exchange.

Parameters:
  • destination (str) – Exchange name

  • source (str) – Exchange to bind to

  • routing_key (str) – The routing key to use

  • arguments (dict) – Bind key/value arguments

Raises:
Return type:

dict

unbind(destination='', source='', routing_key='', arguments=None)

Unbind an Exchange.

Parameters:
  • destination (str) – Exchange name

  • source (str) – Exchange to unbind from

  • routing_key (str) – The routing key used

  • arguments (dict) – Unbind key/value arguments

Raises:
Return type:

dict
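
The exchange operations above can be sketched as a small helper that declares two exchanges and binds one to the other. The name link_exchanges is hypothetical, and the choice of a fanout source feeding a direct destination is only an example.

```python
def link_exchanges(channel, source, destination, routing_key=''):
    """Declare two durable exchanges and bind `destination` to `source`.

    `channel` is expected to behave like amqpstorm.Channel. Messages
    published to `source` are then also routed to `destination`.
    """
    channel.exchange.declare(exchange=source,
                             exchange_type='fanout',
                             durable=True)
    channel.exchange.declare(exchange=destination,
                             exchange_type='direct',
                             durable=True)
    channel.exchange.bind(destination=destination,
                          source=source,
                          routing_key=routing_key)
```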

Channel.Queue

class amqpstorm.queue.Queue(channel)

RabbitMQ Queue Operations.

declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None)

Declare a Queue.

Parameters:
  • queue (str) – Queue name

  • passive (bool) – Do not create

  • durable (bool) – Durable queue

  • exclusive (bool) – Request exclusive access

  • auto_delete (bool) – Automatically delete when not in use

  • arguments (dict) – Queue key/value arguments

Raises:
Return type:

dict

delete(queue='', if_unused=False, if_empty=False)

Delete a Queue.

Parameters:
  • queue (str) – Queue name

  • if_unused (bool) – Delete only if unused

  • if_empty (bool) – Delete only if empty

Raises:
Return type:

dict

purge(queue)

Purge a Queue.

Parameters:

queue (str) – Queue name

Raises:
Return type:

dict

bind(queue='', exchange='', routing_key='', arguments=None)

Bind a Queue.

Parameters:
  • queue (str) – Queue name

  • exchange (str) – Exchange name

  • routing_key (str) – The routing key to use

  • arguments (dict) – Bind key/value arguments

Raises:
Return type:

dict

unbind(queue='', exchange='', routing_key='', arguments=None)

Unbind a Queue.

Parameters:
  • queue (str) – Queue name

  • exchange (str) – Exchange name

  • routing_key (str) – The routing key used

  • arguments (dict) – Unbind key/value arguments

Raises:
Return type:

dict
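
A minimal sketch of setting up a queue with several bindings follows. The helper name setup_queue is hypothetical, and it assumes the target exchange has already been declared.

```python
def setup_queue(channel, queue, exchange, routing_keys):
    """Declare a durable queue and bind it once per routing key.

    `channel` is expected to behave like amqpstorm.Channel, and
    `exchange` is assumed to exist already.
    """
    channel.queue.declare(queue=queue, durable=True)
    for routing_key in routing_keys:
        channel.queue.bind(queue=queue,
                           exchange=exchange,
                           routing_key=routing_key)
```

Each binding can later be removed with unbind using the same queue, exchange and routing key.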

Channel.Tx

class amqpstorm.tx.Tx(channel)

RabbitMQ Transactions.

Server-local transactions, in which the server buffers published messages until the client commits (or rolls back) the messages.

select()

Enable standard transaction mode.

This enables transaction mode on the channel, meaning that messages are kept in the remote server's buffer until either commit or rollback is called.

Returns:

commit()

Commit the current transaction.

Commit all messages published during the current transaction session to the remote server.

A new transaction session starts as soon as the command has been executed.

Returns:

rollback()

Abandon the current transaction.

Roll back all messages published to the remote server during the current transaction session.

Note that all messages published during this transaction session will be lost, and will have to be published again.

A new transaction session starts as soon as the command has been executed.

Returns:
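
The select, commit and rollback calls above can be sketched as an all-or-nothing publish. The helper name publish_in_transaction is hypothetical. Whether rollback can still succeed after a given failure depends on the state of the channel, so the error handling here is an assumption rather than a recommended pattern.

```python
def publish_in_transaction(channel, bodies, routing_key, exchange=''):
    """Publish all bodies in one transaction, rolling back on failure.

    `channel` is expected to behave like amqpstorm.Channel.
    """
    # Switch the channel to transaction mode before publishing.
    channel.tx.select()
    try:
        for body in bodies:
            channel.basic.publish(body=body,
                                  routing_key=routing_key,
                                  exchange=exchange)
        channel.tx.commit()
    except Exception:
        # Discard everything published since select() or the last commit.
        channel.tx.rollback()
        raise
```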