
Commit eaed6cf

impl
1 parent a2c1c5b commit eaed6cf

10 files changed: 278 additions, 74 deletions


core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala

Lines changed: 5 additions & 4 deletions
@@ -239,7 +239,8 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
 
   def createStreamsGroup[K, V](configOverrides: Properties = new Properties,
                                configsToRemove: List[String] = List(),
-                               inputTopic: String,
+                               inputTopics: Set[String],
+                               changelogTopics: Set[String] = Set(),
                                streamsGroupId: String): AsyncKafkaConsumer[K, V] = {
     val props = new Properties()
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
@@ -255,10 +256,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
       Optional.empty(),
       util.Map.of(
         "subtopology-0", new StreamsRebalanceData.Subtopology(
-          util.Set.of(inputTopic),
+          inputTopics.asJava,
           util.Set.of(),
           util.Map.of(),
-          util.Map.of(inputTopic + "-store-changelog", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())),
+          changelogTopics.map(c => (c, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.empty(), util.Map.of()))).toMap.asJava,
           util.Set.of()
         )),
       Map.empty[String, String].asJava
@@ -270,7 +271,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
       configOverrides = props,
       streamsRebalanceData = streamsRebalanceData
     )
-    consumer.subscribe(util.Set.of(inputTopic),
+    consumer.subscribe(inputTopics.asJava,
       new StreamsRebalanceListener {
         override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
           Optional.empty()
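The call sites updated in PlaintextAdminIntegrationTest below illustrate the new shape of this helper. As a minimal sketch (topic and group names are illustrative), a stateful test now sets up a streams group like this:

    val streams = createStreamsGroup(
      inputTopics = Set("test_topic"),
      changelogTopics = Set("test_topic-changelog"),
      streamsGroupId = "stream_group_id"
    )
    streams.poll(JDuration.ofMillis(500L))

Omitting changelogTopics (it defaults to an empty Set) models a stateless topology, which the new describe test below relies on.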

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

Lines changed: 63 additions & 8 deletions
@@ -2318,7 +2318,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
   }
 
-
   /**
    * Test the consumer group APIs for member removal.
    */
@@ -2601,7 +2600,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
 
     val streamsGroup = createStreamsGroup(
-      inputTopic = testTopicName,
+      inputTopics = Set(testTopicName),
+      changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId
     )
 
@@ -4426,7 +4426,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     prepareRecords(testTopicName)
 
     val streams = createStreamsGroup(
-      inputTopic = testTopicName,
+      inputTopics = Set(testTopicName),
+      changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId
     )
     streams.poll(JDuration.ofMillis(500L))
@@ -4472,7 +4473,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     client = Admin.create(config)
 
     val streams = createStreamsGroup(
-      inputTopic = testTopicName,
+      inputTopics = Set(testTopicName),
+      changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId
     )
     streams.poll(JDuration.ofMillis(500L))
@@ -4504,6 +4506,55 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
   }
 
+  @Test
+  def testDescribeStreamsGroupsForStatelessTopology(): Unit = {
+    val streamsGroupId = "stream_group_id"
+    val testTopicName = "test_topic"
+    val testNumPartitions = 1
+
+    val config = createConfig
+    client = Admin.create(config)
+
+    prepareTopics(List(testTopicName), testNumPartitions)
+    prepareRecords(testTopicName)
+
+    val streams = createStreamsGroup(
+      inputTopics = Set(testTopicName),
+      streamsGroupId = streamsGroupId
+    )
+    streams.poll(JDuration.ofMillis(500L))
+
+    try {
+      TestUtils.waitUntilTrue(() => {
+        val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null)
+        firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
+      }, "Stream group not stable yet")
+
+      // Verify the describe call works correctly
+      val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
+      val group = describedGroups.get(streamsGroupId)
+      assertNotNull(group)
+      assertEquals(streamsGroupId, group.groupId())
+      assertFalse(group.members().isEmpty)
+      assertNotNull(group.subtopologies())
+      assertFalse(group.subtopologies().isEmpty)
+
+      // Verify the topology contains the expected source and sink topics
+      val subtopologies = group.subtopologies().asScala
+      assertTrue(subtopologies.exists(subtopology =>
+        subtopology.sourceTopics().contains(testTopicName)))
+
+      // Test describing a non-existing group
+      val nonExistingGroup = "non_existing_stream_group"
+      val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List.of(nonExistingGroup))
+      assertFutureThrows(classOf[GroupIdNotFoundException], describedNonExistingGroupResponse.all())
+
+    } finally {
+      Utils.closeQuietly(streams, "streams")
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
+
   @Test
   def testDeleteStreamsGroups(): Unit = {
     val testTopicName = "test_topic"
@@ -4526,7 +4577,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       val streamsGroupId = s"stream_group_id_$i"
 
       val streams = createStreamsGroup(
-        inputTopic = testTopicName,
+        inputTopics = Set(testTopicName),
+        changelogTopics = Set(testTopicName + "-changelog"),
         streamsGroupId = streamsGroupId,
       )
       streams.poll(JDuration.ofMillis(500L))
@@ -4609,7 +4661,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
 
     val streams = createStreamsGroup(
-      inputTopic = testTopicName,
+      inputTopics = Set(testTopicName),
+      changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId,
     )
 
@@ -4669,7 +4722,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
 
     val streams = createStreamsGroup(
-      inputTopic = testTopicName,
+      inputTopics = Set(testTopicName),
+      changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId,
     )
 
@@ -4746,7 +4800,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
 
     val streams = createStreamsGroup(
-      inputTopic = testTopicName,
+      inputTopics = Set(testTopicName),
+      changelogTopics = Set(testTopicName + "-changelog"),
      streamsGroupId = streamsGroupId,
     )
 
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 14 additions & 4 deletions
@@ -250,9 +250,9 @@
 import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord;
 import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord;
 import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord;
-import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord;
 import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord;
 import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord;
+import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord;
 import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord;
 import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord;
 import static org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedTasksChanged;
@@ -1954,19 +1954,28 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
             updatedConfiguredTopology = group.configuredTopology().get();
         }
 
+        // 3b. If the topology is validated, persist the fact that it is validated.
+        int validatedTopologyEpoch;
         if (updatedConfiguredTopology.isReady()) {
+            validatedTopologyEpoch = updatedTopology.topologyEpoch();
             SortedMap<String, ConfiguredSubtopology> subtopologySortedMap = updatedConfiguredTopology.subtopologies().get();
             throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedActiveTasks);
             throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedStandbyTasks);
             throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedWarmupTasks);
+        } else {
+            validatedTopologyEpoch = -1;
+        }
+        // We validated a topology that was not validated before, so bump the group epoch as we may have to reassign tasks.
+        if (validatedTopologyEpoch != group.validatedTopologyEpoch()) {
+            bumpGroupEpoch = true;
         }
 
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
         if (bumpGroupEpoch) {
             groupEpoch += 1;
-            records.add(newStreamsGroupEpochRecord(groupId, groupEpoch, metadataHash));
-            log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {}.", groupId, memberId, groupEpoch, metadataHash);
+            records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch));
+            log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {} and validated topic epoch {}.", groupId, memberId, groupEpoch, metadataHash, validatedTopologyEpoch);
             metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
             group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
         }
@@ -4223,7 +4232,7 @@ private <T> CoordinatorResult<T, CoordinatorRecord> streamsGroupFenceMember(
 
         // We bump the group epoch.
         int groupEpoch = group.groupEpoch() + 1;
-        records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch, 0));
+        records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, group.metadataHash(), group.validatedTopologyEpoch()));
 
         cancelTimers(group.groupId(), member.memberId());
 
@@ -5343,6 +5352,7 @@ public void replay(
             StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, true);
             streamsGroup.setGroupEpoch(value.epoch());
             streamsGroup.setMetadataHash(value.metadataHash());
+            streamsGroup.setValidatedTopologyEpoch(value.validatedTopologyEpoch());
         } else {
             StreamsGroup streamsGroup;
             try {

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java

Lines changed: 5 additions & 3 deletions
@@ -96,10 +96,11 @@ public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord(
         );
     }
 
-    public static CoordinatorRecord newStreamsGroupEpochRecord(
+    public static CoordinatorRecord newStreamsGroupMetadataRecord(
         String groupId,
         int newGroupEpoch,
-        long metadataHash
+        long metadataHash,
+        int validatedTopologyEpoch
     ) {
         Objects.requireNonNull(groupId, "groupId should not be null here");
 
@@ -109,7 +110,8 @@ public static CoordinatorRecord newStreamsGroupEpochRecord(
             new ApiMessageAndVersion(
                 new StreamsGroupMetadataValue()
                     .setEpoch(newGroupEpoch)
-                    .setMetadataHash(metadataHash),
+                    .setMetadataHash(metadataHash)
+                    .setValidatedTopologyEpoch(validatedTopologyEpoch),
                 (short) 0
             )
         );
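For reference, a hedged sketch of calling the renamed helper (the values are illustrative, not taken from the commit): the group-epoch record now carries the validated topology epoch alongside the metadata hash, with -1 meaning no topology has been validated yet.

    val record = StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
      "stream_group_id", // groupId
      5,                 // newGroupEpoch
      123L,              // metadataHash
      3                  // validatedTopologyEpoch, or -1 if the topology is not validated
    )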

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java

Lines changed: 21 additions & 5 deletions
@@ -148,9 +148,9 @@ public static class DeadlineAndEpoch {
     private final TimelineHashMap<String, String> staticMembers;
 
     /**
-     * The metadata associated with each subscribed topic name.
+     * The topology epoch for which the subscribed topics identified by metadataHash are validated.
      */
-    private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+    protected final TimelineInteger validatedTopologyEpoch;
 
     /**
      * The metadata hash which is computed based on the all subscribed topics.
@@ -224,7 +224,7 @@ public StreamsGroup(
         this.groupEpoch = new TimelineInteger(snapshotRegistry);
         this.members = new TimelineHashMap<>(snapshotRegistry, 0);
         this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
-        this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.validatedTopologyEpoch = new TimelineInteger(snapshotRegistry);
         this.metadataHash = new TimelineLong(snapshotRegistry);
         this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
         this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -284,7 +284,6 @@ public void setTopology(StreamsTopology topology) {
 
     public void setConfiguredTopology(ConfiguredTopology configuredTopology) {
         this.configuredTopology.set(Optional.ofNullable(configuredTopology));
-        maybeUpdateGroupState();
     }
 
     /**
@@ -600,6 +599,23 @@ public void setMetadataHash(long metadataHash) {
         this.metadataHash.set(metadataHash);
     }
 
+    /**
+     * @return The validated topology epoch.
+     */
+    public int validatedTopologyEpoch() {
+        return validatedTopologyEpoch.get();
+    }
+
+    /**
+     * Updates the validated topology epoch.
+     *
+     * @param validatedTopologyEpoch The validated topology epoch
+     */
+    public void setValidatedTopologyEpoch(int validatedTopologyEpoch) {
+        this.validatedTopologyEpoch.set(validatedTopologyEpoch);
+        maybeUpdateGroupState();
+    }
+
     /**
      * Computes the metadata hash based on the current topology and the current metadata image.
      *
@@ -837,7 +853,7 @@ private void maybeUpdateGroupState() {
         if (members.isEmpty()) {
             newState = EMPTY;
             clearShutdownRequestMemberId();
-        } else if (topology().isEmpty() || configuredTopology().isEmpty() || !configuredTopology().get().isReady()) {
+        } else if (topology().stream().allMatch(x -> x.topologyEpoch() != validatedTopologyEpoch.get())) {
             newState = NOT_READY;
         } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
             newState = ASSIGNING;
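In effect, the revised maybeUpdateGroupState() keeps a non-empty group in NOT_READY until the topology it runs has been validated. A condensed Scala sketch of that predicate (illustrative, not the coordinator's Java code):

    // NOT_READY when the topology is absent or its epoch has not been validated yet.
    def topologyNotReady(topologyEpoch: Option[Int], validatedTopologyEpoch: Int): Boolean =
      !topologyEpoch.contains(validatedTopologyEpoch)

    topologyNotReady(Some(1), -1) // true: replayed from a record without a validated epoch
    topologyNotReady(Some(1), 1)  // false: topology epoch 1 was validated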

group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json

Lines changed: 3 additions & 1 deletion
@@ -24,6 +24,8 @@
     { "name": "Epoch", "versions": "0+", "type": "int32",
       "about": "The group epoch." },
     { "name": "MetadataHash", "versions": "0+", "type": "int64",
-      "about": "The hash of all topics in the group." }
+      "about": "The hash of all topics in the group." },
+    { "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "default": -1, "type": "int32",
+      "about": "The topology epoch whose topics where validated to be present in a valid configuration in the metadata." }
   ]
 }
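Because ValidatedTopologyEpoch is a tagged field with a default of -1, records written before this change still deserialize; readers simply see -1 and treat the topology as not yet validated. A small sketch of that fallback, using only the setters and getter visible in this commit (the "legacy" value never calls setValidatedTopologyEpoch):

    val legacyValue = new StreamsGroupMetadataValue()
      .setEpoch(5)
      .setMetadataHash(123L)
    assert(legacyValue.validatedTopologyEpoch() == -1) // tagged-field default applies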
