1
+ import requests
2
+ from typing import Optional , List , Dict
3
+ import base64
4
+
5
+ class RabbitMQAdmin :
6
+ def __init__ (self , host : str , port : int , username : str , password : str , use_tls : bool ):
7
+ self .protocol = "https" if use_tls else "http"
8
+ self .base_url = f"{ self .protocol } ://{ host } :{ port } /api"
9
+ self .auth = base64 .b64encode (f"{ username } :{ password } " .encode ()).decode ()
10
+ self .headers = {
11
+ "Authorization" : f"Basic { self .auth } " ,
12
+ "Content-Type" : "application/json"
13
+ }
14
+
15
+ def _make_request (self , method : str , endpoint : str , data : Optional [Dict ] = None ) -> requests .Response :
16
+ url = f"{ self .base_url } /{ endpoint } "
17
+ response = requests .request (method , url , headers = self .headers , json = data , verify = True )
18
+ response .raise_for_status ()
19
+ return response
20
+
21
+ def list_queues (self ) -> List [Dict ]:
22
+ """List all queues in the RabbitMQ server"""
23
+ response = self ._make_request ("GET" , "queues" )
24
+ return response .json ()
25
+
26
+ def list_exchanges (self ) -> List [Dict ]:
27
+ """List all exchanges in the RabbitMQ server"""
28
+ response = self ._make_request ("GET" , "exchanges" )
29
+ return response .json ()
30
+
31
+ def get_queue_info (self , queue : str , vhost : str = "/" ) -> Dict :
32
+ """Get detailed information about a specific queue"""
33
+ vhost_encoded = requests .utils .quote (vhost , safe = '' )
34
+ response = self ._make_request ("GET" , f"queues/{ vhost_encoded } /{ queue } " )
35
+ return response .json ()
36
+
37
+ def delete_queue (self , queue : str , vhost : str = "/" ) -> None :
38
+ """Delete a queue"""
39
+ validate_rabbitmq_name (queue , "Queue name" )
40
+ vhost_encoded = requests .utils .quote (vhost , safe = '' )
41
+ self ._make_request ("DELETE" , f"queues/{ vhost_encoded } /{ queue } " )
42
+
43
+ def purge_queue (self , queue : str , vhost : str = "/" ) -> None :
44
+ """Remove all messages from a queue"""
45
+ validate_rabbitmq_name (queue , "Queue name" )
46
+ vhost_encoded = requests .utils .quote (vhost , safe = '' )
47
+ self ._make_request ("DELETE" , f"queues/{ vhost_encoded } /{ queue } /contents" )
48
+
49
+ def get_exchange_info (self , exchange : str , vhost : str = "/" ) -> Dict :
50
+ """Get detailed information about a specific exchange"""
51
+ vhost_encoded = requests .utils .quote (vhost , safe = '' )
52
+ response = self ._make_request ("GET" , f"exchanges/{ vhost_encoded } /{ exchange } " )
53
+ return response .json ()
54
+
55
+ def delete_exchange (self , exchange : str , vhost : str = "/" ) -> None :
56
+ """Delete an exchange"""
57
+ validate_rabbitmq_name (exchange , "Exchange name" )
58
+ vhost_encoded = requests .utils .quote (vhost , safe = '' )
59
+ self ._make_request ("DELETE" , f"exchanges/{ vhost_encoded } /{ exchange } " )
60
+
61
+ def get_bindings (self , queue : Optional [str ] = None , exchange : Optional [str ] = None , vhost : str = "/" ) -> List [Dict ]:
62
+ """Get bindings, optionally filtered by queue or exchange"""
63
+ vhost_encoded = requests .utils .quote (vhost , safe = '' )
64
+ if queue :
65
+ validate_rabbitmq_name (queue , "Queue name" )
66
+ response = self ._make_request ("GET" , f"queues/{ vhost_encoded } /{ queue } /bindings" )
67
+ elif exchange :
68
+ validate_rabbitmq_name (exchange , "Exchange name" )
69
+ response = self ._make_request ("GET" , f"exchanges/{ vhost_encoded } /{ exchange } /bindings/source" )
70
+ else :
71
+ response = self ._make_request ("GET" , f"bindings/{ vhost_encoded } " )
72
+ return response .json ()
73
+
74
+ def get_overview (self ) -> Dict :
75
+ """Get overview of RabbitMQ server including version, stats, and listeners"""
76
+ response = self ._make_request ("GET" , "overview" )
77
+ return response .json ()
78
+
79
+ if __name__ == "__main__" :
80
+ # test
81
+ admin = RabbitMQAdmin ("localhost" , 15672 , "guest" , "guest" , False )
82
+ print (admin .list_queues ())
83
+ print ("Done" )
0 commit comments