Skip to content

Commit d7a6d31

Browse files
committed
KAFKA-19730: StreamsGroupDescribe result is missing topology when topology not configured
In the streams group heartbeat, we validate the topology set for the group against the topic metadata, to generate the "configured topology" which has a specific number of partitions for each topic. In streams group describe, we use the configured topology to expose this information to the user. However, we leave the topology information as null in the streams group describe response, if the topology is not configured. This triggers a null-pointer exception in the admin client implementation. Instead, we should expose the unconfigured topology when the configured topology is not available, which will still expose useful information.
1 parent 71efb89 commit d7a6d31

File tree

5 files changed

+295
-1
lines changed

5 files changed

+295
-1
lines changed

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4463,6 +4463,47 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
44634463
}
44644464
}
44654465

4466+
@Test
4467+
def testDescribeStreamsGroupsNotReady(): Unit = {
4468+
val streamsGroupId = "stream_group_id"
4469+
val testTopicName = "test_topic"
4470+
4471+
val config = createConfig
4472+
client = Admin.create(config)
4473+
4474+
val streams = createStreamsGroup(
4475+
inputTopic = testTopicName,
4476+
streamsGroupId = streamsGroupId
4477+
)
4478+
streams.poll(JDuration.ofMillis(500L))
4479+
4480+
try {
4481+
TestUtils.waitUntilTrue(() => {
4482+
val firstGroup = client.listGroups().all().get().stream()
4483+
.filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
4484+
firstGroup.groupState().orElse(null) == GroupState.NOT_READY && firstGroup.groupId() == streamsGroupId
4485+
}, "Stream group not NOT_READY yet")
4486+
4487+
// Verify the describe call works correctly
4488+
val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
4489+
val group = describedGroups.get(streamsGroupId)
4490+
assertNotNull(group)
4491+
assertEquals(streamsGroupId, group.groupId())
4492+
assertFalse(group.members().isEmpty)
4493+
assertNotNull(group.subtopologies())
4494+
assertFalse(group.subtopologies().isEmpty)
4495+
4496+
// Verify the topology contains the expected source and sink topics
4497+
val subtopologies = group.subtopologies().asScala
4498+
assertTrue(subtopologies.exists(subtopology =>
4499+
subtopology.sourceTopics().contains(testTopicName)))
4500+
4501+
} finally {
4502+
Utils.closeQuietly(streams, "streams")
4503+
Utils.closeQuietly(client, "adminClient")
4504+
}
4505+
}
4506+
44664507
@Test
44674508
def testDeleteStreamsGroups(): Unit = {
44684509
val testTopicName = "test_topic"

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1039,7 +1039,16 @@ public StreamsGroupDescribeResponseData.DescribedGroup asDescribedGroup(
10391039
.setGroupEpoch(groupEpoch.get(committedOffset))
10401040
.setGroupState(state.get(committedOffset).toString())
10411041
.setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset))
1042-
.setTopology(configuredTopology.get(committedOffset).map(ConfiguredTopology::asStreamsGroupDescribeTopology).orElse(null));
1042+
.setTopology(
1043+
configuredTopology.get(committedOffset)
1044+
.filter(ConfiguredTopology::isReady)
1045+
.map(ConfiguredTopology::asStreamsGroupDescribeTopology)
1046+
.orElse(
1047+
topology.get(committedOffset)
1048+
.map(StreamsTopology::asStreamsGroupDescribeTopology)
1049+
.orElse(null)
1050+
)
1051+
);
10431052
members.entrySet(committedOffset).forEach(
10441053
entry -> describedGroup.members().add(
10451054
entry.getValue().asStreamsGroupDescribeMember(

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
*/
1717
package org.apache.kafka.coordinator.group.streams;
1818

19+
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
1920
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
2021
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
2122
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
2223
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
2324

2425
import java.util.Collections;
26+
import java.util.Comparator;
2527
import java.util.Map;
2628
import java.util.Objects;
2729
import java.util.Set;
@@ -95,4 +97,42 @@ public static StreamsTopology fromHeartbeatRequest(StreamsGroupHeartbeatRequestD
9597
.collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, x -> x));
9698
return new StreamsTopology(topology.epoch(), subtopologyMap);
9799
}
100+
101+
public StreamsGroupDescribeResponseData.Topology asStreamsGroupDescribeTopology() {
102+
return new StreamsGroupDescribeResponseData.Topology()
103+
.setEpoch(topologyEpoch)
104+
.setSubtopologies(
105+
subtopologies.entrySet().stream().map(
106+
entry -> asStreamsGroupDescribeSubtopology(entry.getKey(), entry.getValue())
107+
).toList()
108+
);
109+
}
110+
111+
private StreamsGroupDescribeResponseData.Subtopology asStreamsGroupDescribeSubtopology(String subtopologyId, StreamsGroupTopologyValue.Subtopology subtopology) {
112+
return new StreamsGroupDescribeResponseData.Subtopology()
113+
.setSubtopologyId(subtopologyId)
114+
.setSourceTopics(subtopology.sourceTopics().stream().sorted().toList())
115+
.setRepartitionSinkTopics(subtopology.repartitionSinkTopics().stream().sorted().toList())
116+
.setRepartitionSourceTopics(subtopology.repartitionSourceTopics().stream()
117+
.map(this::asStreamsGroupDescribeTopicInfo)
118+
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList())
119+
.setStateChangelogTopics(subtopology.stateChangelogTopics().stream()
120+
.map(this::asStreamsGroupDescribeTopicInfo)
121+
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList());
122+
}
123+
124+
private StreamsGroupDescribeResponseData.TopicInfo asStreamsGroupDescribeTopicInfo(StreamsGroupTopologyValue.TopicInfo topicInfo) {
125+
return new StreamsGroupDescribeResponseData.TopicInfo()
126+
.setName(topicInfo.name())
127+
.setPartitions(topicInfo.partitions())
128+
.setReplicationFactor(topicInfo.replicationFactor())
129+
.setTopicConfigs(
130+
topicInfo.topicConfigs().stream().map(
131+
y -> new StreamsGroupDescribeResponseData.KeyValue()
132+
.setKey(y.key())
133+
.setValue(y.value())
134+
).toList()
135+
);
136+
}
137+
98138
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1019,4 +1019,124 @@ public void testShutdownRequestedMethods() {
10191019
streamsGroup.removeMember(memberId2);
10201020
assertEquals(Optional.empty(), streamsGroup.getShutdownRequestMemberId());
10211021
}
1022+
1023+
@Test
1024+
public void testAsDescribedGroupWithStreamsTopologyHavingSubtopologies() {
1025+
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
1026+
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-with-topology");
1027+
snapshotRegistry.idempotentCreateSnapshot(0);
1028+
1029+
// Create a topology with subtopologies
1030+
Map<String, StreamsGroupTopologyValue.Subtopology> subtopologies = Map.of(
1031+
"sub-1", new StreamsGroupTopologyValue.Subtopology()
1032+
.setSubtopologyId("sub-1")
1033+
.setSourceTopics(List.of("input-topic"))
1034+
.setRepartitionSourceTopics(List.of(
1035+
new StreamsGroupTopologyValue.TopicInfo().setName("repartition-topic")
1036+
))
1037+
.setStateChangelogTopics(List.of(
1038+
new StreamsGroupTopologyValue.TopicInfo().setName("changelog-topic")
1039+
))
1040+
);
1041+
1042+
group.setGroupEpoch(2);
1043+
group.setTopology(new StreamsTopology(2, subtopologies));
1044+
group.setTargetAssignmentEpoch(2);
1045+
group.updateMember(new StreamsGroupMember.Builder("member1")
1046+
.setMemberEpoch(2)
1047+
.setPreviousMemberEpoch(1)
1048+
.setState(MemberState.STABLE)
1049+
.setInstanceId("instance1")
1050+
.setRackId("rack1")
1051+
.setClientId("client1")
1052+
.setClientHost("host1")
1053+
.setRebalanceTimeoutMs(1000)
1054+
.setTopologyEpoch(2)
1055+
.setProcessId("process1")
1056+
.setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host1").setPort(9092))
1057+
.setClientTags(Map.of("tag1", "value1"))
1058+
.setAssignedTasks(new TasksTuple(Map.of(), Map.of(), Map.of()))
1059+
.setTasksPendingRevocation(new TasksTuple(Map.of(), Map.of(), Map.of()))
1060+
.build());
1061+
snapshotRegistry.idempotentCreateSnapshot(1);
1062+
1063+
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1);
1064+
1065+
assertEquals("group-id-with-topology", describedGroup.groupId());
1066+
assertEquals(StreamsGroup.StreamsGroupState.NOT_READY.toString(), describedGroup.groupState());
1067+
assertEquals(2, describedGroup.groupEpoch());
1068+
assertEquals(2, describedGroup.assignmentEpoch());
1069+
1070+
// Verify topology is correctly described
1071+
assertNotNull(describedGroup.topology());
1072+
assertEquals(2, describedGroup.topology().epoch());
1073+
assertEquals(1, describedGroup.topology().subtopologies().size());
1074+
1075+
StreamsGroupDescribeResponseData.Subtopology subtopology = describedGroup.topology().subtopologies().get(0);
1076+
assertEquals("sub-1", subtopology.subtopologyId());
1077+
assertEquals(List.of("input-topic"), subtopology.sourceTopics());
1078+
assertEquals(1, subtopology.repartitionSourceTopics().size());
1079+
assertEquals("repartition-topic", subtopology.repartitionSourceTopics().get(0).name());
1080+
assertEquals(1, subtopology.stateChangelogTopics().size());
1081+
assertEquals("changelog-topic", subtopology.stateChangelogTopics().get(0).name());
1082+
1083+
assertEquals(1, describedGroup.members().size());
1084+
assertEquals("member1", describedGroup.members().get(0).memberId());
1085+
}
1086+
1087+
@Test
1088+
public void testAsDescribedGroupPrefersConfiguredTopologyOverStreamsTopology() {
1089+
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
1090+
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-configured");
1091+
snapshotRegistry.idempotentCreateSnapshot(0);
1092+
1093+
// Create both StreamsTopology and ConfiguredTopology
1094+
Map<String, StreamsGroupTopologyValue.Subtopology> subtopologies = Map.of(
1095+
"sub-1", new StreamsGroupTopologyValue.Subtopology()
1096+
.setSubtopologyId("sub-1")
1097+
.setSourceTopics(List.of("streams-topic"))
1098+
);
1099+
1100+
group.setGroupEpoch(3);
1101+
group.setTopology(new StreamsTopology(2, subtopologies));
1102+
group.setConfiguredTopology(new ConfiguredTopology(3, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
1103+
group.setTargetAssignmentEpoch(3);
1104+
snapshotRegistry.idempotentCreateSnapshot(1);
1105+
1106+
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1);
1107+
1108+
// Should prefer ConfiguredTopology over StreamsTopology
1109+
assertNotNull(describedGroup.topology());
1110+
assertEquals(3, describedGroup.topology().epoch()); // ConfiguredTopology epoch
1111+
assertEquals(0, describedGroup.topology().subtopologies().size()); // Empty configured topology
1112+
}
1113+
1114+
@Test
1115+
public void testAsDescribedGroupFallbackToStreamsTopologyWhenConfiguredTopologyEmpty() {
1116+
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
1117+
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-fallback");
1118+
snapshotRegistry.idempotentCreateSnapshot(0);
1119+
1120+
// Create StreamsTopology with subtopologies
1121+
Map<String, StreamsGroupTopologyValue.Subtopology> subtopologies = Map.of(
1122+
"sub-1", new StreamsGroupTopologyValue.Subtopology()
1123+
.setSubtopologyId("sub-1")
1124+
.setSourceTopics(List.of("fallback-topic"))
1125+
);
1126+
1127+
group.setGroupEpoch(4);
1128+
group.setTopology(new StreamsTopology(4, subtopologies));
1129+
// No ConfiguredTopology set, so should fallback to StreamsTopology
1130+
group.setTargetAssignmentEpoch(4);
1131+
snapshotRegistry.idempotentCreateSnapshot(1);
1132+
1133+
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1);
1134+
1135+
// Should use StreamsTopology when ConfiguredTopology is not available
1136+
assertNotNull(describedGroup.topology());
1137+
assertEquals(4, describedGroup.topology().epoch()); // StreamsTopology epoch
1138+
assertEquals(1, describedGroup.topology().subtopologies().size());
1139+
assertEquals("sub-1", describedGroup.topology().subtopologies().get(0).subtopologyId());
1140+
assertEquals(List.of("fallback-topic"), describedGroup.topology().subtopologies().get(0).sourceTopics());
1141+
}
10221142
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.coordinator.group.streams;
1818

