Skip to content

Commit 9bb046e

Browse files
authored
Merge pull request ClickHouse#79050 from ClickHouse/add-rabbitmq-monitor
Add RabbitMQ monitor to debug test_storage_rabbitmq
2 parents 7d61073 + eef5c20 commit 9bb046e

File tree

2 files changed

+129
-30
lines changed

2 files changed

+129
-30
lines changed

tests/integration/helpers/cluster.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2417,7 +2417,7 @@ def wait_rabbitmq_to_start(self, timeout=120):
24172417
return True
24182418
except Exception as ex:
24192419
logging.debug("RabbitMQ await_startup failed, %s:", ex)
2420-
time.sleep(0.5)
2420+
time.sleep(0.1)
24212421

24222422
start = time.time()
24232423
while time.time() - start < timeout:
@@ -2445,6 +2445,9 @@ def reset_rabbitmq(self, timeout=120):
24452445
run_rabbitmqctl(self.rabbitmq_docker_id, self.rabbitmq_cookie, "reset", timeout)
24462446
self.start_rabbitmq_app()
24472447

2448+
def run_rabbitmqctl(self, command):
2449+
run_rabbitmqctl(self.rabbitmq_docker_id, self.rabbitmq_cookie, command)
2450+
24482451
def wait_nats_is_available(self, max_retries=5):
24492452
retries = 0
24502453
while True:

tests/integration/test_storage_rabbitmq/test_failed_connection.py

Lines changed: 125 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import logging
22
import time
3+
import json
34

45
import pytest
6+
import pika
57

68
from helpers.client import QueryRuntimeException
79
from helpers.cluster import ClickHouseCluster
@@ -44,43 +46,133 @@
4446
# Helpers
4547

4648

47-
def suspend_rabbitmq(rabbitmq_cluster):
49+
class RabbitMQMonitor:
50+
# The RabbitMQMonitor class aims to trace all published and delivered events of RabbitMQ
51+
# It servers as an additional check to see whether the error happens in ClickHouse or
52+
# in the RabbitMQ server itself.
53+
54+
published = set()
55+
delivered = set()
56+
connection = None
57+
channel = None
58+
queue_name = None
59+
rabbitmq_cluster = None
60+
expected_published = 0
61+
expected_delivered = 0
62+
63+
def _consume(self, timeout=180):
64+
logging.debug("RabbitMQMonitor: Consuming trace RabbitMQ messages...")
65+
deadline = time.monotonic() + timeout
66+
while time.monotonic() < deadline:
67+
method, properties, body = self.channel.basic_get(self.queue_name, True)
68+
if method and properties and body:
69+
# logging.debug(f"Message received! method {method}, properties {properties}, body {body}")
70+
message = json.loads(body.decode("utf-8"))
71+
assert message["key"] == message["value"]
72+
value = int(message["key"])
73+
if "deliver" in method.routing_key:
74+
self.delivered.add(value)
75+
# logging.debug(f"Message delivered: {value}")
76+
elif "publish" in method.routing_key:
77+
self.published.add(value)
78+
# logging.debug(f"Message published: {value}")
79+
else:
80+
break
81+
logging.debug(f"RabbitMQMonitor: Consumed {len(self.published)} published messages and {len(self.delivered)} delivered messages")
82+
83+
def set_expectations(self, published, delivered):
84+
self.expected_published = published
85+
self.expected_delivered = delivered
86+
87+
def check(self):
88+
self._consume()
89+
90+
def _get_non_present(my_set, amount):
91+
non_present = list()
92+
for i in range(amount):
93+
if i not in my_set:
94+
non_present.append(i)
95+
if (len(non_present) >= 10):
96+
break
97+
return non_present
98+
99+
if self.expected_published > 0 and self.expected_published != len(self.published):
100+
pytest.fail(f"{len(self.published)}/{self.expected_published} (got/expected) messages published. Sample of not published: {_get_non_present(self.published, self.expected_published)}")
101+
if self.expected_delivered > 0 and self.expected_delivered != len(self.delivered):
102+
pytest.fail(f"{len(self.delivered)}/{self.expected_delivered} (got/expected) messages delivered. Sample of not delivered: {_get_non_present(self.delivered, self.expected_delivered)}")
103+
104+
def start(self, rabbitmq_cluster):
105+
self.rabbitmq_cluster = rabbitmq_cluster
106+
107+
logging.debug("RabbitMQMonitor: Creating a new connection for RabbitMQ")
108+
credentials = pika.PlainCredentials("root", "clickhouse")
109+
parameters = pika.ConnectionParameters(
110+
self.rabbitmq_cluster.rabbitmq_ip, self.rabbitmq_cluster.rabbitmq_port, "/", credentials
111+
)
112+
self.connection = pika.BlockingConnection(parameters)
113+
self.channel = self.connection.channel()
114+
115+
if not self.queue_name:
116+
queue_res = self.channel.queue_declare(queue="", durable=True)
117+
self.queue_name = queue_res.method.queue
118+
logging.debug(f"RabbitMQMonitor: Created debug queue to monitor RabbitMQ published and delivered messages: {self.queue_name}")
119+
120+
self.channel.queue_bind(exchange="amq.rabbitmq.trace", queue=self.queue_name, routing_key="publish.#")
121+
self.channel.queue_bind(exchange="amq.rabbitmq.trace", queue=self.queue_name, routing_key="deliver.#")
122+
123+
def stop(self):
124+
if self.connection:
125+
self._consume()
126+
self.channel.close()
127+
self.channel = None
128+
self.connection.close()
129+
self.connection = None
130+
131+
132+
def suspend_rabbitmq(rabbitmq_cluster, rabbitmq_monitor):
133+
rabbitmq_monitor.stop()
48134
rabbitmq_cluster.stop_rabbitmq_app()
49135

