Skip to content

Commit b221083

Browse files
authored
test(kafka): Assert each consumer is reasonably full in integration test (#71)
1 parent 500125b commit b221083

File tree

1 file changed

+25
-3
lines changed

1 file changed

+25
-3
lines changed

python/integration_tests/test_consumer_rebalancing.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ def test_tasks_written_once_during_rebalancing() -> None:
5959
num_restarts = 16
6060
num_partitions = 32
6161
min_restart_duration = 1
62-
max_restart_duration = 10
62+
max_restart_duration = 30
63+
max_pending_count = 15_000
6364
topic_name = "task-worker"
6465
curr_time = int(time.time())
6566

@@ -99,7 +100,7 @@ def test_tasks_written_once_during_rebalancing() -> None:
99100
consumer_configs[f"config_{i}.yml"] = {
100101
"db_name": db_name,
101102
"db_path": str(TESTS_OUTPUT_PATH / f"{db_name}.sqlite"),
102-
"max_pending_count": 16384,
103+
"max_pending_count": max_pending_count,
103104
"kafka_topic": topic_name,
104105
"kafka_consumer_group": topic_name,
105106
"kafka_auto_offset_reset": "earliest",
@@ -192,6 +193,16 @@ def test_tasks_written_once_during_rebalancing() -> None:
192193
f"{str(partition).rjust(16)}{str(offset).rjust(16)}{str(count).rjust(16)}"
193194
)
194195

196+
consumers_have_data = True
197+
print("\n======== Number of rows in each consumer ========")
198+
for i, config in enumerate(consumer_configs.values()):
199+
query = f"""SELECT count(*) as count from {config['db_name']}.inflight_taskactivations"""
200+
res = cur.execute(query).fetchall()[0][0]
201+
print(
202+
f"Consumer {i}: {res}, {str(int(res / max_pending_count * 100))}% of capacity"
203+
)
204+
consumers_have_data = consumers_have_data and res >= max_pending_count // 3
205+
195206
if not all([row[3] == 0 for row in row_count]):
196207
print(
197208
"Test failed! Got duplicate/missing kafka messages in sqlite, dumping logs"
@@ -203,9 +214,20 @@ def test_tasks_written_once_during_rebalancing() -> None:
203214
) as f:
204215
print(f.read())
205216

217+
if not consumers_have_data:
218+
print(
219+
"Test failed! Lower than expected amount of kafka messages in sqlite, dumping logs"
220+
)
221+
for i in range(num_consumers):
222+
print(f"=== consumer {i} log ===")
223+
with open(
224+
str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r"
225+
) as f:
226+
print(f.read())
227+
206228
# Clean up test output files
207229
print(f"Cleaning up test output files in {TESTS_OUTPUT_PATH}")
208230
shutil.rmtree(TESTS_OUTPUT_PATH)
209231

210-
if not all([row[3] == 0 for row in row_count]):
232+
if not all([row[3] == 0 for row in row_count]) or not consumers_have_data:
211233
assert False

0 commit comments

Comments
 (0)