Channel

class amqpstorm.Channel(channel_id, connection, rpc_timeout, on_close_impl=None)

RabbitMQ Channel.

e.g.

channel = connection.channel()
property basic

RabbitMQ Basic Operations.

e.g.

message = channel.basic.get(queue='hello_world')
Return type

amqpstorm.basic.Basic

property exchange

RabbitMQ Exchange Operations.

e.g.

channel.exchange.declare(exchange='hello_world')
Return type

amqpstorm.exchange.Exchange

property queue

RabbitMQ Queue Operations.

e.g.

channel.queue.declare(queue='hello_world')
Return type

amqpstorm.queue.Queue

property tx

RabbitMQ Tx Operations.

e.g.

channel.tx.commit()
Return type

amqpstorm.tx.Tx

close(reply_code=200, reply_text='')

Close Channel.

Parameters
  • reply_code (int) – Close reply code (e.g. 200)

  • reply_text (str) – Close reply text

Raises
Returns

confirm_deliveries()

Set the channel to confirm that each message has been successfully delivered.

Raises
Returns

start_consuming(to_tuple=False, auto_decode=True)

Start consuming messages.

Parameters
  • to_tuple (bool) – Should incoming messages be converted to a tuple before delivery.

  • auto_decode (bool) – Auto-decode strings when possible.

Raises
Returns

stop_consuming()

Stop consuming messages.

Raises
Returns

process_data_events(to_tuple=False, auto_decode=True)

Consume inbound messages.

Parameters
  • to_tuple (bool) – Should incoming messages be converted to a tuple before delivery.

  • auto_decode (bool) – Auto-decode strings when possible.

Raises
Returns

build_inbound_messages(break_on_empty=False, to_tuple=False, auto_decode=True)

Build messages in the inbound queue.

Parameters
  • break_on_empty (bool) –

    Should we break the loop when there are no more messages in our inbound queue.

    This does not guarantee that the queue is emptied before the loop is broken, as messages may be consumed faster then they are being delivered by RabbitMQ, causing the loop to be broken prematurely.

  • to_tuple (bool) – Should incoming messages be converted to a tuple before delivery.

  • auto_decode (bool) – Auto-decode strings when possible.

Raises
Return type

generator

check_for_errors()

Check connection and channel for errors.

Raises
Returns

check_for_exceptions()

Check channel for exceptions.

Raises

AMQPChannelError – Raises if the channel encountered an error.

Returns

Channel.Basic

class amqpstorm.basic.Basic(channel, max_frame_size=None)

RabbitMQ Basic Operations.

ack(delivery_tag=0, multiple=False)

Acknowledge Message.

Parameters
  • delivery_tag (int/long) – Server-assigned delivery tag

  • multiple (bool) – Acknowledge multiple messages

Raises
Returns

cancel(consumer_tag='')

Cancel a queue consumer.

Parameters

consumer_tag (str) – Consumer tag

Raises
Return type

dict

consume(callback=None, queue='', consumer_tag='', exclusive=False, no_ack=False, no_local=False, arguments=None)

Start a queue consumer.

Parameters
  • callback (function) – Message callback

  • queue (str) – Queue name

  • consumer_tag (str) – Consumer tag

  • no_local (bool) – Do not deliver own messages

  • no_ack (bool) – No acknowledgement needed

  • exclusive (bool) – Request exclusive access

  • arguments (dict) – Consume key/value arguments

Raises
Returns

Consumer tag

Return type

str

get(queue='', no_ack=False, to_dict=False, auto_decode=True)

Fetch a single message.

Parameters
  • queue (str) – Queue name

  • no_ack (bool) – No acknowledgement needed

  • to_dict (bool) – Should incoming messages be converted to a dictionary before delivery.

  • auto_decode (bool) – Auto-decode strings when possible.

Raises
Returns

Returns a single message, as long as there is a message in the queue. If no message is available, returns None.

Return type

amqpstorm.Message,dict,None

nack(delivery_tag=0, multiple=False, requeue=True)

Negative Acknowledgement.

Parameters
  • delivery_tag (int/long) – Server-assigned delivery tag

  • multiple (bool) – Negative acknowledge multiple messages

  • requeue (bool) – Re-queue the message

Raises
Returns

publish(body, routing_key, exchange='', properties=None, mandatory=False, immediate=False)

Publish a Message.

