Skip to content

Commit fb0518c

Browse files
lucasbruCopilot
andauthored
KAFKA-19730: StreamsGroupDescribe result is missing topology (#20574)
When toology not configured. In the streams group heartbeat, we validate the topology set for the group against the topic metadata, to generate the "configured topology" which has a specific number of partitions for each topic. In streams group describe, we use the configured topology to expose this information to the user. However, we leave the topology information as null in the streams group describe response, if the topology is not configured. This triggers an IllegalStateException in the admin client implementation. Instead, we should expose the unconfigured topology when the configured topology is not available, which will still expose useful information. Reviewers: Matthias J. Sax <[email protected]> --------- Co-authored-by: Copilot <[email protected]>
1 parent ac495f9 commit fb0518c

File tree

6 files changed

+346
-4
lines changed

6 files changed

+346
-4
lines changed

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4463,6 +4463,47 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
44634463
}
44644464
}
44654465

4466+
@Test
4467+
def testDescribeStreamsGroupsNotReady(): Unit = {
4468+
val streamsGroupId = "stream_group_id"
4469+
val testTopicName = "test_topic"
4470+
4471+
val config = createConfig
4472+
client = Admin.create(config)
4473+
4474+
val streams = createStreamsGroup(
4475+
inputTopic = testTopicName,
4476+
streamsGroupId = streamsGroupId
4477+
)
4478+
streams.poll(JDuration.ofMillis(500L))
4479+
4480+
try {
4481+
TestUtils.waitUntilTrue(() => {
4482+
val firstGroup = client.listGroups().all().get().stream()
4483+
.filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
4484+
firstGroup.groupState().orElse(null) == GroupState.NOT_READY && firstGroup.groupId() == streamsGroupId
4485+
}, "Stream group not NOT_READY yet")
4486+
4487+
// Verify the describe call works correctly
4488+
val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
4489+
val group = describedGroups.get(streamsGroupId)
4490+
assertNotNull(group)
4491+
assertEquals(streamsGroupId, group.groupId())
4492+
assertFalse(group.members().isEmpty)
4493+
assertNotNull(group.subtopologies())
4494+
assertFalse(group.subtopologies().isEmpty)
4495+
4496+
// Verify the topology contains the expected source and sink topics
4497+
val subtopologies = group.subtopologies().asScala
4498+
assertTrue(subtopologies.exists(subtopology =>
4499+
subtopology.sourceTopics().contains(testTopicName)))
4500+
4501+
} finally {
4502+
Utils.closeQuietly(streams, "streams")
4503+
Utils.closeQuietly(client, "adminClient")
4504+
}
4505+
}
4506+
44664507
@Test
44674508
def testDeleteStreamsGroups(): Unit = {
44684509
val testTopicName = "test_topic"

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1039,7 +1039,16 @@ public StreamsGroupDescribeResponseData.DescribedGroup asDescribedGroup(
10391039
.setGroupEpoch(groupEpoch.get(committedOffset))
10401040
.setGroupState(state.get(committedOffset).toString())
10411041
.setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset))
1042-
.setTopology(configuredTopology.get(committedOffset).map(ConfiguredTopology::asStreamsGroupDescribeTopology).orElse(null));
1042+
.setTopology(
1043+
configuredTopology.get(committedOffset)
1044+
.filter(ConfiguredTopology::isReady)
1045+
.map(ConfiguredTopology::asStreamsGroupDescribeTopology)
1046+
.orElse(
1047+
topology.get(committedOffset)
1048+
.map(StreamsTopology::asStreamsGroupDescribeTopology)
1049+
.orElseThrow(() -> new IllegalStateException("There should always be a topology for a streams group."))
1050+
)
1051+
);
10431052
members.entrySet(committedOffset).forEach(
10441053
entry -> describedGroup.members().add(
10451054
entry.getValue().asStreamsGroupDescribeMember(

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
*/
1717
package org.apache.kafka.coordinator.group.streams;
1818

19+
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
1920
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
2021
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
2122
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
2223
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
2324

2425
import java.util.Collections;
26+
import java.util.Comparator;
2527
import java.util.Map;
2628
import java.util.Objects;
2729
import java.util.Set;
@@ -95,4 +97,43 @@ public static StreamsTopology fromHeartbeatRequest(StreamsGroupHeartbeatRequestD
9597
.collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, x -> x));
9698
return new StreamsTopology(topology.epoch(), subtopologyMap);
9799
}
100+
101+
public StreamsGroupDescribeResponseData.Topology asStreamsGroupDescribeTopology() {
102+
return new StreamsGroupDescribeResponseData.Topology()
103+
.setEpoch(topologyEpoch)
104+
.setSubtopologies(
105+
subtopologies.entrySet().stream()
106+
.sorted(Map.Entry.comparingByKey())
107+
.map(entry -> asStreamsGroupDescribeSubtopology(entry.getKey(), entry.getValue()))
108+
.toList()
109+
);
110+
}
111+
112+
private StreamsGroupDescribeResponseData.Subtopology asStreamsGroupDescribeSubtopology(String subtopologyId, StreamsGroupTopologyValue.Subtopology subtopology) {
113+
return new StreamsGroupDescribeResponseData.Subtopology()
114+
.setSubtopologyId(subtopologyId)
115+
.setSourceTopics(subtopology.sourceTopics().stream().sorted().toList())
116+
.setRepartitionSinkTopics(subtopology.repartitionSinkTopics().stream().sorted().toList())
117+
.setRepartitionSourceTopics(subtopology.repartitionSourceTopics().stream()
118+
.map(this::asStreamsGroupDescribeTopicInfo)
119+
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList())
120+
.setStateChangelogTopics(subtopology.stateChangelogTopics().stream()
121+
.map(this::asStreamsGroupDescribeTopicInfo)
122+
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList());
123+
}
124+
125+
private StreamsGroupDescribeResponseData.TopicInfo asStreamsGroupDescribeTopicInfo(StreamsGroupTopologyValue.TopicInfo topicInfo) {
126+
return new StreamsGroupDescribeResponseData.TopicInfo()
127+
.setName(topicInfo.name())
128+
.setPartitions(topicInfo.partitions())
129+
.setReplicationFactor(topicInfo.replicationFactor())
130+
.setTopicConfigs(
131+
topicInfo.topicConfigs().stream().map(
132+
topicConfig -> new StreamsGroupDescribeResponseData.KeyValue()
133+
.setKey(topicConfig.key())
134+
.setValue(topicConfig.value())
135+
).toList()
136+
);
137+
}
138+
98139
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9645,19 +9645,43 @@ public void testStreamsGroupDescribeNoErrors() {
96459645
.setProcessId("processId")
96469646
.setMemberEpoch(epoch)
96479647
.setPreviousMemberEpoch(epoch - 1);
9648+
String subtopology1 = "subtopology1";
9649+
String fooTopicName = "foo";
9650+
StreamsTopology topology = new StreamsTopology(
9651+
0,
9652+
Map.of(subtopology1,
9653+
new StreamsGroupTopologyValue.Subtopology()
9654+
.setSubtopologyId(subtopology1)
9655+
.setSourceTopics(List.of(fooTopicName))
9656+
)
9657+
);
96489658

96499659
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
9650-
.withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), epoch))
9660+
.withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), epoch)
9661+
.withTopology(topology)
9662+
)
96519663
.withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(1), epoch)
9652-
.withMember(memberBuilder.build()))
9664+
.withMember(memberBuilder.build())
9665+
.withTopology(topology)
9666+
)
96539667
.build();
96549668

