Management Api

class amqpstorm.management.ManagementApi(api_url, username, password, timeout=10, verify=None, cert=None)[source]

RabbitMQ Management Api

e.g.

from amqpstorm.management import ManagementApi
client = ManagementApi('https://localhost:15671', 'guest', 'guest', verify=True)
client.user.create('my_user', 'password', tags='administrator')
client.user.set_permission(
    'my_user',
    virtual_host='/',
    configure_regex='.*',
    write_regex='.*',
    read_regex='.*'
)
Parameters:
  • api_url (str) – RabbitMQ Management url (e.g. https://rmq.amqpstorm.io:15671)

  • username (str) – Username (e.g. guest)

  • password (str) – Password (e.g. guest)

  • timeout (int,float) – TCP Timeout

  • verify (None,str,bool) – Requests session verify (e.g. True, False or path to CA bundle)

  • cert (None,str,tuple) – Requests session cert

property basic

RabbitMQ Basic Operations.

e.g.

client.basic.publish('Hello RabbitMQ', routing_key='my_queue')
Return type:

amqpstorm.management.basic.Basic

property channel

RabbitMQ Channel Operations.

e.g.

client.channel.list()
Return type:

amqpstorm.management.channel.Channel

property connection

RabbitMQ Connection Operations.

e.g.

client.connection.list()
Return type:

amqpstorm.management.connection.Connection

property exchange

RabbitMQ Exchange Operations.

e.g.

client.exchange.declare('my_exchange')
Return type:

amqpstorm.management.exchange.Exchange

property queue

RabbitMQ Queue Operations.

e.g.

client.queue.declare('my_queue', virtual_host='/')
Return type:

amqpstorm.management.queue.Queue

property user

RabbitMQ User Operations.

e.g.

client.user.create('my_user', 'password')
Return type:

amqpstorm.management.user.User

aliveness_test(virtual_host='/')[source]

Aliveness Test.

e.g.

from amqpstorm.management import ManagementApi
client = ManagementApi('http://localhost:15672', 'guest', 'guest')
result = client.aliveness_test('/')
if result['status'] == 'ok':
    print("RabbitMQ is alive!")
else:
    print("RabbitMQ is not alive! :(")
Parameters:

virtual_host (str) – Virtual host name

Raises:
Return type:

dict

nodes()[source]

Get Nodes.

Raises:
Return type:

dict

overview()[source]

Get Overview.

Raises:
Return type:

dict

top()[source]

Top Processes.

Raises:
Return type:

list

whoami()[source]

Who am I?

Raises:
Return type:

dict

class amqpstorm.management.basic.Basic(http_client)[source]
publish(body, routing_key, exchange='amq.default', virtual_host='/', properties=None, payload_encoding='string')[source]

Publish a Message.

Parameters:
  • body (bytes,str,unicode) – Message payload

  • routing_key (str) – Message routing key

  • exchange (str) – The exchange to publish the message to

  • virtual_host (str) – Virtual host name

  • properties (dict) – Message properties

  • payload_encoding (str) – Payload encoding.

Raises:
Return type:

dict

get(queue, virtual_host='/', requeue=False, to_dict=False, count=1, truncate=50000, encoding='auto')[source]

Get Messages.

Parameters:
  • queue (str) – Queue name

  • virtual_host (str) – Virtual host name

  • requeue (bool) – Re-queue message

  • to_dict (bool) – Should incoming messages be converted to a dictionary before delivery.

  • count (int) – How many messages should we try to fetch.

  • truncate (int) – The maximum length in bytes, beyond that the server will truncate the message.

  • encoding (str) – Message encoding.

Raises:
Return type:

list

class amqpstorm.management.channel.Channel(http_client)[source]
get(channel)[source]

Get Connection details.

Parameters:

channel – Channel name

Raises:
  • ApiError – Raises if the remote server encountered an error. We also raise an exception if the channel cannot be found.

  • ApiConnectionError – Raises if there was a connectivity issue.

Return type:

dict

list(name=None, page_size=None, use_regex=False)[source]

List all Channels.

Parameters:
  • name – Filter by name

  • use_regex – Enables regular expression for the param name

  • page_size – Number of elements per page

Raises:
Return type:

list

class amqpstorm.management.connection.Connection(http_client)[source]
get(connection)[source]

Get Connection details.

Parameters:

connection (str) – Connection name

Raises:
  • ApiError – Raises if the remote server encountered an error. We also raise an exception if the connection cannot be found.

  • ApiConnectionError – Raises if there was a connectivity issue.

Return type:

dict

list(name=None, page_size=100, use_regex=False)[source]

Get Connections.

Parameters:
  • name – Filter by name

  • use_regex – Enables regular expression for the param name

  • page_size – Number of elements per page

Raises:
Return type:

list

close(connection, reason='Closed via management api')[source]

Close Connection.

Parameters:
  • connection (str) – Connection name

  • reason (str) – Reason for closing connection.

Raises:
Return type:

None

class amqpstorm.management.exchange.Exchange(http_client)[source]
get(exchange, virtual_host='/')[source]

Get Exchange details.

Parameters:
  • exchange (str) – Exchange name

  • virtual_host (str) – Virtual host name

Raises:
Return type:

dict

list(virtual_host='/', show_all=False, name=None, page_size=100, use_regex=False)[source]

List Exchanges.

Parameters:
  • virtual_host (str) – Virtual host name

  • show_all (bool) – List Exchanges across all virtual hosts

  • name – Filter by name

  • use_regex – Enables regular expression for the param name

  • page_size – Number of elements per page

Raises:
  • ApiError – Raises if the remote server encountered an error. We also raise an exception if the exchange cannot be found.

  • ApiConnectionError – Raises if there was a connectivity issue.

Return type:

list

declare(exchange='', exchange_type='direct', virtual_host='/', passive=False, durable=False, auto_delete=False, internal=False, arguments=None)[source]

Declare an Exchange.

Parameters:
  • exchange (str) – Exchange name

  • exchange_type (str) – Exchange type

  • virtual_host (str) – Virtual host name

  • passive (bool) – Do not create

  • durable (bool) – Durable exchange

  • auto_delete (bool) – Automatically delete when not in use

  • internal (bool) – Is the exchange for use by the broker only.

  • arguments (dict,None) – Exchange key/value arguments

Raises:
Return type:

None

delete(exchange, virtual_host='/')[source]

Delete an Exchange.

Parameters:
  • exchange (str) – Exchange name

  • virtual_host (str) – Virtual host name

Raises:
Return type:

dict

bindings(exchange, virtual_host='/')[source]

Get Exchange bindings.

Parameters:
  • exchange (str) – Exchange name

  • virtual_host (str) – Virtual host name

Raises:
Return type:

list

bind(destination='', source='', routing_key='', virtual_host='/', arguments=None)[source]

Bind an Exchange.

Parameters:
  • source (str) – Source Exchange name

  • destination (str) – Destination Exchange name

  • routing_key (str) – The routing key to use

  • virtual_host (str) – Virtual host name

  • arguments (dict,None) – Bind key/value arguments

Raises:
Return type:

None

unbind(destination='', source='', routing_key='', virtual_host='/', properties_key=None)[source]

Unbind an Exchange.

Parameters:
  • source (str) – Source Exchange name

  • destination (str) – Destination Exchange name

  • routing_key (str) – The routing key to use

  • virtual_host (str) – Virtual host name

  • properties_key (str)

Raises:
Return type:

None

class amqpstorm.management.queue.Queue(http_client)[source]
get(queue, virtual_host='/')[source]

Get Queue details.

Parameters:
  • queue – Queue name

  • virtual_host (str) – Virtual host name

Raises:
  • ApiError – Raises if the remote server encountered an error. We also raise an exception if the queue cannot be found.

  • ApiConnectionError – Raises if there was a connectivity issue.

Return type:

dict

list(virtual_host='/', show_all=False, name=None, page_size=100, use_regex=False)[source]

List Queues.

Parameters:
  • virtual_host (str) – Virtual host name

  • show_all (bool) – List Queues across all virtual hosts

  • name – Filter by name

  • use_regex – Enables regular expression for the param name

  • page_size – Number of elements per page

Raises:
Return type:

list

declare(queue='', virtual_host='/', passive=False, durable=False, auto_delete=False, arguments=None)[source]

Declare a Queue.

Parameters:
  • queue (str) – Queue name

  • virtual_host (str) – Virtual host name

  • passive (bool) – Do not create

  • durable (bool) – Durable queue

  • auto_delete (bool) – Automatically delete when not in use

  • arguments (dict,None) – Queue key/value arguments

Raises:
Return type:

dict

delete(queue, virtual_host='/')[source]

Delete a Queue.

Parameters:
  • queue (str) – Queue name

  • virtual_host (str) – Virtual host name

Raises:
Return type:

dict

purge(queue, virtual_host='/')[source]

Purge a Queue.

Parameters:
  • queue (str) – Queue name

  • virtual_host (str) – Virtual host name

Raises:
Return type:

None

bindings(queue, virtual_host='/')[source]

Get Queue bindings.

Parameters:
  • queue (str) – Queue name

  • virtual_host (str) – Virtual host name

Raises:
Return type:

list

bind(queue='', exchange='', routing_key='', virtual_host='/', arguments=None)[source]

Bind a Queue.

Parameters:
  • queue (str) – Queue name

  • exchange (str) – Exchange name

  • routing_key (str) – The routing key to use

  • virtual_host (str) – Virtual host name

  • arguments (dict,None) – Bind key/value arguments

Raises:
Return type:

None

unbind(queue='', exchange='', routing_key='', virtual_host='/', properties_key=None)[source]

Unbind a Queue.

Parameters:
  • queue (str) – Queue name

  • exchange (str) – Exchange name

  • routing_key (str) – The routing key to use

  • virtual_host (str) – Virtual host name

  • properties_key (str)

Raises:
Return type:

None

class amqpstorm.management.user.User(http_client)[source]
get(username)[source]

Get User details.

Parameters:

username (str) – Username

Raises:
  • ApiError – Raises if the remote server encountered an error. We also raise an exception if the user cannot be found.

  • ApiConnectionError – Raises if there was a connectivity issue.

Return type:

dict

list()[source]

List all Users.

Return type:

list

create(username, password, tags='')[source]

Create User.

Parameters:
  • username (str) – Username

  • password (str) – Password

  • tags (str,list) – Comma-separate list of tags (e.g. monitoring)

Raises:
Return type:

None

delete(username)[source]

Delete User or a list of Users.

Parameters:

username (str,list) – Username or a list of Usernames

Raises:
Return type:

dict

get_permission(username, virtual_host)[source]

Get User permissions for the configured virtual host.

Parameters:
  • username (str) – Username

  • virtual_host (str) – Virtual host name

Raises:
Return type:

dict

get_permissions(username)[source]

Get all Users permissions.

Parameters:

username (str) – Username

Raises:
Return type:

dict

set_permission(username, virtual_host, configure_regex='.*', write_regex='.*', read_regex='.*')[source]

Set User permissions for the configured virtual host.

Parameters:
  • username (str) – Username

  • virtual_host (str) – Virtual host name

  • configure_regex (str) – Permission pattern for configuration operations for this user.

  • write_regex (str) – Permission pattern for write operations for this user.

  • read_regex (str) – Permission pattern for read operations for this user.

Raises:
Return type:

dict

delete_permission(username, virtual_host)[source]

Delete User permissions for the configured virtual host.

Parameters:
  • username (str) – Username

  • virtual_host (str) – Virtual host name

Raises:
Return type:

dict

class amqpstorm.management.virtual_host.VirtualHost(http_client)[source]
get(virtual_host)[source]

Get Virtual Host details.

Parameters:

virtual_host (str) – Virtual host name

Raises:
  • ApiError – Raises if the remote server encountered an error. We also raise an exception if the virtual host cannot be found.

  • ApiConnectionError – Raises if there was a connectivity issue.

Return type:

dict

list()[source]

List all Virtual Hosts.

Raises:
Return type:

list

create(virtual_host)[source]

Create a Virtual Host.

Parameters:

virtual_host (str) – Virtual host name

Raises:
Return type:

dict

delete(virtual_host)[source]

Delete a Virtual Host.

Parameters:

virtual_host (str) – Virtual host name

Raises:
Return type:

dict

get_permissions(virtual_host)[source]

Get all Virtual hosts permissions.

Raises:
Return type:

dict