50136

51-
def resume_rabbitmq(rabbitmq_cluster):
137+
def resume_rabbitmq(rabbitmq_cluster, rabbitmq_monitor):
52138
rabbitmq_cluster.start_rabbitmq_app()
53139
rabbitmq_cluster.wait_rabbitmq_to_start()
140+
rabbitmq_monitor.start(rabbitmq_cluster)
54141

55142

56143
# Fixtures
57144

58-
59145
@pytest.fixture(scope="module")
60146
def rabbitmq_cluster():
61147
try:
62148
cluster.start()
149+
cluster.run_rabbitmqctl("trace_on")
63150
logging.debug("rabbitmq_id is {}".format(instance.cluster.rabbitmq_docker_id))
151+
logging.getLogger("pika").propagate = False
64152
yield cluster
65153

66154
finally:
67155
cluster.shutdown()
68156

69157

70158
@pytest.fixture(autouse=True)
71-
def rabbitmq_setup_teardown():
159+
def rabbitmq_monitor():
72160
logging.debug("RabbitMQ is available - running test")
73161
instance.query("CREATE DATABASE test")
74162
instance3.query("CREATE DATABASE test")
75-
yield # run test
163+
monitor = RabbitMQMonitor()
164+
monitor.start(cluster)
165+
yield monitor
76166
instance.query("DROP DATABASE test SYNC")
77167
instance3.query("DROP DATABASE test SYNC")
168+
monitor.check()
169+
monitor.stop()
78170
cluster.reset_rabbitmq()
79171

80172

81173
# Tests
82174

83-
def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster):
175+
def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster, rabbitmq_monitor):
84176
instance.query(
85177
"""
86178
DROP TABLE IF EXISTS test.consume;
@@ -90,27 +182,28 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster):
90182
CREATE TABLE test.consume (key UInt64, value UInt64)
91183
ENGINE = RabbitMQ
92184
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
93-
rabbitmq_flush_interval_ms=500,
94-
rabbitmq_max_block_size = 100,
95-
rabbitmq_exchange_name = 'producer_reconnect',
96-
rabbitmq_format = 'JSONEachRow',
97-
rabbitmq_num_consumers = 2,
98-
rabbitmq_row_delimiter = '\\n';
185+
rabbitmq_flush_interval_ms=500,
186+
rabbitmq_max_block_size = 100,
187+
rabbitmq_exchange_name = 'producer_reconnect',
188+
rabbitmq_format = 'JSONEachRow',
189+
rabbitmq_num_consumers = 2,
190+
rabbitmq_row_delimiter = '\\n';
99191
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
100192
SELECT * FROM test.consume;
101193
DROP TABLE IF EXISTS test.producer_reconnect;
102194
CREATE TABLE test.producer_reconnect (key UInt64, value UInt64)
103195
ENGINE = RabbitMQ
104196
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
105-
rabbitmq_exchange_name = 'producer_reconnect',
106-
rabbitmq_persistent = '1',
107-
rabbitmq_flush_interval_ms=1000,
108-
rabbitmq_format = 'JSONEachRow',
109-
rabbitmq_row_delimiter = '\\n';
197+
rabbitmq_exchange_name = 'producer_reconnect',
198+
rabbitmq_persistent = '1',
199+
rabbitmq_flush_interval_ms=1000,
200+
rabbitmq_format = 'JSONEachRow',
201+
rabbitmq_row_delimiter = '\\n';
110202
"""
111203
)
112204

