Member

@lucasbru lucasbru commented Sep 26, 2025

Streams groups are sometimes described as NOT_READY when they are
actually STABLE. That is, the group is configured and all topics exist,
but LIST_GROUP and STREAMS_GROUP_DESCRIBE report the group as NOT_READY.

The root cause seems to be that #19802 moved the creation of the
soft-state configured topology from the replay path to the heartbeat.
As a result, LIST_GROUP and STREAMS_GROUP_DESCRIBE may not show the
configured topology: the configured topology created in the heartbeat
is "thrown away" when the group is recreated on the replay path.

To reflect a consistent view of the topology via LIST_GROUP and
STREAMS_GROUP_DESCRIBE, we need to store additional information in the
consumer offset topic. In particular, we need to store at least whether
a topology was validated against the current topic metadata, as this
defines whether a group is in STABLE and not in NOT_READY.

This change adds a new field validatedTopologyEpoch to the metadata of
the group, which stores precisely this information.
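
As a rough illustration of the idea, the sketch below derives a group state only from values that would be persisted: the topology epoch and the validated topology epoch. The class `GroupStateSketch`, its `derive` method, and the three-value `State` enum are hypothetical simplifications, not the actual `StreamsGroup` code; the real state machine has further states that are collapsed into STABLE here.

```java
import java.util.Optional;

// Hypothetical, simplified model; not the actual StreamsGroup class.
final class GroupStateSketch {
    enum State { EMPTY, NOT_READY, STABLE }

    // topologyEpoch: epoch of the persisted topology, if one exists.
    // validatedTopologyEpoch: persisted epoch that was last validated, -1 if none.
    static State derive(int memberCount, Optional<Integer> topologyEpoch, int validatedTopologyEpoch) {
        if (memberCount == 0) {
            return State.EMPTY;
        }
        // The group is ready only if a topology exists and its epoch was validated.
        boolean validated = topologyEpoch.filter(e -> e == validatedTopologyEpoch).isPresent();
        // Assumption: all remaining states of the real group are collapsed into STABLE.
        return validated ? State.STABLE : State.NOT_READY;
    }

    public static void main(String[] args) {
        System.out.println(derive(1, Optional.of(3), -1));
        System.out.println(derive(1, Optional.of(3), 3));
    }
}
```

Because both inputs can be rebuilt from records, a group recreated on the replay path would arrive at the same state as the one seen by the heartbeat.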

-  * The metadata associated with each subscribed topic name.
+  * The topology epoch for which the subscribed topics identified by metadataHash are validated.
   */
- private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
Member Author

The partitionMetadata field is no longer used; a previous PR should have removed it but did not.

  newState = EMPTY;
  clearShutdownRequestMemberId();
- } else if (topology().isEmpty() || configuredTopology().isEmpty() || !configuredTopology().get().isReady()) {
+ } else if (topology().stream().allMatch(x -> x.topologyEpoch() != validatedTopologyEpoch.get())) {
Member Author

The state of the group is now completely defined by things persisted on the consumer offset topic, that is, the topology and the validatedTopologyEpoch

Contributor

minor one:
To me, Optional#stream is a bit misleading here.
It could also be Optional#filter(predicate)#isPresent

Member Author

Ah, my background is functional programming, where working with optionals as streams is very natural, but point taken: it looks odd in Java.
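
For readers comparing the two styles, here is a minimal sketch showing that the stream-based check and the filter-based check agree. The class and method names are made up for illustration, and the topology is reduced to an `Optional<Integer>` holding only its epoch, which is an assumption rather than the real type.

```java
import java.util.Optional;

// Hypothetical comparison of the two styles discussed in this thread.
final class OptionalPredicateSketch {
    // Style from the diff: true when the topology is absent or its epoch differs.
    static boolean notValidatedViaStream(Optional<Integer> topologyEpoch, int validatedEpoch) {
        return topologyEpoch.stream().allMatch(e -> e != validatedEpoch);
    }

    // Style suggested in review: keep the value only if it matches, then test for absence.
    static boolean notValidatedViaFilter(Optional<Integer> topologyEpoch, int validatedEpoch) {
        return !topologyEpoch.filter(e -> e == validatedEpoch).isPresent();
    }

    public static void main(String[] args) {
        Optional<Integer> epoch = Optional.of(3);
        System.out.println(notValidatedViaStream(epoch, 3) == notValidatedViaFilter(epoch, 3));
    }
}
```

Both forms treat an empty Optional as "not validated", which is the case the NOT_READY branch has to cover.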

  { "name": "MetadataHash", "versions": "0+", "type": "int64",
-   "about": "The hash of all topics in the group." }
+   "about": "The hash of all topics in the group." },
+ { "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "default": -1, "type": "int32",
Member Author

We add the field as a tagged field, so we stay compatible without bumping the version.

If a newer Kafka version reads a record without ValidatedTopologyEpoch, it will assume the default of -1 and revalidate the topology.

If an older Kafka version reads a record with ValidatedTopologyEpoch, the field will be ignored.
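
The compatibility argument can be illustrated with a toy model in which tagged fields are a map from tag number to value. This is not Kafka's serialization code; `TaggedFieldSketch` and its methods are hypothetical, and only the tag `0` and the default `-1` come from the schema change above.

```java
import java.util.Map;

// Toy model of an optional tagged field; not Kafka's generated message classes.
final class TaggedFieldSketch {
    static final int VALIDATED_TOPOLOGY_EPOCH_TAG = 0;      // from the schema change
    static final int DEFAULT_VALIDATED_TOPOLOGY_EPOCH = -1; // from the schema change

    // A newer reader falls back to the default when the tag is absent.
    static int readValidatedTopologyEpoch(Map<Integer, Integer> taggedFields) {
        return taggedFields.getOrDefault(VALIDATED_TOPOLOGY_EPOCH_TAG, DEFAULT_VALIDATED_TOPOLOGY_EPOCH);
    }

    // With the default of -1, no topology epoch matches, so revalidation is triggered.
    static boolean needsRevalidation(int topologyEpoch, Map<Integer, Integer> taggedFields) {
        return readValidatedTopologyEpoch(taggedFields) != topologyEpoch;
    }

    public static void main(String[] args) {
        System.out.println(needsRevalidation(3, Map.of()));     // record written without the field
        System.out.println(needsRevalidation(3, Map.of(0, 3))); // record written with the field
    }
}
```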

  // We bump the group epoch.
  int groupEpoch = group.groupEpoch() + 1;
- records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch, 0));
+ records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, group.metadataHash(), group.validatedTopologyEpoch()));
Member Author

There was a bug here: every time we fenced a member, we reset the metadata hash to 0. That is unnecessary, and it forces us to recompute the metadata hash on the next heartbeat and bump the group epoch again.
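
A small sketch of why the reset costs an extra epoch bump, under the assumption (taken from the comment above) that a heartbeat bumps the group epoch whenever the stored metadata hash differs from the freshly computed one. `FenceSketch` and `epochAfterHeartbeat` are hypothetical names, and the hash value 42 is arbitrary.

```java
// Hypothetical model of the epoch bump on a metadata hash mismatch.
final class FenceSketch {
    // Assumption: the next heartbeat bumps the epoch only if the stored hash is stale.
    static int epochAfterHeartbeat(int groupEpoch, long storedHash, long computedHash) {
        return storedHash != computedHash ? groupEpoch + 1 : groupEpoch;
    }

    public static void main(String[] args) {
        long currentHash = 42L;      // arbitrary hash of the current topic metadata
        int epochAfterFence = 5 + 1; // fencing a member bumps the epoch once

        // Old behaviour: the fence wrote hash 0, so the next heartbeat bumps again.
        System.out.println(epochAfterHeartbeat(epochAfterFence, 0L, currentHash));
        // New behaviour: the fence keeps the hash, so the epoch stays put.
        System.out.println(epochAfterHeartbeat(epochAfterFence, currentHash, currentHash));
    }
}
```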

Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR addresses an issue where Streams groups would sometimes show as NOT_READY when they should be STABLE due to inconsistent topology validation state between heartbeat processing and replay paths. The fix introduces a new validatedTopologyEpoch field to track whether a topology has been validated against current metadata.

  • Adds validatedTopologyEpoch field to track topology validation state persistently
  • Updates group state determination logic to use validated topology epoch instead of configured topology readiness
  • Modifies record creation to include the new validation state information

Reviewed Changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 1 comment.

| File | Description |
|---|---|
| StreamsGroupMetadataValue.json | Adds validatedTopologyEpoch field to metadata schema |
| StreamsGroup.java | Implements validatedTopologyEpoch tracking and updates state logic |
| StreamsCoordinatorRecordHelpers.java | Updates record helpers to include validation epoch |
| GroupMetadataManager.java | Integrates validation epoch into heartbeat processing |
| StreamsGroupTest.java | Adds test coverage for new validation epoch field |
| StreamsGroupBuilder.java | Updates builder to support validation epoch |
| StreamsCoordinatorRecordHelpersTest.java | Updates tests for renamed methods |
| GroupMetadataManagerTest.java | Extensive test updates for new validation logic |
| PlaintextAdminIntegrationTest.scala | Updates integration tests with new API |
| IntegrationTestHarness.scala | Updates test harness for streams group creation |

Comments suppressed due to low confidence (2)

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:1

  • The assignment preparation should use memberId2 but the variable name suggests it should be memberId1 based on the context. The member being processed in this test section appears to be memberId1 based on the surrounding code flow.
/*

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:3271

  • The member epoch should be 12 to match the expected response epoch. This inconsistency could cause test failures or incorrect validation of the heartbeat response.
            .setMemberEpoch(11)

Contributor

@UladzislauBlok UladzislauBlok left a comment

Not an expert here, but LGTM. Left some cosmetic comments.

}

// 3b. If the topology is validated, persist the fact that it is validated.
int validatedTopologyEpoch;
Contributor

simplification:
Set -1 by default, and conditionally override inside of 'if'
int validatedTopologyEpoch = -1;

Member Author

Done
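
The suggested shape, sketched in isolation: start from the "not validated" default and override it only when validation succeeds. The method name and the `topologyIsReady` parameter are hypothetical stand-ins for the surrounding heartbeat logic, which is not shown in this thread.

```java
// Hypothetical extraction of the simplified assignment; not the actual heartbeat code.
final class ValidatedEpochSketch {
    static int computeValidatedTopologyEpoch(boolean topologyIsReady, int topologyEpoch) {
        // Default: nothing has been validated yet.
        int validatedTopologyEpoch = -1;
        if (topologyIsReady) {
            // Validation against current topic metadata succeeded for this epoch.
            validatedTopologyEpoch = topologyEpoch;
        }
        return validatedTopologyEpoch;
    }

    public static void main(String[] args) {
        System.out.println(computeValidatedTopologyEpoch(false, 3));
        System.out.println(computeValidatedTopologyEpoch(true, 3));
    }
}
```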

);

StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
Contributor

minor: fully qualified name instead of an import

Member Author

Yeah, we use fully qualified names in this file since there are multiple MemberState classes.

context.commit();

List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), context.lastCommittedOffset);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
Contributor

The naming is a bit misleading here: we have "expected member", but then "described group" and just "actual". It would be good to be more consistent, for example: expected member, expected described group, actual described group.

Member Author

Done

TestUtils.waitUntilTrue(() => {
val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null)
firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
}, "Stream group not stable yet")
Contributor

Error message: "Stream group not stable yet"
Correct me if I'm wrong, but this is the final error message (shown once the timeout is reached), so the wording should be different.

Member Author

Done
