Skip to content

Commit 9e9d2a2

Browse files
authored
MINOR: fix flaky sys test for static membership (#20594)
Fixing flakiness seen on this test, where static consumers could not join as expected after shutting down previous consumers with the same instance ID, and logs showed `UnreleasedInstanceIdException`. I expect the flakiness could happen if a consumer with instanceId1 is closed but not effectively removed from the group due to leave group fail/delayed (the leave group request is sent on a best effort, not retried if fails or times out). Fix by adding check to ensure the group is empty before attempting to reuse the instance ID Reviewers: Matthias J. Sax <[email protected]>
1 parent 857b1e9 commit 9e9d2a2

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

tests/kafkatest/tests/client/consumer_test.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
313313
num_rebalances = consumer.num_rebalances()
314314
conflict_consumer.start()
315315
if group_protocol == consumer_group.classic_group_protocol:
316-
# Classic protocol: conflicting members should join, and the intial ones with conflicting instance id should fail.
316+
# Classic protocol: conflicting members should join, and the initial ones with conflicting instance id should fail.
317317
self.await_members(conflict_consumer, num_conflict_consumers)
318318
self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
319319

@@ -332,6 +332,11 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
332332
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
333333
timeout_sec=60,
334334
err_msg="Timed out waiting for the consumer to shutdown")
335+
# Wait until the group becomes empty to ensure the instance ID is released.
336+
# We use the 50-second timeout because the consumer session timeout is 45 seconds.
337+
wait_until(lambda: self.group_id in self.kafka.list_consumer_groups(state="empty"),
338+
timeout_sec=50,
339+
err_msg="Timed out waiting for the consumers to be removed from the group.")
335340
conflict_consumer.start()
336341
self.await_members(conflict_consumer, num_conflict_consumers)
337342

0 commit comments

Comments
 (0)