Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions python/integration_tests/test_consumer_rebalancing.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,21 +203,28 @@ def test_tasks_written_once_during_rebalancing() -> None:
)
consumers_have_data = consumers_have_data and res >= max_pending_count // 3

consumers_have_error = False
for i in range(num_consumers):
with open(str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r") as f:
consumers_have_error = consumers_have_error or "[31mERROR" in f.read()

if not all([row[3] == 0 for row in row_count]):
print(
"Test failed! Got duplicate/missing kafka messages in sqlite, dumping logs"
)
for i in range(num_consumers):
print(f"=== consumer {i} log ===")
with open(
str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r"
) as f:
print(f.read())
print("Test failed! Got duplicate/missing kafka messages in sqlite")

if not consumers_have_data:
print(
"Test failed! Lower than expected amount of kafka messages in sqlite, dumping logs"
)
print("Test failed! Lower than expected amount of kafka messages in sqlite")

if consumers_have_error:
print("Test failed! Errors in consumer logs")

if (
not all([row[3] == 0 for row in row_count])
or not consumers_have_data
or consumers_have_error
):
print()
print("Dumping logs")
print()
for i in range(num_consumers):
print(f"=== consumer {i} log ===")
with open(
Expand All @@ -229,5 +236,8 @@ def test_tasks_written_once_during_rebalancing() -> None:
print(f"Cleaning up test output files in {TESTS_OUTPUT_PATH}")
shutil.rmtree(TESTS_OUTPUT_PATH)

if not all([row[3] == 0 for row in row_count]) or not consumers_have_data:
assert False
assert (
all([row[3] == 0 for row in row_count])
and consumers_have_data
and not consumers_have_error
)
Loading