@@ -203,21 +203,31 @@ def test_tasks_written_once_during_rebalancing() -> None:
203203 )
204204 consumers_have_data = consumers_have_data and res >= max_pending_count // 3
205205
206+ consumers_have_error = False
207+ for i in range (num_consumers ):
208+ with open (str (TESTS_OUTPUT_PATH / f"consumer_{ i } _{ curr_time } .log" ), "r" ) as f :
209+ consumers_have_error = (
210+ consumers_have_error
211+ or b"[31mERROR\x1b " .decode ("utf-8" , "strict" ) in f .read ()
212+ )
213+
206214 if not all ([row [3 ] == 0 for row in row_count ]):
207- print (
208- "Test failed! Got duplicate/missing kafka messages in sqlite, dumping logs"
209- )
210- for i in range (num_consumers ):
211- print (f"=== consumer { i } log ===" )
212- with open (
213- str (TESTS_OUTPUT_PATH / f"consumer_{ i } _{ curr_time } .log" ), "r"
214- ) as f :
215- print (f .read ())
215+ print ("Test failed! Got duplicate/missing kafka messages in sqlite" )
216216
217217 if not consumers_have_data :
218- print (
219- "Test failed! Lower than expected amount of kafka messages in sqlite, dumping logs"
220- )
218+ print ("Test failed! Lower than expected amount of kafka messages in sqlite" )
219+
220+ if consumers_have_error :
221+ print ("Test failed! Errors in consumer logs" )
222+
223+ if (
224+ not all ([row [3 ] == 0 for row in row_count ])
225+ or not consumers_have_data
226+ or consumers_have_error
227+ ):
228+ print ()
229+ print ("Dumping logs" )
230+ print ()
221231 for i in range (num_consumers ):
222232 print (f"=== consumer { i } log ===" )
223233 with open (
@@ -229,5 +239,9 @@ def test_tasks_written_once_during_rebalancing() -> None:
229239 print (f"Cleaning up test output files in { TESTS_OUTPUT_PATH } " )
230240 shutil .rmtree (TESTS_OUTPUT_PATH )
231241
232- if not all ([row [3 ] == 0 for row in row_count ]) or not consumers_have_data :
242+ if (
243+ not all ([row [3 ] == 0 for row in row_count ])
244+ or not consumers_have_data
245+ or consumers_have_error
246+ ):
233247 assert False
0 commit comments