14 changes: 12 additions & 2 deletions tests/kafkatest/services/verifiable_consumer.py
@@ -41,7 +41,8 @@ class ConsumerEventHandler(object):

def __init__(self, node, verify_offsets, idx, state=ConsumerState.Dead,
revoked_count=0, assigned_count=0, assignment=None,
position=None, committed=None, total_consumed=0):
position=None, committed=None, total_consumed=0,
shutdown_complete=False):
self.node = node
self.verify_offsets = verify_offsets
self.idx = idx
@@ -52,11 +53,13 @@ def __init__(self, node, verify_offsets, idx, state=ConsumerState.Dead,
self.position = position if position is not None else {}
self.committed = committed if committed is not None else {}
self.total_consumed = total_consumed
self.shutdown_complete = shutdown_complete

def handle_shutdown_complete(self, node=None, logger=None):
self.state = ConsumerState.Dead
self.assignment = []
self.position = {}
self.shutdown_complete = True

if node is not None and logger is not None:
logger.debug("Shut down %s" % node.account.hostname)
@@ -279,7 +282,8 @@ def create_handler_helper(handler_class, node, idx, existing_handler=None):
assignment=existing_handler.assignment,
position=existing_handler.position,
committed=existing_handler.committed,
total_consumed=existing_handler.total_consumed)
total_consumed=existing_handler.total_consumed,
shutdown_complete=existing_handler.shutdown_complete)
else:
return handler_class(node, self.verify_offsets, idx)
existing_handler = self.event_handlers[node] if node in self.event_handlers else None
@@ -294,6 +298,7 @@ def _worker(self, idx, node):
with self.lock:
self.event_handlers[node] = self.create_event_handler(idx, node)
handler = self.event_handlers[node]
handler.shutdown_complete = False

node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)

@@ -521,5 +526,10 @@ def alive_nodes(self):
return [handler.node for handler in self.event_handlers.values()
if handler.state != ConsumerState.Dead]

def shutdown_complete_nodes(self):
with self.lock:
return [handler.node for handler in self.event_handlers.values()
if handler.shutdown_complete]

def is_consumer_group_protocol_enabled(self):
return self.group_protocol and self.group_protocol.lower() == consumer_group.consumer_group_protocol
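
For context, the new shutdown_complete_nodes() accessor is meant to be polled from tests. A minimal usage sketch (illustrative only, mirroring the helper added to consumer_test.py below; it assumes a started VerifiableConsumer named conflict_consumer and ducktape's wait_until):

from ducktape.utils.util import wait_until

# Illustrative only: block until every conflict consumer has both reported
# shutdown_complete and transitioned to the Dead state.
wait_until(lambda: len(conflict_consumer.shutdown_complete_nodes()) == len(conflict_consumer.nodes) and
                   len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
           timeout_sec=60,
           err_msg="Timed out waiting for conflict consumers to report shutdown completion")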
22 changes: 20 additions & 2 deletions tests/kafkatest/tests/client/consumer_test.py
@@ -18,6 +18,7 @@
from ducktape.mark.resource import cluster

from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.kafka import TopicPartition, quorum, consumer_group

import signal
@@ -74,6 +75,14 @@ def setup_consumer(self, topic, **kwargs):
self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
return consumer

def await_conflict_consumers_fenced(self, conflict_consumer):
# Rely on explicit shutdown_complete events from the verifiable consumer to guarantee each conflict member
# reached the fenced path rather than remaining in the default DEAD state prior to startup.
wait_until(lambda: len(conflict_consumer.shutdown_complete_nodes()) == len(conflict_consumer.nodes) and
len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
timeout_sec=60,
err_msg="Timed out waiting for conflict consumers to report shutdown completion after fencing")

@cluster(num_nodes=7)
@matrix(
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
@@ -355,7 +364,7 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
num_rebalances = consumer.num_rebalances()
conflict_consumer.start()
if group_protocol == consumer_group.classic_group_protocol:
# Classic protocol: conflicting members should join, and the intial ones with conflicting instance id should fail.
# Classic protocol: conflicting members should join, and the initial ones with conflicting instance id should fail.
self.await_members(conflict_consumer, num_conflict_consumers)
self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)

@@ -368,12 +377,21 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
assert len(consumer.joined_nodes()) == len(consumer.nodes)
assert len(conflict_consumer.joined_nodes()) == 0


# Conflict consumers will terminate due to a fatal UnreleasedInstanceIdException error.
# Wait for termination to complete to prevent conflict consumers from immediately re-joining the group while existing nodes are shutting down.
self.await_conflict_consumers_fenced(conflict_consumer)

# Stop existing nodes, so conflicting ones should be able to join.
consumer.stop_all()
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
timeout_sec=self.session_timeout_sec+5,
err_msg="Timed out waiting for the consumer to shutdown")
# Wait until the group becomes empty to ensure the instance ID is released.
# We use the 50-second timeout because the consumer session timeout is 45 seconds.
wait_until(lambda: self.group_id in self.kafka.list_consumer_groups(state="empty"),
timeout_sec=50,
err_msg="Timed out waiting for the consumers to be removed from the group.")
conflict_consumer.start()
self.await_members(conflict_consumer, num_conflict_consumers)

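For completeness, the shutdown_complete flag consumed above is set when the service's _worker loop sees the corresponding stdout event emitted by the verifiable consumer on close. A simplified sketch of that dispatch (the event name, variable names, and surrounding loop are assumptions for illustration, not copied from the file):

import json

# Assumed shape: each stdout line from the verifiable consumer is a JSON event;
# a "shutdown_complete" event marks the member as cleanly terminated.
event = json.loads(line)
if event["name"] == "shutdown_complete":
    handler.handle_shutdown_complete(node, logger)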