19+
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
1920
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
2021
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
2122
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
@@ -24,6 +25,7 @@
2425
import org.junit.jupiter.api.Test;
2526

2627
import java.util.Arrays;
28+
import java.util.Comparator;
2729
import java.util.List;
2830
import java.util.Map;
2931
import java.util.Set;
@@ -120,6 +122,88 @@ public void fromHeartbeatRequestShouldCreateCorrectTopology() {
120122
assertEquals(mkSubtopology2(), topology.subtopologies().get(SUBTOPOLOGY_ID_2));
121123
}
122124

125+
@Test
126+
public void asStreamsGroupDescribeTopologyShouldReturnCorrectStructure() {
127+
Map<String, Subtopology> subtopologies = mkMap(
128+
mkEntry(SUBTOPOLOGY_ID_1, mkSubtopology1()),
129+
mkEntry(SUBTOPOLOGY_ID_2, mkSubtopology2())
130+
);
131+
StreamsTopology topology = new StreamsTopology(1, subtopologies);
132+
133+
StreamsGroupDescribeResponseData.Topology describeTopology = topology.asStreamsGroupDescribeTopology();
134+
135+
assertEquals(1, describeTopology.epoch());
136+
assertEquals(2, describeTopology.subtopologies().size());
137+
138+
// Verify subtopologies are correctly converted and sorted
139+
List<StreamsGroupDescribeResponseData.Subtopology> sortedSubtopologies =
140+
describeTopology.subtopologies().stream()
141+
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.Subtopology::subtopologyId))
142+
.toList();
143+
144+
// Verify first subtopology
145+
StreamsGroupDescribeResponseData.Subtopology sub1 = sortedSubtopologies.get(0);
146+
assertEquals(SUBTOPOLOGY_ID_1, sub1.subtopologyId());
147+
// Source topics are sorted alphabetically
148+
assertEquals(List.of(REPARTITION_TOPIC_1, REPARTITION_TOPIC_2, SOURCE_TOPIC_1, SOURCE_TOPIC_2),
149+
sub1.sourceTopics());
150+
assertEquals(List.of(REPARTITION_TOPIC_3), sub1.repartitionSinkTopics());
151+
assertEquals(2, sub1.repartitionSourceTopics().size());
152+
assertEquals(2, sub1.stateChangelogTopics().size());
153+
154+
// Verify second subtopology
155+
StreamsGroupDescribeResponseData.Subtopology sub2 = sortedSubtopologies.get(1);
156+
assertEquals(SUBTOPOLOGY_ID_2, sub2.subtopologyId());
157+
// Source topics are sorted alphabetically
158+
assertEquals(List.of(REPARTITION_TOPIC_3, SOURCE_TOPIC_3), sub2.sourceTopics());
159+
assertEquals(List.of(), sub2.repartitionSinkTopics());
160+
assertEquals(1, sub2.repartitionSourceTopics().size());
161+
assertEquals(1, sub2.stateChangelogTopics().size());
162+
}
163+
164+
@Test
165+
public void asStreamsGroupDescribeTopicInfoShouldConvertCorrectly() {
166+
Map<String, Subtopology> subtopologies = mkMap(
167+
mkEntry(SUBTOPOLOGY_ID_1, mkSubtopology1())
168+
);
169+
StreamsTopology topology = new StreamsTopology(1, subtopologies);
170+
171+
StreamsGroupDescribeResponseData.Topology describeTopology = topology.asStreamsGroupDescribeTopology();
172+
StreamsGroupDescribeResponseData.Subtopology describedSub = describeTopology.subtopologies().get(0);
173+
174+
// Verify repartition source topics are correctly converted
175+
List<StreamsGroupDescribeResponseData.TopicInfo> repartitionTopics = describedSub.repartitionSourceTopics();
176+
assertEquals(2, repartitionTopics.size());
177+
178+
// Find the first repartition topic (they should be sorted by name)
179+
StreamsGroupDescribeResponseData.TopicInfo firstTopic = repartitionTopics.stream()
180+
.filter(topic -> topic.name().equals(REPARTITION_TOPIC_1))
181+
.findFirst()
182+
.orElseThrow();
183+
assertEquals(REPARTITION_TOPIC_1, firstTopic.name());
184+
185+
// Verify changelog topics are correctly converted
186+
List<StreamsGroupDescribeResponseData.TopicInfo> changelogTopics = describedSub.stateChangelogTopics();
187+
assertEquals(2, changelogTopics.size());
188+
189+
// Find the first changelog topic (they should be sorted by name)
190+
StreamsGroupDescribeResponseData.TopicInfo firstChangelog = changelogTopics.stream()
191+
.filter(topic -> topic.name().equals(CHANGELOG_TOPIC_1))
192+
.findFirst()
193+
.orElseThrow();
194+
assertEquals(CHANGELOG_TOPIC_1, firstChangelog.name());
195+
}
196+
197+
@Test
198+
public void asStreamsGroupDescribeTopologyWithEmptySubtopologies() {
199+
StreamsTopology topology = new StreamsTopology(0, Map.of());
200+
201+
StreamsGroupDescribeResponseData.Topology describeTopology = topology.asStreamsGroupDescribeTopology();
202+
203+
assertEquals(0, describeTopology.epoch());
204+
assertEquals(0, describeTopology.subtopologies().size());
205+
}
206+
123207
private Subtopology mkSubtopology1() {
124208
return new Subtopology()
125209
.setSubtopologyId(SUBTOPOLOGY_ID_1)

0 commit comments

Comments
 (0)