Parameters
  • body (bytes,str,unicode) – Message payload

  • routing_key (str) – Message routing key

  • exchange (str) – The exchange to publish the message to

  • properties (dict) – Message properties

  • mandatory (bool) – Requires the message is published

  • immediate (bool) – Request immediate delivery

Raises
Return type

bool,None

qos(prefetch_count=0, prefetch_size=0, global_=False)

Specify quality of service.

Parameters
  • prefetch_count (int) – Prefetch window in messages

  • prefetch_size (int/long) – Prefetch window in octets

  • global (bool) – Apply to entire connection

Raises
Return type

dict

recover(requeue=False)

Redeliver unacknowledged messages.

Parameters

requeue (bool) – Re-queue the messages

Raises
Return type

dict

reject(delivery_tag=0, requeue=True)

Reject Message.

Parameters
  • delivery_tag (int/long) – Server-assigned delivery tag

  • requeue (bool) – Re-queue the message

Raises
Returns

Channel.Exchange

class amqpstorm.exchange.Exchange(channel)

RabbitMQ Exchange Operations.

bind(destination='', source='', routing_key='', arguments=None)

Bind an Exchange.

Parameters
  • destination (str) – Exchange name

  • source (str) – Exchange to bind to

  • routing_key (str) – The routing key to use

  • arguments (dict) – Bind key/value arguments

Raises
Return type

dict

declare(exchange='', exchange_type='direct', passive=False, durable=False, auto_delete=False, arguments=None)

Declare an Exchange.

Parameters
  • exchange (str) – Exchange name

  • exchange_type (str) – Exchange type

  • passive (bool) – Do not create

  • durable (bool) – Durable exchange

  • auto_delete (bool) – Automatically delete when not in use

  • arguments (dict) – Exchange key/value arguments

Raises
Return type

dict

delete(exchange='', if_unused=False)

Delete an Exchange.

Parameters
  • exchange (str) – Exchange name

  • if_unused (bool) – Delete only if unused

Raises
Return type

dict

unbind(destination='', source='', routing_key='', arguments=None)

Unbind an Exchange.

Parameters
  • destination (str) – Exchange name

  • source (str) – Exchange to unbind from

  • routing_key (str) – The routing key used

  • arguments (dict) – Unbind key/value arguments

Raises
Return type

dict

Channel.Queue

class amqpstorm.queue.Queue(channel)

RabbitMQ Queue Operations.

bind(queue='', exchange='', routing_key='', arguments=None)

Bind a Queue.

Parameters
  • queue (str) – Queue name

  • exchange (str) – Exchange name

  • routing_key (str) – The routing key to use

  • arguments (dict) – Bind key/value arguments

Raises
Return type

dict

declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None)

Declare a Queue.

Parameters
  • queue (str) – Queue name

  • passive (bool) – Do not create

  • durable (bool) – Durable queue

  • exclusive (bool) – Request exclusive access

  • auto_delete (bool) – Automatically delete when not in use

  • arguments (dict) – Queue key/value arguments

Raises
Return type

dict

delete(queue='', if_unused=False, if_empty=False)

Delete a Queue.

Parameters
  • queue (str) – Queue name

  • if_unused (bool) – Delete only if unused

  • if_empty (bool) – Delete only if empty

Raises
Return type

dict

purge(queue)

Purge a Queue.

Parameters

queue (str) – Queue name

Raises
Return type

dict

unbind(queue='', exchange='', routing_key='', arguments=None)

Unbind a Queue.

Parameters
  • queue (str) – Queue name

  • exchange (str) – Exchange name

  • routing_key (str) – The routing key used

  • arguments (dict) – Unbind key/value arguments

Raises
Return type

dict

Channel.Tx

class amqpstorm.tx.Tx(channel)

RabbitMQ Transactions.

Server local transactions, in which the server will buffer published messages until the client commits (or rollback) the messages.

commit()

Commit the current transaction.

Commit all messages published during the current transaction session to the remote server.

A new transaction session starts as soon as the command has been executed.

Returns

rollback()

Abandon the current transaction.

Rollback all messages published during the current transaction session to the remote server.

Note that all messages published during this transaction session will be lost, and will have to be published again.

A new transaction session starts as soon as the command has been executed.

Returns

select()

Enable standard transaction mode.

This will enable transaction mode on the channel. Meaning that messages will be kept in the remote server buffer until such a time that either commit or rollback is called.

Returns