diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index de1e6f2a1f2fc..b739ebbbb5913 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -41,7 +41,8 @@ class ConsumerEventHandler(object):
 
     def __init__(self, node, verify_offsets, idx, state=ConsumerState.Dead,
                  revoked_count=0, assigned_count=0, assignment=None,
-                 position=None, committed=None, total_consumed=0):
+                 position=None, committed=None, total_consumed=0,
+                 shutdown_complete=False):
         self.node = node
         self.verify_offsets = verify_offsets
         self.idx = idx
@@ -52,11 +53,13 @@ def __init__(self, node, verify_offsets, idx, state=ConsumerState.Dead,
         self.position = position if position is not None else {}
         self.committed = committed if committed is not None else {}
         self.total_consumed = total_consumed
+        self.shutdown_complete = shutdown_complete
 
     def handle_shutdown_complete(self, node=None, logger=None):
         self.state = ConsumerState.Dead
         self.assignment = []
         self.position = {}
+        self.shutdown_complete = True
 
         if node is not None and logger is not None:
             logger.debug("Shut down %s" % node.account.hostname)
@@ -279,7 +282,8 @@ def create_handler_helper(handler_class, node, idx, existing_handler=None):
                                      assignment=existing_handler.assignment,
                                      position=existing_handler.position,
                                      committed=existing_handler.committed,
-                                     total_consumed=existing_handler.total_consumed)
+                                     total_consumed=existing_handler.total_consumed,
+                                     shutdown_complete=existing_handler.shutdown_complete)
             else:
                 return handler_class(node, self.verify_offsets, idx)
         existing_handler = self.event_handlers[node] if node in self.event_handlers else None
@@ -294,6 +298,7 @@ def _worker(self, idx, node):
         with self.lock:
             self.event_handlers[node] = self.create_event_handler(idx, node)
             handler = self.event_handlers[node]
+            handler.shutdown_complete = False
 
         node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
 
@@ -521,5 +526,10 @@ def alive_nodes(self):
             return [handler.node for handler in self.event_handlers.values()
                     if handler.state != ConsumerState.Dead]
 
+    def shutdown_complete_nodes(self):
+        with self.lock:
+            return [handler.node for handler in self.event_handlers.values()
+                    if handler.shutdown_complete]
+
     def is_consumer_group_protocol_enabled(self):
         return self.group_protocol and self.group_protocol.lower() == consumer_group.consumer_group_protocol
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index 6cb82869c4f72..676c15c45a2dd 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -18,6 +18,7 @@
 from ducktape.mark.resource import cluster
 
 from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
 from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
 
 import signal
@@ -74,6 +75,14 @@ def setup_consumer(self, topic, **kwargs):
         self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
         return consumer
 
+    def await_conflict_consumers_fenced(self, conflict_consumer):
+        # Rely on explicit shutdown_complete events from the verifiable consumer to guarantee each conflict member
+        # reached the fenced path rather than remaining in the default DEAD state prior to startup.
+        wait_until(lambda: len(conflict_consumer.shutdown_complete_nodes()) == len(conflict_consumer.nodes) and
+                           len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
+                   timeout_sec=60,
+                   err_msg="Timed out waiting for conflict consumers to report shutdown completion after fencing")
+
     @cluster(num_nodes=7)
     @matrix(
         metadata_quorum=[quorum.zk, quorum.isolated_kraft],
@@ -355,7 +364,7 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
             num_rebalances = consumer.num_rebalances()
             conflict_consumer.start()
             if group_protocol == consumer_group.classic_group_protocol:
-                # Classic protocol: conflicting members should join, and the intial ones with conflicting instance id should fail.
+                # Classic protocol: conflicting members should join, and the initial ones with conflicting instance id should fail.
                 self.await_members(conflict_consumer, num_conflict_consumers)
                 self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
 
@@ -368,12 +377,21 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
                 assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
                 assert len(consumer.joined_nodes()) == len(consumer.nodes)
                 assert len(conflict_consumer.joined_nodes()) == 0
- 
+
+                # Conflict consumers will terminate due to a fatal UnreleasedInstanceIdException error.
+                # Wait for termination to complete to prevent conflict consumers from immediately re-joining the group while existing nodes are shutting down.
+                self.await_conflict_consumers_fenced(conflict_consumer)
+
                 # Stop existing nodes, so conflicting ones should be able to join.
                 consumer.stop_all()
                 wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
                            timeout_sec=self.session_timeout_sec+5,
                            err_msg="Timed out waiting for the consumer to shutdown")
+                # Wait until the group becomes empty to ensure the instance ID is released.
+                # We use the 50-second timeout because the consumer session timeout is 45 seconds.
+                wait_until(lambda: self.group_id in self.kafka.list_consumer_groups(state="empty"),
+                           timeout_sec=50,
+                           err_msg="Timed out waiting for the consumers to be removed from the group.")
                 conflict_consumer.start()
                 self.await_members(conflict_consumer, num_conflict_consumers)
 