9669+
StreamsGroupDescribeResponseData.Topology expectedTopology =
9670+
new StreamsGroupDescribeResponseData.Topology()
9671+
.setEpoch(0)
9672+
.setSubtopologies(List.of(
9673+
new StreamsGroupDescribeResponseData.Subtopology()
9674+
.setSubtopologyId(subtopology1)
9675+
.setSourceTopics(List.of(fooTopicName))
9676+
));
9677+
96559678
List<StreamsGroupDescribeResponseData.DescribedGroup> expected = Arrays.asList(
96569679
new StreamsGroupDescribeResponseData.DescribedGroup()
96579680
.setGroupEpoch(epoch)
96589681
.setGroupId(streamsGroupIds.get(0))
96599682
.setGroupState(StreamsGroupState.EMPTY.toString())
9660-
.setAssignmentEpoch(0),
9683+
.setAssignmentEpoch(0)
9684+
.setTopology(expectedTopology),
96619685
new StreamsGroupDescribeResponseData.DescribedGroup()
96629686
.setGroupEpoch(epoch)
96639687
.setGroupId(streamsGroupIds.get(1))
@@ -9666,6 +9690,7 @@ public void testStreamsGroupDescribeNoErrors() {
96669690
TasksTuple.EMPTY
96679691
)
96689692
))
9693+
.setTopology(expectedTopology)
96699694
.setGroupState(StreamsGroupState.NOT_READY.toString())
96709695
);
96719696
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(streamsGroupIds);
@@ -9695,13 +9720,24 @@ public void testStreamsGroupDescribeBeforeAndAfterCommittingOffset() {
96959720
String memberId1 = "memberId1";
96969721
String memberId2 = "memberId2";
96979722
String subtopologyId = "subtopology1";
9723+
String fooTopicName = "foo";
9724+
StreamsGroupTopologyValue topology = new StreamsGroupTopologyValue()
9725+
.setEpoch(0)
9726+
.setSubtopologies(
9727+
List.of(
9728+
new StreamsGroupTopologyValue.Subtopology()
9729+
.setSubtopologyId(subtopologyId)
9730+
.setSourceTopics(List.of(fooTopicName))
9731+
)
9732+
);
96989733

96999734
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
97009735

97019736
StreamsGroupMember.Builder memberBuilder1 = streamsGroupMemberBuilderWithDefaults(memberId1);
97029737
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build()));
97039738
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build()));
97049739
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1, 0));
9740+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(streamsGroupId, topology));
97059741

