Skip to content

Commit 9c2a096

Browse files
authored
KAFKA-20066: Implement KIP-1251: Assignment epochs for consumer groups [1/N] (#21557)
# Summary This PR moves `assignedPartitions` out of `ModernConsumerMember` interface, add it as independent properties for `ShareGroupMember` and `ConsumerGroupMember`. ## Reason for the change In an upcoming PR, the structure of `ConsumerGroupMember#assignedPartitions` and `ConsumerGroupMember#partitionsPendingRevocation` will be changed to include epoch information as ``` Map<Uuid, Map<Integer, Integer>> ``` This differs from the `ShareGroupMember#assignedPartitions` structure, which remains `Map<Uuid, Set<Integer>>`. Therefore, it is no longer appropriate to have this as a shared field in the base class. Reviewers: Sean Quah <squah@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
1 parent 4e97dad commit 9c2a096

File tree

8 files changed

+94
-46
lines changed

8 files changed

+94
-46
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,6 @@
239239
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
240240
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
241241
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME;
242-
import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;
243242
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord;
244243
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord;
245244
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord;
@@ -2436,7 +2435,7 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
24362435
// to detect a full request as those must be set in a full request.
24372436
// 2. The member's assignment has been updated.
24382437
boolean isFullRequest = rebalanceTimeoutMs != -1 && (subscribedTopicNames != null || subscribedTopicRegex != null) && ownedTopicPartitions != null;
2439-
if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) {
2438+
if (memberEpoch == 0 || isFullRequest || ConsumerGroupMember.hasAssignedPartitionsChanged(member, updatedMember)) {
24402439
response.setAssignment(ConsumerGroupHeartbeatResponse.createAssignment(updatedMember.assignedPartitions()));
24412440
}
24422441

@@ -2808,7 +2807,7 @@ private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<In
28082807
// (subscribedTopicNames) to detect a full request as those must be set in a full request.
28092808
// 2. The member's assignment has been updated.
28102809
boolean isFullRequest = subscribedTopicNames != null;
2811-
if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) {
2810+
if (memberEpoch == 0 || isFullRequest || ShareGroupMember.hasAssignedPartitionsChanged(member, updatedMember)) {
28122811
response.setAssignment(ShareGroupHeartbeatResponse.createAssignment(updatedMember.assignedPartitions()));
28132812
}
28142813
return new CoordinatorResult<>(

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroupMember.java

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616
*/
1717
package org.apache.kafka.coordinator.group.modern;
1818

19-
import org.apache.kafka.common.Uuid;
20-
21-
import java.util.Map;
2219
import java.util.Set;
2320

2421
/**
@@ -71,11 +68,6 @@ public abstract class ModernGroupMember {
7168
*/
7269
protected Set<String> subscribedTopicNames;
7370

74-
/**
75-
* The partitions assigned to this member.
76-
*/
77-
protected Map<Uuid, Set<Integer>> assignedPartitions;
78-
7971
protected ModernGroupMember(
8072
String memberId,
8173
int memberEpoch,
@@ -85,8 +77,7 @@ protected ModernGroupMember(
8577
String clientId,
8678
String clientHost,
8779
Set<String> subscribedTopicNames,
88-
MemberState state,
89-
Map<Uuid, Set<Integer>> assignedPartitions
80+
MemberState state
9081
) {
9182
this.memberId = memberId;
9283
this.memberEpoch = memberEpoch;
@@ -97,7 +88,6 @@ protected ModernGroupMember(
9788
this.clientId = clientId;
9889
this.clientHost = clientHost;
9990
this.subscribedTopicNames = subscribedTopicNames;
100-
this.assignedPartitions = assignedPartitions;
10191
}
10292

10393
/**
@@ -169,21 +159,4 @@ public MemberState state() {
169159
public boolean isReconciledTo(int targetAssignmentEpoch) {
170160
return state == MemberState.STABLE && memberEpoch == targetAssignmentEpoch;
171161
}
172-
173-
/**
174-
* @return The set of assigned partitions.
175-
*/
176-
public Map<Uuid, Set<Integer>> assignedPartitions() {
177-
return assignedPartitions;
178-
}
179-
180-
/**
181-
* @return True of the two provided members have different assigned partitions.
182-
*/
183-
public static boolean hasAssignedPartitionsChanged(
184-
ModernGroupMember member1,
185-
ModernGroupMember member2
186-
) {
187-
return !member1.assignedPartitions().equals(member2.assignedPartitions());
188-
}
189162
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,11 @@ public ConsumerGroupMember build() {
263263
*/
264264
private final String serverAssignorName;
265265

266+
/**
267+
* The partitions assigned to this member.
268+
*/
269+
private final Map<Uuid, Set<Integer>> assignedPartitions;
270+
266271
/**
267272
* The partitions being revoked by this member.
268273
*/
@@ -299,12 +304,12 @@ private ConsumerGroupMember(
299304
clientId,
300305
clientHost,
301306
subscribedTopicNames,
302-
state,
303-
assignedPartitions
307+
state
304308
);
305309
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
306310
this.subscribedTopicRegex = subscribedTopicRegex;
307311
this.serverAssignorName = serverAssignorName;
312+
this.assignedPartitions = assignedPartitions;
308313
this.partitionsPendingRevocation = partitionsPendingRevocation;
309314
this.classicMemberMetadata = classicMemberMetadata;
310315
}
@@ -330,13 +335,30 @@ public Optional<String> serverAssignorName() {
330335
return Optional.ofNullable(serverAssignorName);
331336
}
332337

338+
/**
339+
* @return The set of assigned partitions.
340+
*/
341+
public Map<Uuid, Set<Integer>> assignedPartitions() {
342+
return assignedPartitions;
343+
}
344+
333345
/**
334346
* @return The set of partitions awaiting revocation from the member.
335347
*/
336348
public Map<Uuid, Set<Integer>> partitionsPendingRevocation() {
337349
return partitionsPendingRevocation;
338350
}
339351

352+
/**
353+
* @return True if the two provided members have different assigned partitions.
354+
*/
355+
public static boolean hasAssignedPartitionsChanged(
356+
ConsumerGroupMember member1,
357+
ConsumerGroupMember member2
358+
) {
359+
return !member1.assignedPartitions().equals(member2.assignedPartitions());
360+
}
361+
340362
/**
341363
* @return The supported classic protocols converted to JoinGroupRequestProtocolCollection.
342364
*/

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,11 @@ public ShareGroupMember build() {
170170
}
171171
}
172172

173+
/**
174+
* The partitions assigned to this member.
175+
*/
176+
private final Map<Uuid, Set<Integer>> assignedPartitions;
177+
173178
private ShareGroupMember(
174179
String memberId,
175180
int memberEpoch,
@@ -190,9 +195,26 @@ private ShareGroupMember(
190195
clientId,
191196
clientHost,
192197
subscribedTopicNames,
193-
state,
194-
assignedPartitions
198+
state
195199
);
200+
this.assignedPartitions = assignedPartitions;
201+
}
202+
203+
/**
204+
* @return The partitions assigned to this member.
205+
*/
206+
public Map<Uuid, Set<Integer>> assignedPartitions() {
207+
return assignedPartitions;
208+
}
209+
210+
/**
211+
* @return True if the two provided members have different assigned partitions.
212+
*/
213+
public static boolean hasAssignedPartitionsChanged(
214+
ShareGroupMember member1,
215+
ShareGroupMember member2
216+
) {
217+
return !member1.assignedPartitions().equals(member2.assignedPartitions());
196218
}
197219

198220
/**

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.kafka.coordinator.group.modern.Assignment;
2929
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
3030
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
31-
import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
3231
import org.apache.kafka.coordinator.group.modern.TopicIds;
3332
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
3433
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
@@ -115,24 +114,23 @@ public static CoordinatorMetadataImage createMetadataImage(
115114
}
116115

117116
/**
118-
* Creates a GroupSpec from the given ModernGroupMembers.
117+
* Creates a GroupSpec from the given ConsumerGroupMembers.
119118
*
120-
* @param members The ModernGroupMembers.
119+
* @param members The ConsumerGroupMembers.
121120
* @param subscriptionType The group's subscription type.
122121
* @param topicResolver The TopicResolver to use.
123122
* @return The new GroupSpec.
124123
*/
125-
public static GroupSpec createGroupSpec(
126-
Map<String, ? extends ModernGroupMember> members,
124+
public static GroupSpec createConsumerGroupSpec(
125+
Map<String, ConsumerGroupMember> members,
127126
SubscriptionType subscriptionType,
128127
TopicIds.TopicResolver topicResolver
129128
) {
130129
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new HashMap<>();
131130

132-
// Prepare the member spec for all members.
133-
for (Map.Entry<String, ? extends ModernGroupMember> memberEntry : members.entrySet()) {
131+
for (Map.Entry<String, ConsumerGroupMember> memberEntry : members.entrySet()) {
134132
String memberId = memberEntry.getKey();
135-
ModernGroupMember member = memberEntry.getValue();
133+
ConsumerGroupMember member = memberEntry.getValue();
136134

137135
memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
138136
Optional.ofNullable(member.rackId()),
@@ -149,6 +147,40 @@ public static GroupSpec createGroupSpec(
149147
);
150148
}
151149

150+
/**
151+
* Creates a GroupSpec from the given ShareGroupMembers.
152+
*
153+
* @param members The ShareGroupMembers.
154+
* @param subscriptionType The group's subscription type.
155+
* @param topicResolver The TopicResolver to use.
156+
* @return The new GroupSpec.
157+
*/
158+
public static GroupSpec createShareGroupSpec(
159+
Map<String, ShareGroupMember> members,
160+
SubscriptionType subscriptionType,
161+
TopicIds.TopicResolver topicResolver
162+
) {
163+
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new HashMap<>();
164+
165+
for (Map.Entry<String, ShareGroupMember> memberEntry : members.entrySet()) {
166+
String memberId = memberEntry.getKey();
167+
ShareGroupMember member = memberEntry.getValue();
168+
169+
memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
170+
Optional.ofNullable(member.rackId()),
171+
Optional.empty(),
172+
new TopicIds(member.subscribedTopicNames(), topicResolver),
173+
new Assignment(member.assignedPartitions())
174+
));
175+
}
176+
177+
return new GroupSpecImpl(
178+
memberSpecs,
179+
subscriptionType,
180+
Map.of()
181+
);
182+
}
183+
152184
/**
153185
* Creates a ConsumerGroupMembers map where all members have the same topic subscriptions.
154186
*

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public void setup() {
135135
setupTopics();
136136

137137
Map<String, ConsumerGroupMember> members = createMembers();
138-
this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, subscriptionType, topicResolver);
138+
this.groupSpec = AssignorBenchmarkUtils.createConsumerGroupSpec(members, subscriptionType, topicResolver);
139139

140140
if (assignmentType == AssignmentType.INCREMENTAL) {
141141
simulateIncrementalRebalance();

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void setup() {
126126
setupTopics();
127127

128128
Map<String, ShareGroupMember> members = createMembers();
129-
this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, subscriptionType, topicResolver);
129+
this.groupSpec = AssignorBenchmarkUtils.createShareGroupSpec(members, subscriptionType, topicResolver);
130130

131131
if (assignmentType == AssignmentType.INCREMENTAL) {
132132
simulateIncrementalRebalance();

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ private void setupTopics() {
135135
private Map<String, Assignment> generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment(
136136
Map<String, ConsumerGroupMember> members
137137
) {
138-
this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(
138+
this.groupSpec = AssignorBenchmarkUtils.createConsumerGroupSpec(
139139
members,
140140
subscriptionType,
141141
topicResolver

0 commit comments

Comments
 (0)