Skip to content

Commit 71a7d85

Browse files
KAFKA-19431: Ensure consumer and share assignment consistency with subscriptions (#20055)
Filter out unsubscribed topics during reconciliation.

This eliminates the window where a consumer group assignment could contain unsubscribed topics when a member unsubscribes from a topic while it has unrevoked partitions.

We also apply filtering in a few other cases that would arise when client-side assignors are implemented, since new assignments would no longer be available immediately. This is important for mixed groups, since clients on the classic protocol will rejoin if they receive a topic in their assignment that is no longer in their subscription.

Regex subscriptions have a window where the regex is not resolved and we cannot know which topics are part of the subscription. We opt to be conservative and treat unresolved regexes as matching no topics.

The same change is applied to share groups, since the reconciliation process is similar.

To gauge the performance impact of the change, we add a jmh benchmark.

Reviewers: Lucas Brutschy <[email protected]>, Lianet Magran <[email protected]>, Sushant Mahajan <[email protected]>, Dongnuo Lyu <[email protected]>, David Jacot <[email protected]>
1 parent 611f412 commit 71a7d85

File tree

8 files changed

+1363
-71
lines changed

8 files changed

+1363
-71
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 79 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2249,28 +2249,38 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
22492249
.setClassicMemberMetadata(null)
22502250
.build();
22512251

2252-
// If the group is newly created, we must ensure that it moves away from
2253-
// epoch 0 and that it is fully initialized.
2254-
boolean bumpGroupEpoch = group.groupEpoch() == 0;
2255-
2256-
bumpGroupEpoch |= hasMemberSubscriptionChanged(
2252+
boolean subscribedTopicNamesChanged = hasMemberSubscriptionChanged(
22572253
groupId,
22582254
member,
22592255
updatedMember,
22602256
records
22612257
);
2262-
2263-
bumpGroupEpoch |= maybeUpdateRegularExpressions(
2258+
UpdateRegularExpressionsResult updateRegularExpressionsResult = maybeUpdateRegularExpressions(
22642259
context,
22652260
group,
22662261
member,
22672262
updatedMember,
22682263
records
22692264
);
22702265

2266+
// The subscription has changed when either the subscribed topic names or subscribed topic
2267+
// regex has changed.
2268+
boolean hasSubscriptionChanged = subscribedTopicNamesChanged || updateRegularExpressionsResult.regexUpdated();
22712269
int groupEpoch = group.groupEpoch();
22722270
SubscriptionType subscriptionType = group.subscriptionType();
22732271

2272+
boolean bumpGroupEpoch =
2273+
// If the group is newly created, we must ensure that it moves away from
2274+
// epoch 0 and that it is fully initialized.
2275+
groupEpoch == 0 ||
2276+
// Bumping the group epoch signals that the target assignment should be updated. We bump
2277+
// the group epoch when the member has changed its subscribed topic names or the member
2278+
// has changed its subscribed topic regex to a regex that is already resolved. We avoid
2279+
// bumping the group epoch when the new subscribed topic regex has not been resolved
2280+
// yet, since we will have to update the target assignment again later.
2281+
subscribedTopicNamesChanged ||
2282+
updateRegularExpressionsResult == UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
2283+
22742284
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
22752285
// The subscription metadata is updated in two cases:
22762286
// 1) The member has updated its subscriptions;
@@ -2315,6 +2325,9 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
23152325
group::currentPartitionEpoch,
23162326
targetAssignmentEpoch,
23172327
targetAssignment,
2328+
group.resolvedRegularExpressions(),
2329+
// Force consistency with the subscription when the subscription has changed.
2330+
hasSubscriptionChanged,
23182331
ownedTopicPartitions,
23192332
records
23202333
);
@@ -2468,6 +2481,8 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
24682481
group::currentPartitionEpoch,
24692482
group.assignmentEpoch(),
24702483
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId()),
2484+
group.resolvedRegularExpressions(),
2485+
bumpGroupEpoch,
24712486
toTopicPartitions(subscription.ownedPartitions(), metadataImage),
24722487
records
24732488
);
@@ -2511,6 +2526,9 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
25112526
group::currentPartitionEpoch,
25122527
targetAssignmentEpoch,
25132528
targetAssignment,
2529+
group.resolvedRegularExpressions(),
2530+
// Force consistency with the subscription when the subscription has changed.
2531+
bumpGroupEpoch,
25142532
toTopicPartitions(subscription.ownedPartitions(), metadataImage),
25152533
records
25162534
);
@@ -2669,6 +2687,8 @@ private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<In
26692687
updatedMember,
26702688
targetAssignmentEpoch,
26712689
targetAssignment,
2690+
// Force consistency with the subscription when the subscription has changed.
2691+
bumpGroupEpoch,
26722692
records
26732693
);
26742694

