diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 738abfa4..78cb82f8 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -203,21 +203,28 @@ def test_tasks_written_once_during_rebalancing() -> None: ) consumers_have_data = consumers_have_data and res >= max_pending_count // 3 + consumers_have_error = False + for i in range(num_consumers): + with open(str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r") as f: + consumers_have_error = consumers_have_error or "[31mERROR" in f.read() + if not all([row[3] == 0 for row in row_count]): - print( - "Test failed! Got duplicate/missing kafka messages in sqlite, dumping logs" - ) - for i in range(num_consumers): - print(f"=== consumer {i} log ===") - with open( - str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r" - ) as f: - print(f.read()) + print("Test failed! Got duplicate/missing kafka messages in sqlite") if not consumers_have_data: - print( - "Test failed! Lower than expected amount of kafka messages in sqlite, dumping logs" - ) + print("Test failed! Lower than expected amount of kafka messages in sqlite") + + if consumers_have_error: + print("Test failed! Errors in consumer logs") + + if ( + not all([row[3] == 0 for row in row_count]) + or not consumers_have_data + or consumers_have_error + ): + print() + print("Dumping logs") + print() for i in range(num_consumers): print(f"=== consumer {i} log ===") with open( @@ -229,5 +236,8 @@ def test_tasks_written_once_during_rebalancing() -> None: print(f"Cleaning up test output files in {TESTS_OUTPUT_PATH}") shutil.rmtree(TESTS_OUTPUT_PATH) - if not all([row[3] == 0 for row in row_count]) or not consumers_have_data: - assert False + assert ( + all([row[3] == 0 for row in row_count]) + and consumers_have_data + and not consumers_have_error + )