Commit 2a05c0c

KAFKA-19001: Use streams group-level configurations in heartbeat (apache#19219)
Implements the use of the session timeout, standby task, and heartbeat interval configurations in the streams group heartbeat. Piggy-backed is another test verifying that streams groups react to changes in the topic metadata. Reviewers: Bill Bejeck <[email protected]>, Bruno Cadonna <[email protected]>, Matthias J. Sax <[email protected]>
1 parent 2d02f1d commit 2a05c0c
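
The change boils down to one resolution rule, applied to the session timeout, the heartbeat interval, and the number of standby replicas: if a dynamic group-level configuration exists for the group, use it; otherwise fall back to the static broker-level default. The following is a minimal, self-contained sketch of that fallback pattern; the GroupConfig record and the resolver class below are simplified stand-ins for the coordinator's GroupConfigManager and GroupCoordinatorConfig shown in the diff, not the actual Kafka classes.

    import java.util.Map;
    import java.util.Optional;

    public class StreamsGroupConfigFallbackSketch {

        // Stand-in for GroupConfig: the dynamic, per-group overrides.
        record GroupConfig(Integer streamsSessionTimeoutMs, Integer streamsHeartbeatIntervalMs) { }

        private final Map<String, GroupConfig> dynamicGroupConfigs;  // stand-in for GroupConfigManager
        private final int defaultSessionTimeoutMs;                   // stand-in for config.streamsGroupSessionTimeoutMs()

        StreamsGroupConfigFallbackSketch(Map<String, GroupConfig> dynamicGroupConfigs, int defaultSessionTimeoutMs) {
            this.dynamicGroupConfigs = dynamicGroupConfigs;
            this.defaultSessionTimeoutMs = defaultSessionTimeoutMs;
        }

        // Mirrors the pattern introduced by the patch: use the group-level override if present,
        // otherwise fall back to the statically configured default.
        int streamsGroupSessionTimeoutMs(String groupId) {
            Optional<GroupConfig> groupConfig = Optional.ofNullable(dynamicGroupConfigs.get(groupId));
            return groupConfig.map(GroupConfig::streamsSessionTimeoutMs)
                .orElse(defaultSessionTimeoutMs);
        }

        public static void main(String[] args) {
            StreamsGroupConfigFallbackSketch resolver = new StreamsGroupConfigFallbackSketch(
                Map.of("tuned-group", new GroupConfig(50000, 10000)), 45000);
            System.out.println(resolver.streamsGroupSessionTimeoutMs("tuned-group"));  // 50000 (override)
            System.out.println(resolver.streamsGroupSessionTimeoutMs("other-group"));  // 45000 (default)
        }
    }

The three accessors changed in GroupMetadataManager.java follow exactly this shape, differing only in which configuration value they read.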

3 files changed: 280 additions, 5 deletions


group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 36 additions & 4 deletions
@@ -1814,6 +1814,25 @@ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
         }
     }
 
+    /**
+     * Checks whether the streams group can accept a new member or not based on the
+     * max group size defined.
+     *
+     * @param group The streams group.
+     *
+     * @throws GroupMaxSizeReachedException if the maximum capacity has been reached.
+     */
+    private void throwIfStreamsGroupIsFull(
+        StreamsGroup group
+    ) throws GroupMaxSizeReachedException {
+        // If the streams group has reached its maximum capacity, the member is rejected if it is not
+        // already a member of the streams group.
+        if (group.numMembers() >= config.streamsGroupMaxSize()) {
+            throw new GroupMaxSizeReachedException("The streams group has reached its maximum capacity of "
+                + config.streamsGroupMaxSize() + " members.");
+        }
+    }
+
     /**
      * Validates the member epoch provided in the heartbeat request.
      *
@@ -2080,7 +2099,13 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
 
         // Get or create the streams group.
         boolean isJoining = memberEpoch == 0;
-        final StreamsGroup group = isJoining ? getOrCreateStreamsGroup(groupId) : getStreamsGroupOrThrow(groupId);
+        StreamsGroup group;
+        if (isJoining) {
+            group = getOrCreateStreamsGroup(groupId);
+            throwIfStreamsGroupIsFull(group);
+        } else {
+            group = getStreamsGroupOrThrow(groupId);
+        }
 
         // Get or create the member.
         StreamsGroupMember member;
@@ -8289,14 +8314,18 @@ private int shareGroupHeartbeatIntervalMs(String groupId) {
      * Get the session timeout of the provided streams group.
      */
     private int streamsGroupSessionTimeoutMs(String groupId) {
-        return 45000;
+        Optional<GroupConfig> groupConfig = groupConfigManager.groupConfig(groupId);
+        return groupConfig.map(GroupConfig::streamsSessionTimeoutMs)
+            .orElse(config.streamsGroupSessionTimeoutMs());
     }
 
     /**
      * Get the heartbeat interval of the provided streams group.
      */
     private int streamsGroupHeartbeatIntervalMs(String groupId) {
-        return 5000;
+        Optional<GroupConfig> groupConfig = groupConfigManager.groupConfig(groupId);
+        return groupConfig.map(GroupConfig::streamsHeartbeatIntervalMs)
+            .orElse(config.streamsGroupHeartbeatIntervalMs());
     }
 
     /**
@@ -8310,7 +8339,10 @@ private TaskAssignor streamsGroupAssignor(String groupId) {
      * Get the assignor of the provided streams group.
      */
     private Map<String, String> streamsGroupAssignmentConfigs(String groupId) {
-        return Map.of("group.streams.num.standby.replicas", "0");
+        Optional<GroupConfig> groupConfig = groupConfigManager.groupConfig(groupId);
+        final Integer numStandbyReplicas = groupConfig.map(GroupConfig::streamsNumStandbyReplicas)
+            .orElse(config.streamsGroupNumStandbyReplicas());
+        return Map.of("num.standby.replicas", numStandbyReplicas.toString());
     }
 
     /**
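
These accessors are the read side of the dynamic group configuration exercised by testStreamsGroupDynamicConfigs further down. For orientation, here is a hedged sketch of how such an override might be applied from an admin client. The literal config key names below are assumptions (the diff references them only through the GroupConfig.STREAMS_*_CONFIG constants, not their string values), as is the use of ConfigResource.Type.GROUP, the resource type used for dynamic consumer group configurations.

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class AlterStreamsGroupConfigSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (Admin admin = Admin.create(props)) {
                // Group-level resource for the streams group "fooup" (group id taken from the tests).
                ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "fooup");

                // Assumed key names; the coordinator code only references the GroupConfig constants.
                List<AlterConfigOp> ops = List.of(
                    new AlterConfigOp(new ConfigEntry("streams.session.timeout.ms", "50000"), AlterConfigOp.OpType.SET),
                    new AlterConfigOp(new ConfigEntry("streams.heartbeat.interval.ms", "10000"), AlterConfigOp.OpType.SET),
                    new AlterConfigOp(new ConfigEntry("streams.num.standby.replicas", "2"), AlterConfigOp.OpType.SET)
                );

                admin.incrementalAlterConfigs(Map.of(group, ops)).all().get();
            }
        }
    }

The equivalent kafka-configs.sh invocation with --entity-type groups should have the same effect, again assuming those key names.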

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

Lines changed: 238 additions & 1 deletion
@@ -175,6 +175,9 @@
 import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
 import static org.apache.kafka.coordinator.group.GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.coordinator.group.GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord;
 import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
 import static org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError;
@@ -4070,7 +4073,7 @@ private StreamsGroupMember.Builder streamsGroupMemberBuilderWithDefaults(String
             .setProcessId(DEFAULT_PROCESS_ID)
             .setUserEndpoint(null);
     }
-
+
     @Test
     public void testGenerateRecordsOnNewClassicGroup() throws Exception {
         GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
@@ -15799,6 +15802,48 @@ public void testStreamsGroupMemberEpochValidation() {
         assertEquals(100, result.response().data().memberEpoch());
     }
 
+    @Test
+    public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+        Topology topology = new Topology().setSubtopologies(List.of());
+
+        // Create a context with one streams group containing two members.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder().build())
+            .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 2)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .build())
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .build())
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+                .withPartitionMetadata(Map.of())
+            )
+            .build();
+
+        assertThrows(GroupMaxSizeReachedException.class, () ->
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId3)
+                    .setMemberEpoch(0)
+                    .setProcessId("process-id")
+                    .setRebalanceTimeoutMs(1500)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of())
+            ));
+    }
+
     @Test
     public void testMemberJoinsEmptyStreamsGroup() {
         String groupId = "fooup";
@@ -17871,6 +17916,104 @@ public void testStreamsRebalanceTimeoutExpiration() {
         context.assertNoRebalanceTimeout(groupId, memberId1);
     }
 
+    @Test
+    public void testStreamsOnNewMetadataImage() {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
+
+        // Topology of group 1 uses a and b.
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group1",
+            new Topology().setSubtopologies(List.of(
+                new Subtopology().setSubtopologyId("subtopology1")
+                    .setSourceTopics(List.of("a"))
+                    .setRepartitionSourceTopics(List.of(new TopicInfo().setName("b"))
+            ))
+        )));
+
+        // Topology of group 2 uses b and c.
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group2",
+            new Topology().setSubtopologies(List.of(
+                new Subtopology().setSubtopologyId("subtopology2")
+                    .setSourceTopics(List.of("b"))
+                    .setStateChangelogTopics(List.of(new TopicInfo().setName("c")))
+            ))
+        ));
+
+        // Topology of group 3 uses d.
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group3",
+            new Topology().setSubtopologies(List.of(
+                new Subtopology().setSubtopologyId("subtopology3")
+                    .setSourceTopics(List.of("d"))
+            ))
+        ));
+
+        // Topology of group 4 subscribes to e.
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group4",
+            new Topology().setSubtopologies(List.of(
+                new Subtopology().setSubtopologyId("subtopology4")
+                    .setSourceTopics(List.of("e"))
+            ))
+        ));
+
+        // Topology of group 5 subscribes to f.
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group5",
+            new Topology().setSubtopologies(List.of(
+                new Subtopology().setSubtopologyId("subtopology5")
+                    .setSourceTopics(List.of("f"))
+            ))
+        ));
+
+        // Ensures that all refresh flags are set to the future.
+        List.of("group1", "group2", "group3", "group4", "group5").forEach(groupId -> {
+            StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
+            group.setMetadataRefreshDeadline(context.time.milliseconds() + 5000L, 0);
+            assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
+        });
+
+        // Update the metadata image.
+        Uuid topicA = Uuid.randomUuid();
+        Uuid topicB = Uuid.randomUuid();
+        Uuid topicC = Uuid.randomUuid();
+        Uuid topicD = Uuid.randomUuid();
+        Uuid topicE = Uuid.randomUuid();
+
+        // Create a first base image with topic a, b, c and d.
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        delta.replay(new TopicRecord().setTopicId(topicA).setName("a"));
+        delta.replay(new PartitionRecord().setTopicId(topicA).setPartitionId(0));
+        delta.replay(new TopicRecord().setTopicId(topicB).setName("b"));
+        delta.replay(new PartitionRecord().setTopicId(topicB).setPartitionId(0));
+        delta.replay(new TopicRecord().setTopicId(topicC).setName("c"));
+        delta.replay(new PartitionRecord().setTopicId(topicC).setPartitionId(0));
+        delta.replay(new TopicRecord().setTopicId(topicD).setName("d"));
+        delta.replay(new PartitionRecord().setTopicId(topicD).setPartitionId(0));
+        MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
+
+        // Create a delta which updates topic B, deletes topic D and creates topic E.
+        delta = new MetadataDelta(image);
+        delta.replay(new PartitionRecord().setTopicId(topicB).setPartitionId(2));
+        delta.replay(new RemoveTopicRecord().setTopicId(topicD));
+        delta.replay(new TopicRecord().setTopicId(topicE).setName("e"));
+        delta.replay(new PartitionRecord().setTopicId(topicE).setPartitionId(1));
+        image = delta.apply(MetadataProvenance.EMPTY);
+
+        // Update metadata image with the delta.
+        context.groupMetadataManager.onNewMetadataImage(image, delta);
+
+        // Verify the groups.
+        List.of("group1", "group2", "group3", "group4").forEach(groupId -> {
+            StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
+            assertTrue(group.hasMetadataExpired(context.time.milliseconds()), groupId);
+        });
+
+        List.of("group5").forEach(groupId -> {
+            StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
+            assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
+        });
+
+        // Verify image.
+        assertEquals(image, context.groupMetadataManager.image());
+    }
+
     @Test
     public void testConsumerGroupDynamicConfigs() {
         String groupId = "fooup";
@@ -18093,6 +18236,100 @@ public void testShareGroupDynamicConfigs() {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testStreamsGroupDynamicConfigs() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result =
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(10000)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of()));
+        assertEquals(1, result.response().data().memberEpoch());
+        assertEquals(Map.of("num.standby.replicas", "0"), assignor.lastPassedAssignmentConfigs());
+
+        // Verify heartbeat interval
+        assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, result.response().data().heartbeatIntervalMs());
+
+        // Verify that there is a session time.
+        context.assertSessionTimeout(groupId, memberId, GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT);
+
+        // Advance time.
+        assertEquals(
+            List.of(),
+            context.sleep(result.response().data().heartbeatIntervalMs())
+        );
+
+        // Dynamic update group config
+        Properties newGroupConfig = new Properties();
+        newGroupConfig.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 50000);
+        newGroupConfig.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
+        newGroupConfig.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 2);
+        context.updateGroupConfig(groupId, newGroupConfig);
+
+        // Session timer is rescheduled on second heartbeat, new assignment with new parameter is calculated.
+        result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(result.response().data().memberEpoch())
+                .setRackId("bla"));
+
+        // Verify heartbeat interval
+        assertEquals(10000, result.response().data().heartbeatIntervalMs());
+
+        // Verify that there is a session time.
+        context.assertSessionTimeout(groupId, memberId, 50000);
+
+        // Verify that the new number of standby replicas is used
+        assertEquals(Map.of("num.standby.replicas", "2"), assignor.lastPassedAssignmentConfigs());
+
+        // Advance time.
+        assertEquals(
+            List.of(),
+            context.sleep(result.response().data().heartbeatIntervalMs())
+        );
+
+        // Session timer is cancelled on leave.
+        result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
+        assertEquals(LEAVE_GROUP_MEMBER_EPOCH, result.response().data().memberEpoch());
+
+        // Verify that there are no timers.
+        context.assertNoSessionTimeout(groupId, memberId);
+        context.assertNoRebalanceTimeout(groupId, memberId);
+    }
+
     @Test
     public void testReplayConsumerGroupMemberMetadata() {
         GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/MockTaskAssignor.java

Lines changed: 6 additions & 0 deletions
@@ -31,6 +31,7 @@ public class MockTaskAssignor implements TaskAssignor {
 
     private final String name;
     private GroupAssignment preparedGroupAssignment = null;
+    private Map<String, String> assignmentConfigs = Map.of();
 
     public MockTaskAssignor(String name) {
        this.name = name;
@@ -52,6 +53,10 @@ public void prepareGroupAssignment(Map<String, TasksTuple> memberAssignments) {
            })));
    }
 
+    public Map<String, String> lastPassedAssignmentConfigs() {
+        return assignmentConfigs;
+    }
+
    @Override
    public String name() {
        return name;
@@ -60,6 +65,7 @@ public String name() {
    @Override
    public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber)
        throws TaskAssignorException {
+        assignmentConfigs = groupSpec.assignmentConfigs();
        return preparedGroupAssignment;
    }
}
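
The MockTaskAssignor change is a small capture-and-expose test double: assign() remembers the assignment configs it was invoked with, and lastPassedAssignmentConfigs() exposes them so testStreamsGroupDynamicConfigs can assert on the effective num.standby.replicas. A minimal, generic sketch of the same pattern, with illustrative names rather than Kafka's:

    import java.util.Map;

    public class CapturingAssignorSketch {

        // Test double that returns a canned result but remembers the configs it was given.
        static class CapturingAssignor {
            private Map<String, String> lastAssignmentConfigs = Map.of();

            Map<String, String> assign(Map<String, String> assignmentConfigs) {
                lastAssignmentConfigs = assignmentConfigs;  // capture what the caller passed in
                return Map.of();                            // canned assignment, like the mock's prepared one
            }

            Map<String, String> lastPassedAssignmentConfigs() {
                return lastAssignmentConfigs;               // exposed so the test can assert on it
            }
        }

        public static void main(String[] args) {
            CapturingAssignor assignor = new CapturingAssignor();
            assignor.assign(Map.of("num.standby.replicas", "2"));
            // A test would assert this equals the configured value.
            System.out.println(assignor.lastPassedAssignmentConfigs());  // {num.standby.replicas=2}
        }
    }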