@@ -3108,6 +3128,16 @@ private static boolean isNotEmpty(String value) {
31083128
return value != null && !value.isEmpty();
31093129
}
31103130

3131+
private enum UpdateRegularExpressionsResult {
3132+
NO_CHANGE,
3133+
REGEX_UPDATED,
3134+
REGEX_UPDATED_AND_RESOLVED;
3135+
3136+
public boolean regexUpdated() {
3137+
return this == REGEX_UPDATED || this == REGEX_UPDATED_AND_RESOLVED;
3138+
}
3139+
}
3140+
31113141
/**
31123142
* Check whether the member has updated its subscribed topic regular expression and
31133143
* may trigger the resolution/the refresh of all the regular expressions in the
@@ -3119,9 +3149,9 @@ private static boolean isNotEmpty(String value) {
31193149
* @param member The old member.
31203150
* @param updatedMember The new member.
31213151
* @param records The records accumulator.
3122-
* @return Whether a rebalance must be triggered.
3152+
* @return The result of the update.
31233153
*/
3124-
private boolean maybeUpdateRegularExpressions(
3154+
private UpdateRegularExpressionsResult maybeUpdateRegularExpressions(
31253155
AuthorizableRequestContext context,
31263156
ConsumerGroup group,
31273157
ConsumerGroupMember member,
@@ -3134,14 +3164,17 @@ private boolean maybeUpdateRegularExpressions(
31343164
String oldSubscribedTopicRegex = member.subscribedTopicRegex();
31353165
String newSubscribedTopicRegex = updatedMember.subscribedTopicRegex();
31363166

3137-
boolean bumpGroupEpoch = false;
31383167
boolean requireRefresh = false;
3168+
UpdateRegularExpressionsResult updateRegularExpressionsResult = UpdateRegularExpressionsResult.NO_CHANGE;
31393169

31403170
// Check whether the member has changed its subscribed regex.
3141-
if (!Objects.equals(oldSubscribedTopicRegex, newSubscribedTopicRegex)) {
3171+
boolean subscribedTopicRegexChanged = !Objects.equals(oldSubscribedTopicRegex, newSubscribedTopicRegex);
3172+
if (subscribedTopicRegexChanged) {
31423173
log.debug("[GroupId {}] Member {} updated its subscribed regex to: {}.",
31433174
groupId, memberId, newSubscribedTopicRegex);
31443175

3176+
updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED;
3177+
31453178
if (isNotEmpty(oldSubscribedTopicRegex) && group.numSubscribedMembers(oldSubscribedTopicRegex) == 1) {
31463179
// If the member was the last one subscribed to the regex, we delete the
31473180
// resolved regular expression.
@@ -3160,7 +3193,9 @@ private boolean maybeUpdateRegularExpressions(
31603193
} else {
31613194
// If the new regex is already resolved, we trigger a rebalance
31623195
// by bumping the group epoch.
3163-
bumpGroupEpoch = group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent();
3196+
if (group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent()) {
3197+
updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
3198+
}
31643199
}
31653200
}
31663201
}
@@ -3176,20 +3211,20 @@ private boolean maybeUpdateRegularExpressions(
31763211
// 0. The group is subscribed to regular expressions. We also take the one
31773212
// that the current member may have just introduced.
31783213
if (!requireRefresh && group.subscribedRegularExpressions().isEmpty()) {
3179-
return bumpGroupEpoch;
3214+
return updateRegularExpressionsResult;
31803215
}
31813216

31823217
// 1. There is no ongoing refresh for the group.
31833218
String key = group.groupId() + "-regex";
31843219
if (executor.isScheduled(key)) {
3185-
return bumpGroupEpoch;
3220+
return updateRegularExpressionsResult;
31863221
}
31873222

31883223
// 2. The last refresh is older than 10s. If the group does not have any regular
31893224
// expressions but the current member just brought a new one, we should continue.
31903225
long lastRefreshTimeMs = group.lastResolvedRegularExpressionRefreshTimeMs();
31913226
if (currentTimeMs <= lastRefreshTimeMs + REGEX_BATCH_REFRESH_MIN_INTERVAL_MS) {
3192-
return bumpGroupEpoch;
3227+
return updateRegularExpressionsResult;
31933228
}
31943229

31953230
// 3.1 The group has unresolved regular expressions.
@@ -3218,7 +3253,7 @@ private boolean maybeUpdateRegularExpressions(
32183253
);
32193254
}
32203255

3221-
return bumpGroupEpoch;
3256+
return updateRegularExpressionsResult;
32223257
}
32233258

32243259
/**
@@ -3492,16 +3527,18 @@ private boolean hasStreamsMemberMetadataChanged(
34923527
/**
34933528
* Reconciles the current assignment of the member towards the target assignment if needed.
34943529
*
3495-
* @param groupId The group id.
3496-
* @param member The member to reconcile.
3497-
* @param currentPartitionEpoch The function returning the current epoch of
3498-
* a given partition.
3499-
* @param targetAssignmentEpoch The target assignment epoch.
3500-
* @param targetAssignment The target assignment.
3501-
* @param ownedTopicPartitions The list of partitions owned by the member. This
3502-
* is reported in the ConsumerGroupHeartbeat API and
3503-
* it could be null if not provided.
3504-
* @param records The list to accumulate any new records.
3530+
* @param groupId The group id.
3531+
* @param member The member to reconcile.
3532+
* @param currentPartitionEpoch The function returning the current epoch of
3533+
* a given partition.
3534+
* @param targetAssignmentEpoch The target assignment epoch.
3535+
* @param targetAssignment The target assignment.
3536+
* @param resolvedRegularExpressions The resolved regular expressions.
3537+
* @param hasSubscriptionChanged Whether the member has changed its subscription on the current heartbeat.
3538+
* @param ownedTopicPartitions The list of partitions owned by the member. This
3539+
* is reported in the ConsumerGroupHeartbeat API and
3540+
* it could be null if not provided.
3541+
* @param records The list to accumulate any new records.
35053542
* @return The received member if no changes have been made; or a new
35063543
* member containing the new assignment.
35073544
*/
@@ -3511,15 +3548,20 @@ private ConsumerGroupMember maybeReconcile(
35113548
BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
35123549
int targetAssignmentEpoch,
35133550
Assignment targetAssignment,
3551+
Map<String, ResolvedRegularExpression> resolvedRegularExpressions,
3552+
boolean hasSubscriptionChanged,
35143553
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
35153554
List<CoordinatorRecord> records
35163555
) {
3517-
if (member.isReconciledTo(targetAssignmentEpoch)) {
3556+
if (!hasSubscriptionChanged && member.isReconciledTo(targetAssignmentEpoch)) {
35183557
return member;
35193558
}
35203559

35213560
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
3561+
.withMetadataImage(metadataImage)
35223562
.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
3563+
.withHasSubscriptionChanged(hasSubscriptionChanged)
3564+
.withResolvedRegularExpressions(resolvedRegularExpressions)
35233565
.withCurrentPartitionEpoch(currentPartitionEpoch)
35243566
.withOwnedTopicPartitions(ownedTopicPartitions)
35253567
.build();
@@ -3556,11 +3598,12 @@ private ConsumerGroupMember maybeReconcile(
35563598
/**
35573599
* Reconciles the current assignment of the member towards the target assignment if needed.
35583600
*
3559-
* @param groupId The group id.
3560-
* @param member The member to reconcile.
3561-
* @param targetAssignmentEpoch The target assignment epoch.
3562-
* @param targetAssignment The target assignment.
3563-
* @param records The list to accumulate any new records.
3601+
* @param groupId The group id.
3602+
* @param member The member to reconcile.
3603+
* @param targetAssignmentEpoch The target assignment epoch.
3604+
* @param targetAssignment The target assignment.
3605+
* @param hasSubscriptionChanged Whether the member has changed its subscription on the current heartbeat.
3606+
* @param records The list to accumulate any new records.
35643607
* @return The received member if no changes have been made; or a new
35653608
* member containing the new assignment.
35663609
*/
@@ -3569,14 +3612,17 @@ private ShareGroupMember maybeReconcile(
35693612
ShareGroupMember member,
35703613
int targetAssignmentEpoch,
35713614
Assignment targetAssignment,
3615+
boolean hasSubscriptionChanged,
35723616
List<CoordinatorRecord> records
35733617
) {
3574-
if (member.isReconciledTo(targetAssignmentEpoch)) {
3618+
if (!hasSubscriptionChanged && member.isReconciledTo(targetAssignmentEpoch)) {
35753619
return member;
35763620
}
35773621

35783622
ShareGroupMember updatedMember = new ShareGroupAssignmentBuilder(member)
3623+
.withMetadataImage(metadataImage)
35793624
.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
3625+
.withHasSubscriptionChanged(hasSubscriptionChanged)
35803626
.build();
35813627

35823628
if (!updatedMember.equals(member)) {

0 commit comments

Comments
 (0)