diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 303e989e9b4fb..eb14858a9440c 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -239,7 +239,8 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { def createStreamsGroup[K, V](configOverrides: Properties = new Properties, configsToRemove: List[String] = List(), - inputTopic: String, + inputTopics: Set[String], + changelogTopics: Set[String] = Set(), streamsGroupId: String): AsyncKafkaConsumer[K, V] = { val props = new Properties() props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) @@ -255,10 +256,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { Optional.empty(), util.Map.of( "subtopology-0", new StreamsRebalanceData.Subtopology( - util.Set.of(inputTopic), + inputTopics.asJava, util.Set.of(), util.Map.of(), - util.Map.of(inputTopic + "-store-changelog", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())), + changelogTopics.map(c => (c, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.empty(), util.Map.of()))).toMap.asJava, util.Set.of() )), Map.empty[String, String].asJava @@ -270,7 +271,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { configOverrides = props, streamsRebalanceData = streamsRebalanceData ) - consumer.subscribe(util.Set.of(inputTopic), + consumer.subscribe(inputTopics.asJava, new StreamsRebalanceListener { override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Unit = () override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Unit = () diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 4a686f3d4d871..d6a8ff79157d4 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -2318,7 +2318,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } - /** * Test the consumer group APIs for member removal. */ @@ -2601,7 +2600,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val shareGroup = createShareConsumer(configOverrides = shareGroupConfig) val streamsGroup = createStreamsGroup( - inputTopic = testTopicName, + inputTopics = Set(testTopicName), + changelogTopics = Set(testTopicName + "-changelog"), streamsGroupId = streamsGroupId ) @@ -4426,7 +4426,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { prepareRecords(testTopicName) val streams = createStreamsGroup( - inputTopic = testTopicName, + inputTopics = Set(testTopicName), + changelogTopics = Set(testTopicName + "-changelog"), streamsGroupId = streamsGroupId ) streams.poll(JDuration.ofMillis(500L)) @@ -4436,7 +4437,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val firstGroup = client.listGroups().all().get().stream() .filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null) firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId - }, "Stream group not stable yet") + }, "Streams group did not transition to STABLE before timeout") // Verify the describe call works correctly val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() @@ -4472,7 +4473,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { client = Admin.create(config) val streams = createStreamsGroup( - inputTopic = testTopicName, + inputTopics = Set(testTopicName), + changelogTopics = Set(testTopicName + "-changelog"), streamsGroupId = streamsGroupId ) streams.poll(JDuration.ofMillis(500L)) @@ -4482,7 +4484,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val firstGroup = client.listGroups().all().get().stream() .filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null) firstGroup.groupState().orElse(null) == GroupState.NOT_READY && firstGroup.groupId() == streamsGroupId - }, "Stream group not NOT_READY yet") + }, "Streams group did not transition to NOT_READY before timeout") // Verify the describe call works correctly val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() @@ -4504,6 +4506,55 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } + @Test + def testDescribeStreamsGroupsForStatelessTopology(): Unit = { + val streamsGroupId = "stream_group_id" + val testTopicName = "test_topic" + val testNumPartitions = 1 + + val config = createConfig + client = Admin.create(config) + + prepareTopics(List(testTopicName), testNumPartitions) + prepareRecords(testTopicName) + + val streams = createStreamsGroup( + inputTopics = Set(testTopicName), + streamsGroupId = streamsGroupId + ) + streams.poll(JDuration.ofMillis(500L)) + + try { + TestUtils.waitUntilTrue(() => { + val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null) + firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId + }, "Streams group did not transition to STABLE before timeout") + + // Verify the describe call works correctly + val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() + val group = describedGroups.get(streamsGroupId) + assertNotNull(group) + assertEquals(streamsGroupId, group.groupId()) + assertFalse(group.members().isEmpty) + assertNotNull(group.subtopologies()) + assertFalse(group.subtopologies().isEmpty) + + // Verify the topology contains the expected source and sink topics + val subtopologies = group.subtopologies().asScala + assertTrue(subtopologies.exists(subtopology => + subtopology.sourceTopics().contains(testTopicName))) + + // Test describing a non-existing group + val nonExistingGroup = "non_existing_stream_group" + val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List.of(nonExistingGroup)) + assertFutureThrows(classOf[GroupIdNotFoundException], describedNonExistingGroupResponse.all()) + + } finally { + Utils.closeQuietly(streams, "streams") + Utils.closeQuietly(client, "adminClient") + } + } + @Test def testDeleteStreamsGroups(): Unit = { val testTopicName = "test_topic" @@ -4526,7 +4577,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val streamsGroupId = s"stream_group_id_$i" val streams = createStreamsGroup( - inputTopic = testTopicName, + inputTopics = Set(testTopicName), + changelogTopics = Set(testTopicName + "-changelog"), streamsGroupId = streamsGroupId, ) streams.poll(JDuration.ofMillis(500L)) @@ -4609,7 +4661,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } val streams = createStreamsGroup( - inputTopic = testTopicName, + inputTopics = Set(testTopicName), + changelogTopics = Set(testTopicName + "-changelog"), streamsGroupId = streamsGroupId, ) @@ -4625,7 +4678,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.waitUntilTrue(() => { val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null) firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId - }, "Stream group not stable yet") + }, "Streams group did not transition to STABLE before timeout") val allTopicPartitions = client.listStreamsGroupOffsets( util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec()) @@ -4669,7 +4722,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } val streams = createStreamsGroup( - inputTopic = testTopicName, + inputTopics = Set(testTopicName), + changelogTopics = Set(testTopicName + "-changelog"), streamsGroupId = streamsGroupId, ) @@ -4746,7 +4800,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } val streams = createStreamsGroup( - inputTopic = testTopicName, + inputTopics = Set(testTopicName), + changelogTopics = Set(testTopicName + "-changelog"), streamsGroupId = streamsGroupId, ) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index cea68e09ffba1..e8a6bc8c024e9 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -250,9 +250,9 @@ import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord; import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord; import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord; -import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord; import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord; import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord; +import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord; import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord; import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord; import static org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedTasksChanged; @@ -1960,19 +1960,26 @@ private CoordinatorResult stream updatedConfiguredTopology = group.configuredTopology().get(); } + // 3b. If the topology is validated, persist the fact that it is validated. + int validatedTopologyEpoch = -1; if (updatedConfiguredTopology.isReady()) { + validatedTopologyEpoch = updatedTopology.topologyEpoch(); SortedMap subtopologySortedMap = updatedConfiguredTopology.subtopologies().get(); throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedActiveTasks); throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedStandbyTasks); throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedWarmupTasks); } + // We validated a topology that was not validated before, so bump the group epoch as we may have to reassign tasks. + if (validatedTopologyEpoch != group.validatedTopologyEpoch()) { + bumpGroupEpoch = true; + } // Actually bump the group epoch int groupEpoch = group.groupEpoch(); if (bumpGroupEpoch) { groupEpoch += 1; - records.add(newStreamsGroupEpochRecord(groupId, groupEpoch, metadataHash)); - log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {}.", groupId, memberId, groupEpoch, metadataHash); + records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch)); + log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {} and validated topic epoch {}.", groupId, memberId, groupEpoch, metadataHash, validatedTopologyEpoch); metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME); group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch); } @@ -4245,7 +4252,7 @@ private CoordinatorResult streamsGroupFenceMember( // We bump the group epoch. int groupEpoch = group.groupEpoch() + 1; - records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch, 0)); + records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, group.metadataHash(), group.validatedTopologyEpoch())); cancelTimers(group.groupId(), member.memberId()); @@ -5365,6 +5372,7 @@ public void replay( StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, true); streamsGroup.setGroupEpoch(value.epoch()); streamsGroup.setMetadataHash(value.metadataHash()); + streamsGroup.setValidatedTopologyEpoch(value.validatedTopologyEpoch()); } else { StreamsGroup streamsGroup; try { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java index d54f7273eb0c7..4302d65b6c75c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java @@ -96,10 +96,11 @@ public static CoordinatorRecord newStreamsGroupMemberTombstoneRecord( ); } - public static CoordinatorRecord newStreamsGroupEpochRecord( + public static CoordinatorRecord newStreamsGroupMetadataRecord( String groupId, int newGroupEpoch, - long metadataHash + long metadataHash, + int validatedTopologyEpoch ) { Objects.requireNonNull(groupId, "groupId should not be null here"); @@ -109,7 +110,8 @@ public static CoordinatorRecord newStreamsGroupEpochRecord( new ApiMessageAndVersion( new StreamsGroupMetadataValue() .setEpoch(newGroupEpoch) - .setMetadataHash(metadataHash), + .setMetadataHash(metadataHash) + .setValidatedTopologyEpoch(validatedTopologyEpoch), (short) 0 ) ); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index 61f61d101f196..25a7033743734 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -148,9 +148,9 @@ public static class DeadlineAndEpoch { private final TimelineHashMap staticMembers; /** - * The metadata associated with each subscribed topic name. + * The topology epoch for which the subscribed topics identified by metadataHash are validated. */ - private final TimelineHashMap partitionMetadata; + protected final TimelineInteger validatedTopologyEpoch; /** * The metadata hash which is computed based on the all subscribed topics. @@ -224,7 +224,7 @@ public StreamsGroup( this.groupEpoch = new TimelineInteger(snapshotRegistry); this.members = new TimelineHashMap<>(snapshotRegistry, 0); this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0); - this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0); + this.validatedTopologyEpoch = new TimelineInteger(snapshotRegistry); this.metadataHash = new TimelineLong(snapshotRegistry); this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry); this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0); @@ -284,7 +284,6 @@ public void setTopology(StreamsTopology topology) { public void setConfiguredTopology(ConfiguredTopology configuredTopology) { this.configuredTopology.set(Optional.ofNullable(configuredTopology)); - maybeUpdateGroupState(); } /** @@ -600,6 +599,23 @@ public void setMetadataHash(long metadataHash) { this.metadataHash.set(metadataHash); } + /** + * @return The validated topology epoch. + */ + public int validatedTopologyEpoch() { + return validatedTopologyEpoch.get(); + } + + /** + * Updates the validated topology epoch. + * + * @param validatedTopologyEpoch The validated topology epoch + */ + public void setValidatedTopologyEpoch(int validatedTopologyEpoch) { + this.validatedTopologyEpoch.set(validatedTopologyEpoch); + maybeUpdateGroupState(); + } + /** * Computes the metadata hash based on the current topology and the current metadata image. * @@ -837,7 +853,7 @@ private void maybeUpdateGroupState() { if (members.isEmpty()) { newState = EMPTY; clearShutdownRequestMemberId(); - } else if (topology().isEmpty() || configuredTopology().isEmpty() || !configuredTopology().get().isReady()) { + } else if (topology().filter(x -> x.topologyEpoch() == validatedTopologyEpoch.get()).isEmpty()) { newState = NOT_READY; } else if (groupEpoch.get() > targetAssignmentEpoch.get()) { newState = ASSIGNING; diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json index 0d06b0c7f49b4..b66f8181af9fe 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json @@ -24,6 +24,8 @@ { "name": "Epoch", "versions": "0+", "type": "int32", "about": "The group epoch." }, { "name": "MetadataHash", "versions": "0+", "type": "int64", - "about": "The hash of all topics in the group." } + "about": "The hash of all topics in the group." }, + { "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "default": -1, "type": "int32", + "about": "The topology epoch whose topics where validated to be present in a valid configuration in the metadata." } ] } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 957ae7e814715..9533791e0dfe3 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -139,7 +139,6 @@ import org.apache.kafka.coordinator.group.streams.TasksTuple; import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor; import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException; -import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataProvenance; @@ -4434,23 +4433,27 @@ public void testUpdateStreamsGroupSizeCounter() { .withStreamsGroup(new StreamsGroupBuilder(groupIds.get(1), 10) // Stable group .withTargetAssignmentEpoch(10) .withTopology(new StreamsTopology(1, Map.of())) + .withValidatedTopologyEpoch(1) .withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(0)) .setMemberEpoch(10) .build())) .withStreamsGroup(new StreamsGroupBuilder(groupIds.get(2), 10) // Assigning group .withTargetAssignmentEpoch(9) .withTopology(new StreamsTopology(1, Map.of())) + .withValidatedTopologyEpoch(1) .withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(1)) .setMemberEpoch(9) .build())) .withStreamsGroup(new StreamsGroupBuilder(groupIds.get(3), 10) // Reconciling group .withTargetAssignmentEpoch(10) .withTopology(new StreamsTopology(1, Map.of())) + .withValidatedTopologyEpoch(1) .withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(2)) .setMemberEpoch(9) .build())) .withStreamsGroup(new StreamsGroupBuilder(groupIds.get(4), 10) // NotReady group .withTargetAssignmentEpoch(10) + .withTopology(new StreamsTopology(1, Map.of())) .withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(3)) .build())) .build(); @@ -9736,7 +9739,7 @@ public void testStreamsGroupDescribeBeforeAndAfterCommittingOffset() { StreamsGroupMember.Builder memberBuilder1 = streamsGroupMemberBuilderWithDefaults(memberId1); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build())); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build())); - context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1, 0)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId, epoch + 1, 0, -1)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(streamsGroupId, topology)); TasksTuple assignment = new TasksTuple( @@ -9749,7 +9752,7 @@ public void testStreamsGroupDescribeBeforeAndAfterCommittingOffset() { context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder2.build())); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId, memberId2, assignment)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder2.build())); - context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 2, 0)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId, epoch + 2, 0, -1)); List actual = context.groupMetadataManager.streamsGroupDescribe(List.of(streamsGroupId), context.lastCommittedOffset); StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup() @@ -16035,7 +16038,7 @@ public void testStreamsGroupMemberEpochValidation() { context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, member)); - context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 100, 0)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 100, 0, 0)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology)); @@ -16283,7 +16286,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) List expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, groupMetadataHash), + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, groupMetadataHash, 0), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5), @@ -16296,6 +16299,98 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) assertRecordsEquals(expectedRecords, result.records()); } + @Test + public void testJoinEmptyStreamsGroupAndDescribe() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .buildCoordinatorMetadataImage(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(metadataImage) + .build(); + + assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5) + ))); + + assertThrows(GroupIdNotFoundException.class, () -> + context.groupMetadataManager.streamsGroup(groupId)); + + CoordinatorResult result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setProcessId("process-id") + .setRebalanceTimeoutMs(1500) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1, 2, 3, 4, 5)) + )) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + + StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(1500) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5))) + .build(); + + // Commit the offset and test again + context.commit(); + + List actualDescribedGroups = context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), context.lastCommittedOffset); + StreamsGroupDescribeResponseData.DescribedGroup expectedDescribedGroup = new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setAssignmentEpoch(1) + .setTopology( + new StreamsGroupDescribeResponseData.Topology() + .setEpoch(0) + .setSubtopologies(List.of( + new StreamsGroupDescribeResponseData.Subtopology() + .setSubtopologyId(subtopology1) + .setSourceTopics(List.of(fooTopicName)) + )) + ) + .setMembers(Collections.singletonList( + expectedMember.asStreamsGroupDescribeMember(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5))) + )) + .setGroupState(StreamsGroupState.STABLE.toString()) + .setGroupEpoch(1); + assertEquals(1, actualDescribedGroups.size()); + assertEquals(expectedDescribedGroup, actualDescribedGroups.get(0)); + } + @Test public void testStreamsGroupMemberJoiningWithMissingSourceTopic() { String groupId = "fooup"; @@ -16364,9 +16459,9 @@ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() { List expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, computeGroupHash(Map.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage) - ))), + )), -1), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) @@ -16450,14 +16545,15 @@ public void testStreamsGroupMemberJoiningWithMissingInternalTopic() { List expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, computeGroupHash(Map.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage) - ))), + )), -1), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) ); + assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId)); assertRecordsEquals(expectedRecords, result.records()); } @@ -16532,15 +16628,16 @@ public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() { List expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, computeGroupHash(Map.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage), barTopicName, computeTopicHash(barTopicName, metadataImage) - ))), + )), -1), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) ); + assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId)); assertRecordsEquals(expectedRecords, result.records()); } @@ -16577,6 +16674,7 @@ public void testStreamsGroupMemberJoiningWithStaleTopology() { .withStreamsGroup( new StreamsGroupBuilder(groupId, 10) .withTopology(StreamsTopology.fromHeartbeatRequest(topology1)) + .withValidatedTopologyEpoch(1) ) .build(); @@ -16627,10 +16725,10 @@ public void testStreamsGroupMemberJoiningWithStaleTopology() { List expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage), barTopicName, computeTopicHash(barTopicName, metadataImage) - ))), + )), 1), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) @@ -16682,6 +16780,7 @@ public void testStreamsGroupMemberRequestingShutdownApplication() { .withTargetAssignmentEpoch(10) .withTopology(StreamsTopology.fromHeartbeatRequest(topology)) .withMetadataHash(groupMetadataHash) + .withValidatedTopologyEpoch(0) ) .build(); @@ -16751,6 +16850,8 @@ public void testStreamsGroupMemberRequestingShutdownApplicationUponLeaving() { .buildCoordinatorMetadataImage(); MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + long metadataHash = computeGroupHash(Map.of(fooTopicName, computeTopicHash(fooTopicName, metadataImage))); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) @@ -16767,7 +16868,8 @@ public void testStreamsGroupMemberRequestingShutdownApplicationUponLeaving() { .build()) .withTargetAssignmentEpoch(10) .withTopology(StreamsTopology.fromHeartbeatRequest(topology)) - .withMetadataHash(computeGroupHash(Map.of(fooTopicName, computeTopicHash(fooTopicName, metadataImage)))) + .withValidatedTopologyEpoch(0) + .withMetadataHash(metadataHash) ) .build(); @@ -16792,7 +16894,7 @@ public void testStreamsGroupMemberRequestingShutdownApplicationUponLeaving() { StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId1), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId1), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId1), - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0) + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, metadataHash, 0) ), result1.records() ); @@ -16801,7 +16903,7 @@ public void testStreamsGroupMemberRequestingShutdownApplicationUponLeaving() { context.replay(record); } assignor.prepareGroupAssignment( - Map.of(memberId1, TasksTuple.EMPTY) + Map.of(memberId2, TasksTuple.EMPTY) ); CoordinatorResult result2 = context.streamsGroupHeartbeat( @@ -16814,7 +16916,7 @@ public void testStreamsGroupMemberRequestingShutdownApplicationUponLeaving() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(12) + .setMemberEpoch(11) .setHeartbeatIntervalMs(5000) .setStatus(List.of( new StreamsGroupHeartbeatResponseData.Status() @@ -16916,7 +17018,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) List expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, groupMetadataHash), + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, groupMetadataHash, 0), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5), @@ -17022,10 +17124,10 @@ barTopicName, computeTopicHash(barTopicName, oldMetadataImage) .build(); List expectedRecords = List.of( - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, newMetadataImage), barTopicName, computeTopicHash(barTopicName, newMetadataImage) - ))), + )), 0), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5), @@ -17206,7 +17308,7 @@ public void testStreamsLeavingMemberRemovesMemberAndBumpsGroupEpoch() { StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId2), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId2), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId2), - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0) + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 0, -1) ); assertRecordsEquals(expectedRecords, result.records()); @@ -17334,6 +17436,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) TaskAssignmentTestUtil.mkTasks(subtopology2, 2))) .withTargetAssignmentEpoch(10) .withMetadataHash(groupMetadataHash) + .withValidatedTopologyEpoch(0) ) .build(); @@ -17776,16 +17879,12 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, streamsGroupMemberBuilderWithDefaults(memberId1) .build())); - context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, groupMetadataHash)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, groupMetadataHash, -1)); assertEquals(StreamsGroupState.NOT_READY, context.streamsGroupState(groupId)); context.groupMetadataManager.getStreamsGroupOrThrow(groupId) - .setConfiguredTopology(InternalTopicManager.configureTopics( - new LogContext(), - groupMetadataHash, - StreamsTopology.fromRecord(StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord(topology)), - metadataImage)); + .setValidatedTopologyEpoch(0); assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, context.streamsGroupState(groupId)); @@ -17942,9 +18041,9 @@ fooTopicName, computeTopicHash( .build(); List expectedRecords = List.of( - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage) - ))), + )), 0), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5) @@ -18064,9 +18163,9 @@ fooTopicName, computeTopicHash( .build(); List expectedRecords = List.of( - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage) - ))), + )), 0), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5) @@ -18169,13 +18268,17 @@ public void testStreamsSessionTimeoutExpiration() { Topology topology = new Topology().setSubtopologies(List.of( new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) )); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .buildCoordinatorMetadataImage(); + long groupMetadataHash = computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + )); MockTaskAssignor assignor = new MockTaskAssignor("sticky"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .buildCoordinatorMetadataImage()) + .withMetadataImage(metadataImage) .build(); assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, @@ -18211,7 +18314,7 @@ public void testStreamsSessionTimeoutExpiration() { StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId), - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 2, 0) + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, groupMetadataHash, 0) ) ) )), @@ -18393,13 +18496,17 @@ public void testStreamsRebalanceTimeoutExpiration() { Topology topology = new Topology().setSubtopologies(List.of( new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) )); + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .buildCoordinatorMetadataImage(); + long groupMetadataHash = computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + )); MockTaskAssignor assignor = new MockTaskAssignor("sticky"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 3) - .buildCoordinatorMetadataImage()) + .withMetadataImage(metadataImage) .build(); assignor.prepareGroupAssignment( @@ -18511,7 +18618,7 @@ public void testStreamsRebalanceTimeoutExpiration() { StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId1), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId1), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId1), - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 3, 0) + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3, groupMetadataHash, 0) ) ) )), @@ -18799,7 +18906,7 @@ public void testStreamsGroupHeartbeatWithEmptyClassicGroup() { GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(classicGroupId, 1, 0), + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId, 1, 0, -1), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId, memberId, TasksTuple.EMPTY), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId, 1), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId, expectedMember) @@ -19383,7 +19490,7 @@ public void testReplayStreamsGroupMemberMetadataTombstoneNotExisting() { // The group still exists but the member is already gone. Replaying the // StreamsGroupMemberMetadata tombstone should be a no-op. - context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo", 10, 0, 0)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo", "m1")); assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1")); @@ -19439,7 +19546,7 @@ public void testReplayStreamsGroupMetadata() { .build(); // The group is created if it does not exist. - context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo", 10, 0, 0)); assertEquals(10, context.groupMetadataManager.streamsGroup("foo").groupEpoch()); } @@ -19631,7 +19738,7 @@ public void testReplayStreamsGroupCurrentMemberAssignmentTombstoneNotExisting() // The group still exists, but the member is already gone. Replaying the // StreamsGroupCurrentMemberAssignment tombstone should be a no-op. - context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo", 10, 0, 0)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo", "m1")); assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1")); @@ -19707,7 +19814,7 @@ public void testReplayStreamsGroupTopologyTombstoneNotExists() { // The group still exists, but the member is already gone. Replaying the // StreamsGroupTopology tombstone should be a no-op. - context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo", 10, 0)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo", 10, 0, 0)); context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone("foo")); assertTrue(context.groupMetadataManager.streamsGroup("foo").topology().isEmpty()); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java index 2485cb65e6fa3..a93a276ef268d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java @@ -250,19 +250,20 @@ public void testNewStreamsGroupMemberTombstoneRecord() { } @Test - public void testNewStreamsGroupEpochRecord() { + public void testNewStreamsGroupMetadataRecord() { CoordinatorRecord expectedRecord = CoordinatorRecord.record( new StreamsGroupMetadataKey() .setGroupId(GROUP_ID), new ApiMessageAndVersion( new StreamsGroupMetadataValue() .setEpoch(42) - .setMetadataHash(42), + .setMetadataHash(42) + .setValidatedTopologyEpoch(43), (short) 0 ) ); - assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(GROUP_ID, 42, 42)); + assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(GROUP_ID, 42, 42, 43)); } @Test @@ -676,7 +677,7 @@ public void testNewStreamsGroupMemberTombstoneRecordNullMemberId() { @Test public void testNewStreamsGroupEpochRecordNullGroupId() { NullPointerException exception = assertThrows(NullPointerException.class, () -> - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(null, 1, 1)); + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(null, 1, 1, 1)); assertEquals("groupId should not be null here", exception.getMessage()); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java index 5d291d9884d3a..b7ffdd82def4c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java @@ -35,6 +35,7 @@ public class StreamsGroupBuilder { private final Map members = new HashMap<>(); private final Map targetAssignments = new HashMap<>(); private long metadataHash = 0L; + private int validatedTopologyEpoch = -1; public StreamsGroupBuilder(String groupId, int groupEpoch) { this.groupId = groupId; @@ -53,6 +54,11 @@ public StreamsGroupBuilder withMetadataHash(long metadataHash) { return this; } + public StreamsGroupBuilder withValidatedTopologyEpoch(int validatedTopologyEpoch) { + this.validatedTopologyEpoch = validatedTopologyEpoch; + return this; + } + public StreamsGroupBuilder withTopology(StreamsTopology streamsTopology) { this.topology = streamsTopology; return this; @@ -79,7 +85,7 @@ public List build() { // Add group epoch record. records.add( - StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch, metadataHash)); + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch)); // Add target assignment records. targetAssignments.forEach((memberId, assignment) -> diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java index ba24abd2b80d6..94f780064264a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -498,6 +498,7 @@ public void testGroupState() { streamsGroup.setTopology(new StreamsTopology(1, Map.of())); streamsGroup.setConfiguredTopology(new ConfiguredTopology(1, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty())); + streamsGroup.setValidatedTopologyEpoch(1); assertEquals(MemberState.STABLE, member1.state()); assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, streamsGroup.state()); @@ -694,6 +695,7 @@ public void testAsListedGroup() { ); group.setGroupEpoch(1); group.setTopology(new StreamsTopology(1, Map.of())); + group.setValidatedTopologyEpoch(1); group.setConfiguredTopology(new ConfiguredTopology(1, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty())); group.setTargetAssignmentEpoch(1); group.updateMember(new StreamsGroupMember.Builder("member1") @@ -760,6 +762,7 @@ public void testValidateDeleteGroup() { streamsGroup.setTopology(new StreamsTopology(1, Map.of())); streamsGroup.setConfiguredTopology(new ConfiguredTopology(1, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty())); + streamsGroup.setValidatedTopologyEpoch(1); assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state()); assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup); @@ -805,6 +808,7 @@ public void testAsDescribedGroup() { group.setGroupEpoch(1); group.setTopology(new StreamsTopology(1, Map.of())); group.setConfiguredTopology(new ConfiguredTopology(1, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty())); + group.setValidatedTopologyEpoch(1); group.setTargetAssignmentEpoch(1); group.updateMember(new StreamsGroupMember.Builder("member1") .setMemberEpoch(1)