97069742
TasksTuple assignment = new TasksTuple(
97079743
Map.of(subtopologyId, Set.of(0, 1)),
@@ -9733,6 +9769,17 @@ public void testStreamsGroupDescribeBeforeAndAfterCommittingOffset() {
97339769
memberBuilder1.build().asStreamsGroupDescribeMember(TasksTuple.EMPTY),
97349770
memberBuilder2.build().asStreamsGroupDescribeMember(assignment)
97359771
))
9772+
.setTopology(
9773+
new StreamsGroupDescribeResponseData.Topology()
9774+
.setEpoch(0)
9775+
.setSubtopologies(
9776+
List.of(
9777+
new StreamsGroupDescribeResponseData.Subtopology()
9778+
.setSubtopologyId(subtopologyId)
9779+
.setSourceTopics(List.of(fooTopicName))
9780+
)
9781+
)
9782+
)
97369783
.setGroupState(StreamsGroup.StreamsGroupState.NOT_READY.toString())
97379784
.setGroupEpoch(epoch + 2);
97389785
assertEquals(1, actual.size());

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1019,4 +1019,124 @@ public void testShutdownRequestedMethods() {
10191019
streamsGroup.removeMember(memberId2);
10201020
assertEquals(Optional.empty(), streamsGroup.getShutdownRequestMemberId());
10211021
}
1022+
1023+
@Test
1024+
public void testAsDescribedGroupWithStreamsTopologyHavingSubtopologies() {
1025+
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
1026+
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-with-topology");
1027+
snapshotRegistry.idempotentCreateSnapshot(0);
1028+
1029+
// Create a topology with subtopologies
1030+
Map<String, StreamsGroupTopologyValue.Subtopology> subtopologies = Map.of(
1031+
"sub-1", new StreamsGroupTopologyValue.Subtopology()
1032+
.setSubtopologyId("sub-1")
1033+
.setSourceTopics(List.of("input-topic"))
1034+
.setRepartitionSourceTopics(List.of(
1035+
new StreamsGroupTopologyValue.TopicInfo().setName("repartition-topic")
1036+
))
1037+
.setStateChangelogTopics(List.of(
1038+
new StreamsGroupTopologyValue.TopicInfo().setName("changelog-topic")
1039+
))
1040+
);
1041+
1042+
group.setGroupEpoch(2);
1043+
group.setTopology(new StreamsTopology(2, subtopologies));
1044+
group.setTargetAssignmentEpoch(2);
1045+
group.updateMember(new StreamsGroupMember.Builder("member1")
1046+
.setMemberEpoch(2)
1047+
.setPreviousMemberEpoch(1)
1048+
.setState(MemberState.STABLE)
1049+
.setInstanceId("instance1")
1050+
.setRackId("rack1")
1051+
.setClientId("client1")
1052+
.setClientHost("host1")
1053+
.setRebalanceTimeoutMs(1000)
1054+
.setTopologyEpoch(2)
1055+
.setProcessId("process1")
1056+
.setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host1").setPort(9092))
1057+
.setClientTags(Map.of("tag1", "value1"))
1058+
.setAssignedTasks(new TasksTuple(Map.of(), Map.of(), Map.of()))
1059+
.setTasksPendingRevocation(new TasksTuple(Map.of(), Map.of(), Map.of()))
1060+
.build());
1061+
snapshotRegistry.idempotentCreateSnapshot(1);
1062+
1063+
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1);
1064+
1065+
assertEquals("group-id-with-topology", describedGroup.groupId());
1066+
assertEquals(StreamsGroup.StreamsGroupState.NOT_READY.toString(), describedGroup.groupState());
1067+
assertEquals(2, describedGroup.groupEpoch());
1068+
assertEquals(2, describedGroup.assignmentEpoch());
1069+
1070+
// Verify topology is correctly described
1071+
assertNotNull(describedGroup.topology());
1072+
assertEquals(2, describedGroup.topology().epoch());
1073+
assertEquals(1, describedGroup.topology().subtopologies().size());
1074+
1075+
StreamsGroupDescribeResponseData.Subtopology subtopology = describedGroup.topology().subtopologies().get(0);
1076+
assertEquals("sub-1", subtopology.subtopologyId());
1077+
assertEquals(List.of("input-topic"), subtopology.sourceTopics());
1078+
assertEquals(1, subtopology.repartitionSourceTopics().size());
1079+
assertEquals("repartition-topic", subtopology.repartitionSourceTopics().get(0).name());
1080+
assertEquals(1, subtopology.stateChangelogTopics().size());
1081+
assertEquals("changelog-topic", subtopology.stateChangelogTopics().get(0).name());
1082+
1083+
assertEquals(1, describedGroup.members().size());
1084+
assertEquals("member1", describedGroup.members().get(0).memberId());
1085+
}
1086+
1087+
@Test
1088+
public void testAsDescribedGroupPrefersConfiguredTopologyOverStreamsTopology() {
1089+
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
1090+
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-configured");
1091+
snapshotRegistry.idempotentCreateSnapshot(0);
1092+
1093+
// Create both StreamsTopology and ConfiguredTopology
1094+
Map<String, StreamsGroupTopologyValue.Subtopology> subtopologies = Map.of(
1095+
"sub-1", new StreamsGroupTopologyValue.Subtopology()
1096+
.setSubtopologyId("sub-1")
1097+
.setSourceTopics(List.of("streams-topic"))
1098+
);
1099+
1100+
group.setGroupEpoch(3);
1101+
group.setTopology(new StreamsTopology(2, subtopologies));
1102+
group.setConfiguredTopology(new ConfiguredTopology(3, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
1103+
group.setTargetAssignmentEpoch(3);
1104+
snapshotRegistry.idempotentCreateSnapshot(1);
1105+
1106+
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1);
1107+
1108+
// Should prefer ConfiguredTopology over StreamsTopology
1109+
assertNotNull(describedGroup.topology());
1110+
assertEquals(3, describedGroup.topology().epoch()); // ConfiguredTopology epoch
1111+
assertEquals(0, describedGroup.topology().subtopologies().size()); // Empty configured topology
1112+
}
1113+
1114+
@Test
1115+
public void testAsDescribedGroupFallbackToStreamsTopologyWhenConfiguredTopologyEmpty() {
1116+
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
1117+
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-fallback");
1118+
snapshotRegistry.idempotentCreateSnapshot(0);
1119+
1120+
// Create StreamsTopology with subtopologies
1121+
Map<String, StreamsGroupTopologyValue.Subtopology> subtopologies = Map.of(
1122+
"sub-1", new StreamsGroupTopologyValue.Subtopology()
1123+
.setSubtopologyId("sub-1")
1124+
.setSourceTopics(List.of("fallback-topic"))
1125+
);
1126+
1127+
group.setGroupEpoch(4);
1128+
group.setTopology(new StreamsTopology(4, subtopologies));
1129+
// No ConfiguredTopology set, so should fallback to StreamsTopology
1130+
group.setTargetAssignmentEpoch(4);
1131+
snapshotRegistry.idempotentCreateSnapshot(1);
1132+
1133+
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1);
1134+
1135+
// Should use StreamsTopology when ConfiguredTopology is not available
1136+
assertNotNull(describedGroup.topology());
1137+
assertEquals(4, describedGroup.topology().epoch()); // StreamsTopology epoch
1138+
assertEquals(1, describedGroup.topology().subtopologies().size());
1139+
assertEquals("sub-1", describedGroup.topology().subtopologies().get(0).subtopologyId());
1140+
assertEquals(List.of("fallback-topic"), describedGroup.topology().subtopologies().get(0).sourceTopics());
1141+
}
10221142
}

0 commit comments

Comments
 (0)