113205
messages_num = 200000
206+
rabbitmq_monitor.set_expectations(published=messages_num, delivered=messages_num)
114207
deadline = time.monotonic() + 180
115208
while time.monotonic() < deadline:
116209
try:
@@ -131,6 +224,7 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster):
131224
deadline = time.monotonic() + 180
132225
while time.monotonic() < deadline:
133226
number = int(instance.query("SELECT count() FROM test.view"))
227+
logging.debug(f"{number}/{messages_num} before suspending RabbitMQ")
134228
if number != 0:
135229
if number == messages_num:
136230
pytest.fail("The RabbitMQ messages have been consumed before suspending the RabbitMQ server")
@@ -139,8 +233,8 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster):
139233
else:
140234
pytest.fail(f"Time limit of 180 seconds reached. The count is still 0.")
141235

142-
suspend_rabbitmq(rabbitmq_cluster)
143-
resume_rabbitmq(rabbitmq_cluster)
236+
suspend_rabbitmq(rabbitmq_cluster, rabbitmq_monitor)
237+
resume_rabbitmq(rabbitmq_cluster, rabbitmq_monitor)
144238

145239
deadline = time.monotonic() + 180
146240
while time.monotonic() < deadline:
@@ -166,20 +260,20 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster):
166260
)
167261

168262

169-
def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
263+
def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster, rabbitmq_monitor):
170264
instance.query(
171265
"""
172266
DROP TABLE IF EXISTS test.consumer_reconnect;
173267
CREATE TABLE test.consumer_reconnect (key UInt64, value UInt64)
174268
ENGINE = RabbitMQ
175269
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
176-
rabbitmq_exchange_name = 'consumer_reconnect',
177-
rabbitmq_num_consumers = 10,
178-
rabbitmq_flush_interval_ms = 100,
179-
rabbitmq_max_block_size = 100,
180-
rabbitmq_num_queues = 10,
181-
rabbitmq_format = 'JSONEachRow',
182-
rabbitmq_row_delimiter = '\\n';
270+
rabbitmq_exchange_name = 'consumer_reconnect',
271+
rabbitmq_num_consumers = 10,
272+
rabbitmq_flush_interval_ms = 100,
273+
rabbitmq_max_block_size = 100,
274+
rabbitmq_num_queues = 10,
275+
rabbitmq_format = 'JSONEachRow',
276+
rabbitmq_row_delimiter = '\\n';
183277
CREATE TABLE test.view (key UInt64, value UInt64)
184278
ENGINE = MergeTree
185279
ORDER BY key;
@@ -189,6 +283,7 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
189283
)
190284

191285
messages_num = 200000
286+
rabbitmq_monitor.set_expectations(published=messages_num, delivered=messages_num)
192287
deadline = time.monotonic() + 180
193288
while time.monotonic() < deadline:
194289
try:
@@ -209,6 +304,7 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
209304
deadline = time.monotonic() + 180
210305
while time.monotonic() < deadline:
211306
number = int(instance.query("SELECT count() FROM test.view"))
307+
logging.debug(f"{number}/{messages_num} before suspending RabbitMQ")
212308
if number != 0:
213309
if number == messages_num:
214310
pytest.fail("The RabbitMQ messages have been consumed before suspending the RabbitMQ server")
@@ -217,8 +313,8 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster):
217313
else:
218314
pytest.fail(f"Time limit of 180 seconds reached. The count is still 0.")
219315

220-
suspend_rabbitmq(rabbitmq_cluster)
221-
resume_rabbitmq(rabbitmq_cluster)
316+
suspend_rabbitmq(rabbitmq_cluster, rabbitmq_monitor)
317+
resume_rabbitmq(rabbitmq_cluster, rabbitmq_monitor)
222318

223319
# while int(instance.query('SELECT count() FROM test.view')) == 0:
224320
# time.sleep(0.1)

0 commit comments

Comments
 (0)