# Source code for amqpstorm.management.api

from amqpstorm.compatibility import quote
from amqpstorm.management.basic import Basic
from amqpstorm.management.channel import Channel
from amqpstorm.management.connection import Connection
from amqpstorm.management.exchange import Exchange
from amqpstorm.management.healthchecks import HealthChecks
from amqpstorm.management.http_client import HTTPClient
from amqpstorm.management.queue import Queue
from amqpstorm.management.user import User
from amqpstorm.management.virtual_host import VirtualHost

# Endpoint paths handed to ``HTTPClient.get`` by ``ManagementApi``.
# A ``%s`` placeholder is filled in by the calling method before the request.

# ``%s`` is the URL-quoted virtual host name (see ``aliveness_test``).
API_ALIVENESS_TEST = 'aliveness-test/%s'
API_CLUSTER_NAME = 'cluster-name'
# ``%s`` is the node name (see ``node``).
API_NODE = 'nodes/%s'
API_NODES = 'nodes'
API_OVERVIEW = 'overview'
# ``%s`` is the node name, taken from each entry returned by ``nodes`` (see ``top``).
API_TOP = 'top/%s'
API_WHOAMI = 'whoami'


class ManagementApi(object):
    """RabbitMQ Management Api

    e.g.
    ::

        from amqpstorm.management import ManagementApi
        client = ManagementApi('https://localhost:15671', 'guest', 'guest',
                               verify=True)
        client.user.create('my_user', 'password', tags='administrator')
        client.user.set_permission(
            'my_user',
            virtual_host='/',
            configure_regex='.*',
            write_regex='.*',
            read_regex='.*'
        )

    :param str api_url: RabbitMQ Management url
                        (e.g. https://rmq.amqpstorm.io:15671)
    :param str username: Username (e.g. guest)
    :param str password: Password (e.g. guest)
    :param int,float timeout: TCP Timeout
    :param None,str,bool verify: Requests session verify (e.g. True, False
                                 or path to CA bundle)
    :param None,str,tuple cert: Requests session cert
    """

    def __init__(self, api_url, username, password, timeout=10,
                 verify=None, cert=None):
        # A single HTTP client is shared by every operation group below.
        self.http_client = HTTPClient(
            api_url, username, password,
            timeout=timeout,
            verify=verify,
            cert=cert
        )
        self._basic = Basic(self.http_client)
        self._channel = Channel(self.http_client)
        self._connection = Connection(self.http_client)
        self._exchange = Exchange(self.http_client)
        self._healthchecks = HealthChecks(self.http_client)
        self._queue = Queue(self.http_client)
        self._user = User(self.http_client)
        self._virtual_host = VirtualHost(self.http_client)

    def __enter__(self):
        return self

    def __exit__(self, *_):
        # Intentionally a no-op: the session is only closed in __del__, so
        # the client stays usable after leaving a ``with`` block.
        pass

    def __del__(self):
        # __del__ also runs when __init__ failed part-way (e.g. HTTPClient
        # raised), in which case ``http_client`` was never assigned. Look the
        # attributes up defensively so finalisation never raises.
        http_client = getattr(self, 'http_client', None)
        session = getattr(http_client, 'session', None)
        if session is not None:
            session.close()

    @property
    def basic(self):
        """RabbitMQ Basic Operations.

        e.g.
        ::

            client.basic.publish('Hello RabbitMQ', routing_key='my_queue')

        :rtype: amqpstorm.management.basic.Basic
        """
        return self._basic

    @property
    def channel(self):
        """RabbitMQ Channel Operations.

        e.g.
        ::

            client.channel.list()

        :rtype: amqpstorm.management.channel.Channel
        """
        return self._channel

    @property
    def connection(self):
        """RabbitMQ Connection Operations.

        e.g.
        ::

            client.connection.list()

        :rtype: amqpstorm.management.connection.Connection
        """
        return self._connection

    @property
    def exchange(self):
        """RabbitMQ Exchange Operations.

        e.g.
        ::

            client.exchange.declare('my_exchange')

        :rtype: amqpstorm.management.exchange.Exchange
        """
        return self._exchange

    @property
    def healthchecks(self):
        """RabbitMQ Healthchecks.

        e.g.
        ::

            client.healthchecks.get()

        :rtype: amqpstorm.management.healthchecks.HealthChecks
        """
        return self._healthchecks

    @property
    def queue(self):
        """RabbitMQ Queue Operations.

        e.g.
        ::

            client.queue.declare('my_queue', virtual_host='/')

        :rtype: amqpstorm.management.queue.Queue
        """
        return self._queue

    @property
    def user(self):
        """RabbitMQ User Operations.

        e.g.
        ::

            client.user.create('my_user', 'password')

        :rtype: amqpstorm.management.user.User
        """
        return self._user

    @property
    def virtual_host(self):
        """RabbitMQ VirtualHost Operations.

        :rtype: amqpstorm.management.virtual_host.VirtualHost
        """
        return self._virtual_host

    def aliveness_test(self, virtual_host='/'):
        """Aliveness Test.

        e.g.
        ::

            from amqpstorm.management import ManagementApi
            client = ManagementApi('http://localhost:15672', 'guest', 'guest')
            result = client.aliveness_test('/')
            if result['status'] == 'ok':
                print("RabbitMQ is alive!")
            else:
                print("RabbitMQ is not alive! :(")

        :param str virtual_host: Virtual host name

        :raises ApiError: Raises if the remote server encountered an error.
        :raises ApiConnectionError: Raises if there was a connectivity issue.

        :rtype: dict
        """
        # safe='' so that '/' (the default virtual host) is encoded as well.
        virtual_host = quote(virtual_host, '')
        return self.http_client.get(API_ALIVENESS_TEST % virtual_host)

    def cluster_name(self):
        """Get Cluster Name.

        :raises ApiError: Raises if the remote server encountered an error.
        :raises ApiConnectionError: Raises if there was a connectivity issue.

        :rtype: dict
        """
        return self.http_client.get(API_CLUSTER_NAME)

    def node(self, name):
        """Get Node.

        :param str name: Node name

        :raises ApiError: Raises if the remote server encountered an error.
        :raises ApiConnectionError: Raises if there was a connectivity issue.

        :rtype: dict
        """
        # Percent-encode the name so it is always a single URL path segment,
        # the same way aliveness_test treats the virtual host.
        name = quote(name, '')
        return self.http_client.get(API_NODE % name)

    def nodes(self):
        """Get Nodes.

        :raises ApiError: Raises if the remote server encountered an error.
        :raises ApiConnectionError: Raises if there was a connectivity issue.

        :rtype: list
        """
        return self.http_client.get(API_NODES)

    def overview(self):
        """Get Overview.

        :raises ApiError: Raises if the remote server encountered an error.
        :raises ApiConnectionError: Raises if there was a connectivity issue.

        :rtype: dict
        """
        return self.http_client.get(API_OVERVIEW)

    def top(self):
        """Top Processes.

        Issues one request per node returned by :meth:`nodes` and collects
        the responses in the same order.

        :raises ApiError: Raises if the remote server encountered an error.
        :raises ApiConnectionError: Raises if there was a connectivity issue.

        :rtype: list
        """
        return [
            self.http_client.get(API_TOP % quote(node['name'], ''))
            for node in self.nodes()
        ]

    def whoami(self):
        """Who am I?

        :raises ApiError: Raises if the remote server encountered an error.
        :raises ApiConnectionError: Raises if there was a connectivity issue.

        :rtype: dict
        """
        return self.http_client.get(API_WHOAMI)