From c8f97cb1140712b9d0b67dcdb440d0af7b7ba854 Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Wed, 1 Oct 2025 21:28:15 -0400 Subject: [PATCH] Set regex subscription to empty if static member is replaced by classic protocol member --- .../group/GroupMetadataManager.java | 10 ++- .../group/GroupMetadataManagerTest.java | 69 +++++++++++++++++++ 2 files changed, 76 insertions(+), 3 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index cea68e09ffba1..e8a879fa366be 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -3025,10 +3025,14 @@ private ConsumerGroupMember getOrMaybeSubscribeStaticConsumerGroupMember( } // Copy the member but with its new member id. - ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(existingStaticMemberOrNull, memberId) + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(existingStaticMemberOrNull, memberId) .setMemberEpoch(0) - .setPreviousMemberEpoch(0) - .build(); + .setPreviousMemberEpoch(0); + if (useClassicProtocol) { + // Regex subscription is not supported for classic member. + memberBuilder.setSubscribedTopicRegex(""); + } + ConsumerGroupMember newMember = memberBuilder.build(); // Generate the records to replace the member. We don't care about the regular expression // here because it is taken care of later after the static membership replacement. diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 957ae7e814715..1d546cbd2b2a6 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -20985,6 +20985,75 @@ foooTopicName, computeTopicHash(foooTopicName, new KRaftCoordinatorMetadataImage ); } + @Test + public void testConsumerMemberWithRegexReplacedByClassicMember() { + String groupId = "fooup"; + String instanceId = "instance-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .buildCoordinatorMetadataImage(12345L); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new NoOpPartitionAssignor())) + .withMetadataImage(metadataImage) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignorName(NoOpPartitionAssignor.NAME) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignorName(NoOpPartitionAssignor.NAME) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .withAssignmentEpoch(10)) + .build(); + + // Member 1 is replaced by a classic member with the same instance id. + context.sendClassicGroupJoin( + new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .withDefaultProtocolTypeAndProtocols() + .build() + ); + + // The new member should have no regex subscription. + assertEquals( + "", + context.groupMetadataManager.consumerGroup(groupId).staticMember(instanceId).subscribedTopicRegex() + ); + } + @Test public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() { String groupId = "fooup";