@@ -59,7 +59,7 @@ class RabbitMQMonitor:
5959 rabbitmq_cluster = None
6060
6161 def _consume (self , timeout = 180 ):
62- logging .debug ("Consuming trace RabbitMQ messages..." )
62+ logging .debug ("RabbitMQMonitor: Consuming trace RabbitMQ messages..." )
6363 deadline = time .monotonic () + timeout
6464 while time .monotonic () < deadline :
6565 method , properties , body = self .channel .basic_get (self .queue_name , True )
@@ -76,11 +76,7 @@ def _consume(self, timeout=180):
7676 # logging.debug(f"Message published: {value}")
7777 else :
7878 break
79- logging .debug (f"Consumed { len (self .published )} published messages and { len (self .delivered )} delivered messages" )
80-
81- def reset (self ):
82- self .published = set ()
83- self .delivered = set ()
79+ logging .debug (f"RabbitMQMonitor: Consumed { len (self .published )} published messages and { len (self .delivered )} delivered messages" )
8480
8581 def check (self , published , delivered ):
8682 self ._consume ()
@@ -102,24 +98,26 @@ def _get_non_present(my_set, amount):
10298 def start (self , rabbitmq_cluster ):
10399 self .rabbitmq_cluster = rabbitmq_cluster
104100
101+ logging .debug ("RabbitMQMonitor: Creating a new connection for RabbitMQ" )
105102 credentials = pika .PlainCredentials ("root" , "clickhouse" )
106103 parameters = pika .ConnectionParameters (
107104 self .rabbitmq_cluster .rabbitmq_ip , self .rabbitmq_cluster .rabbitmq_port , "/" , credentials
108105 )
109106 self .connection = pika .BlockingConnection (parameters )
110107 self .channel = self .connection .channel ()
111- queue_res = self .channel .queue_declare (queue = "" , exclusive = True )
112- self .queue_name = queue_res .method .queue
113108
114- logging .debug (f"Created debug queue to monitor RabbitMQ published and delivered messages: { self .queue_name } " )
109+ if not self .queue_name :
110+ queue_res = self .channel .queue_declare (queue = "" , durable = True )
111+ self .queue_name = queue_res .method .queue
112+ logging .debug (f"RabbitMQMonitor: Created debug queue to monitor RabbitMQ published and delivered messages: { self .queue_name } " )
115113
116114 self .channel .queue_bind (exchange = "amq.rabbitmq.trace" , queue = self .queue_name , routing_key = "publish.#" )
117115 self .channel .queue_bind (exchange = "amq.rabbitmq.trace" , queue = self .queue_name , routing_key = "deliver.#" )
118116
119117 def stop (self ):
120118 if self .connection :
121119 self ._consume ()
122- self .channel .queue_delete ( self . queue_name )
120+ self .channel .close ( )
123121 self .channel = None
124122 self .connection .close ()
125123 self .connection = None
0 commit comments