@@ -9645,19 +9645,43 @@ public void testStreamsGroupDescribeNoErrors() {
9645
9645
.setProcessId("processId")
9646
9646
.setMemberEpoch(epoch)
9647
9647
.setPreviousMemberEpoch(epoch - 1);
9648
+ String subtopology1 = "subtopology1";
9649
+ String fooTopicName = "foo";
9650
+ StreamsTopology topology = new StreamsTopology(
9651
+ 0,
9652
+ Map.of(subtopology1,
9653
+ new StreamsGroupTopologyValue.Subtopology()
9654
+ .setSubtopologyId(subtopology1)
9655
+ .setSourceTopics(List.of(fooTopicName))
9656
+ )
9657
+ );
9648
9658
9649
9659
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
9650
- .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), epoch))
9660
+ .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), epoch)
9661
+ .withTopology(topology)
9662
+ )
9651
9663
.withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(1), epoch)
9652
- .withMember(memberBuilder.build()))
9664
+ .withMember(memberBuilder.build())
9665
+ .withTopology(topology)
9666
+ )
9653
9667
.build();
9654
9668
9669
+ StreamsGroupDescribeResponseData.Topology expectedTopology =
9670
+ new StreamsGroupDescribeResponseData.Topology()
9671
+ .setEpoch(0)
9672
+ .setSubtopologies(List.of(
9673
+ new StreamsGroupDescribeResponseData.Subtopology()
9674
+ .setSubtopologyId(subtopology1)
9675
+ .setSourceTopics(List.of(fooTopicName))
9676
+ ));
9677
+
9655
9678
List<StreamsGroupDescribeResponseData.DescribedGroup> expected = Arrays.asList(
9656
9679
new StreamsGroupDescribeResponseData.DescribedGroup()
9657
9680
.setGroupEpoch(epoch)
9658
9681
.setGroupId(streamsGroupIds.get(0))
9659
9682
.setGroupState(StreamsGroupState.EMPTY.toString())
9660
- .setAssignmentEpoch(0),
9683
+ .setAssignmentEpoch(0)
9684
+ .setTopology(expectedTopology),
9661
9685
new StreamsGroupDescribeResponseData.DescribedGroup()
9662
9686
.setGroupEpoch(epoch)
9663
9687
.setGroupId(streamsGroupIds.get(1))
@@ -9666,6 +9690,7 @@ public void testStreamsGroupDescribeNoErrors() {
9666
9690
TasksTuple.EMPTY
9667
9691
)
9668
9692
))
9693
+ .setTopology(expectedTopology)
9669
9694
.setGroupState(StreamsGroupState.NOT_READY.toString())
9670
9695
);
9671
9696
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(streamsGroupIds);
@@ -9695,13 +9720,24 @@ public void testStreamsGroupDescribeBeforeAndAfterCommittingOffset() {
9695
9720
String memberId1 = "memberId1";
9696
9721
String memberId2 = "memberId2";
9697
9722
String subtopologyId = "subtopology1";
9723
+ String fooTopicName = "foo";
9724
+ StreamsGroupTopologyValue topology = new StreamsGroupTopologyValue()
9725
+ .setEpoch(0)
9726
+ .setSubtopologies(
9727
+ List.of(
9728
+ new StreamsGroupTopologyValue.Subtopology()
9729
+ .setSubtopologyId(subtopologyId)
9730
+ .setSourceTopics(List.of(fooTopicName))
9731
+ )
9732
+ );
9698
9733
9699
9734
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
9700
9735
9701
9736
StreamsGroupMember.Builder memberBuilder1 = streamsGroupMemberBuilderWithDefaults(memberId1);
9702
9737
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build()));
9703
9738
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build()));
9704
9739
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1, 0));
9740
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(streamsGroupId, topology));
9705
9741
9706
9742
TasksTuple assignment = new TasksTuple(
9707
9743
Map.of(subtopologyId, Set.of(0, 1)),
@@ -9733,6 +9769,17 @@ public void testStreamsGroupDescribeBeforeAndAfterCommittingOffset() {
9733
9769
memberBuilder1.build().asStreamsGroupDescribeMember(TasksTuple.EMPTY),
9734
9770
memberBuilder2.build().asStreamsGroupDescribeMember(assignment)
9735
9771
))
9772
+ .setTopology(
9773
+ new StreamsGroupDescribeResponseData.Topology()
9774
+ .setEpoch(0)
9775
+ .setSubtopologies(
9776
+ List.of(
9777
+ new StreamsGroupDescribeResponseData.Subtopology()
9778
+ .setSubtopologyId(subtopologyId)
9779
+ .setSourceTopics(List.of(fooTopicName))
9780
+ )
9781
+ )
9782
+ )
9736
9783
.setGroupState(StreamsGroup.StreamsGroupState.NOT_READY.toString())
9737
9784
.setGroupEpoch(epoch + 2);
9738
9785
assertEquals(1, actual.size());
0 commit comments