11import logging
22import time
33import json
4+ import threading
45
56import pytest
67import pika
@@ -57,15 +58,17 @@ class RabbitMQMonitor:
5758 channel = None
5859 queue_name = None
5960 rabbitmq_cluster = None
60- expected_published = 0
61- expected_delivered = 0
61+ expected_published = None
62+ expected_delivered = None
63+ consume_thread = None
64+ stop_event = threading .Event ()
6265
6366 def _consume (self , timeout = 180 ):
64- logging .debug ("RabbitMQMonitor: Consuming trace RabbitMQ messages..." )
67+ logging .debug ("RabbitMQMonitor: Consuming trace RabbitMQ messages in a working thread ..." )
6568 deadline = time .monotonic () + timeout
6669 _published = 0
6770 _delivered = 0
68- while time .monotonic () < deadline :
71+ while time .monotonic () < deadline and not self . stop_event . is_set () :
6972 method , properties , body = self .channel .basic_get (self .queue_name , auto_ack = True )
7073 if method and properties and body :
7174 # logging.debug(f"Message received! method {method}, properties {properties}, body {body}")
@@ -81,15 +84,34 @@ def _consume(self, timeout=180):
8184 _published += 1
8285 # logging.debug(f"Message published: {value}")
8386 else :
84- break
87+ time . sleep ( 0.1 )
8588 logging .debug (f"RabbitMQMonitor: Consumed { _published } /{ len (self .published )} published messages and { _delivered } /{ len (self .delivered )} delivered messages in this iteration" )
8689
90+ def _run (self ):
91+ logging .debug ("RabbitMQMonitor: Creating a new connection for RabbitMQ" )
92+ credentials = pika .PlainCredentials ("root" , "clickhouse" )
93+ parameters = pika .ConnectionParameters (
94+ self .rabbitmq_cluster .rabbitmq_ip , self .rabbitmq_cluster .rabbitmq_port , "/" , credentials
95+ )
96+ self .connection = pika .BlockingConnection (parameters )
97+ self .channel = self .connection .channel ()
98+
99+ if not self .queue_name :
100+ queue_res = self .channel .queue_declare (queue = "" , durable = True )
101+ self .queue_name = queue_res .method .queue
102+ logging .debug (f"RabbitMQMonitor: Created debug queue to monitor RabbitMQ published and delivered messages: { self .queue_name } " )
103+
104+ self .channel .queue_bind (exchange = "amq.rabbitmq.trace" , queue = self .queue_name , routing_key = "publish.#" )
105+ self .channel .queue_bind (exchange = "amq.rabbitmq.trace" , queue = self .queue_name , routing_key = "deliver.#" )
106+ self ._consume ()
107+
87108 def set_expectations (self , published , delivered ):
88109 self .expected_published = published
89110 self .expected_delivered = delivered
90111
91112 def check (self ):
92- self ._consume ()
113+ self .stop_event .set ()
114+ self .consume_thread .join ()
93115
94116 def _get_non_present (my_set , amount ):
95117 non_present = list ()
@@ -100,33 +122,23 @@ def _get_non_present(my_set, amount):
100122 break
101123 return non_present
102124
103- if self .expected_published > 0 and self .expected_published != len (self .published ):
125+ if self .expected_published and self .expected_published != len (self .published ):
104126 logging .warning (f"RabbitMQMonitor: { len (self .published )} /{ self .expected_published } (got/expected) messages published. Sample of not published: { _get_non_present (self .published , self .expected_published )} " )
105- if self .expected_delivered > 0 and self .expected_delivered != len (self .delivered ):
127+ if self .expected_delivered and self .expected_delivered != len (self .delivered ):
106128 logging .warning (f"RabbitMQMonitor: { len (self .delivered )} /{ self .expected_delivered } (got/expected) messages delivered. Sample of not delivered: { _get_non_present (self .delivered , self .expected_delivered )} " )
107129
108130 def start (self , rabbitmq_cluster ):
109131 self .rabbitmq_cluster = rabbitmq_cluster
110-
111- logging .debug ("RabbitMQMonitor: Creating a new connection for RabbitMQ" )
112- credentials = pika .PlainCredentials ("root" , "clickhouse" )
113- parameters = pika .ConnectionParameters (
114- self .rabbitmq_cluster .rabbitmq_ip , self .rabbitmq_cluster .rabbitmq_port , "/" , credentials
115- )
116- self .connection = pika .BlockingConnection (parameters )
117- self .channel = self .connection .channel ()
118-
119- if not self .queue_name :
120- queue_res = self .channel .queue_declare (queue = "" , durable = True )
121- self .queue_name = queue_res .method .queue
122- logging .debug (f"RabbitMQMonitor: Created debug queue to monitor RabbitMQ published and delivered messages: { self .queue_name } " )
123-
124- self .channel .queue_bind (exchange = "amq.rabbitmq.trace" , queue = self .queue_name , routing_key = "publish.#" )
125- self .channel .queue_bind (exchange = "amq.rabbitmq.trace" , queue = self .queue_name , routing_key = "deliver.#" )
132+ self .stop_event .clear ()
133+ self .consume_thread = threading .Thread (target = self ._run )
134+ logging .debug ("RabbitMQMonitor: Starting consuming thread..." )
135+ self .consume_thread .start ()
126136
127137 def stop (self ):
128138 if self .connection :
129- self ._consume ()
139+ if not self .stop_event .is_set ():
140+ self .stop_event .set ()
141+ self .consume_thread .join ()
130142 self .channel .close ()
131143 self .channel = None
132144 self .connection .close ()
@@ -214,8 +226,8 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster, r
214226 """
215227 )
216228
217- messages_num = 5000
218- rabbitmq_monitor .set_expectations (published = messages_num , delivered = messages_num )
229+ messages_num = 10000
230+ rabbitmq_monitor .set_expectations (published = None , delivered = messages_num )
219231 deadline = time .monotonic () + DEFAULT_TIMEOUT_SEC
220232 while time .monotonic () < deadline :
221233 try :
@@ -305,8 +317,8 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster, r
305317 """
306318 )
307319
308- messages_num = 5000
309- rabbitmq_monitor .set_expectations (published = messages_num , delivered = messages_num )
320+ messages_num = 10000
321+ rabbitmq_monitor .set_expectations (published = None , delivered = messages_num )
310322 deadline = time .monotonic () + DEFAULT_TIMEOUT_SEC
311323 while time .monotonic () < deadline :
312324 try :
0 commit comments