File tree Expand file tree Collapse file tree 1 file changed +9
-3
lines changed Expand file tree Collapse file tree 1 file changed +9
-3
lines changed Original file line number Diff line number Diff line change @@ -2018,9 +2018,15 @@ async def on_partitions_assigned(self, assigned):
20182018 consumer1 .subscribe ([self .topic ], listener = listener1 )
20192019 consumer2 .subscribe ([self .topic ], listener = listener2 )
20202020
2021- # Make sure we rebalanced and ready for processing each of it's part
2022- await listener1 .assignment_ready .wait ()
2023- await listener2 .assignment_ready .wait ()
2021+ for _ in range (5 ):
2022+ # Make sure we rebalanced and ready for processing each of it's part
2023+ await listener1 .assignment_ready .wait ()
2024+ await listener2 .assignment_ready .wait ()
2025+
2026+ # Check the first is still ready to avoid flakiness
2027+ if listener1 .assignment_ready .is_set ():
2028+ break
2029+
20242030 self .assertTrue (consumer1 .assignment ())
20252031 self .assertTrue (consumer2 .assignment ())
20262032
You can’t perform that action at this time.
0 commit comments