Skip to content

Commit e76213e

Browse files
authored
KAFKA-19546: Rebalance should be triggered by subscription change during group protocol downgrade (#20581)
Cherry-pick KAFKA-19546 to 4.1. During online downgrade, when a static member using the consumer protocol which is also the last member using the consumer protocol is replaced by another static member using the classic protocol with the same instance id, the latter will take the assignment of the former and an online downgrade will be triggered. In the current implementation, if the replacing static member has a different subscription, no rebalance will be triggered when the downgrade happens. The patch checks whether the static member has changed subscription and triggers a rebalance when it does. Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
1 parent 02d58b1 commit e76213e

File tree

2 files changed

+144
-89
lines changed

2 files changed

+144
-89
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 63 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1252,16 +1252,19 @@ private boolean validateOnlineDowngradeWithReplacedMember(
12521252
/**
12531253
* Creates a ClassicGroup corresponding to the given ConsumerGroup.
12541254
*
1255-
* @param consumerGroup The converted ConsumerGroup.
1256-
* @param leavingMembers The leaving member(s) that triggered the downgrade validation.
1257-
* @param joiningMember The newly joined member if the downgrade is triggered by static member replacement.
1258-
* When not null, must have an instanceId that matches an existing member.
1259-
* @param records The record list to which the conversion records are added.
1255+
* @param consumerGroup The converted ConsumerGroup.
1256+
* @param leavingMembers The leaving member(s) that triggered the downgrade validation.
1257+
* @param joiningMember The newly joined member if the downgrade is triggered by static member replacement.
1258+
* When not null, must have an instanceId that matches the replaced member.
1259+
* @param hasSubscriptionChanged The boolean indicating whether the joining member has a different subscription
1260+
* from the replaced member. Only used when joiningMember is set.
1261+
* @param records The record list to which the conversion records are added.
12601262
*/
12611263
private void convertToClassicGroup(
12621264
ConsumerGroup consumerGroup,
12631265
Set<ConsumerGroupMember> leavingMembers,
12641266
ConsumerGroupMember joiningMember,
1267+
boolean hasSubscriptionChanged,
12651268
List<CoordinatorRecord> records
12661269
) {
12671270
if (joiningMember == null) {
@@ -1302,9 +1305,12 @@ private void convertToClassicGroup(
13021305

13031306
classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
13041307

1305-
// If the downgrade is triggered by a member leaving the group, a rebalance should be triggered.
1308+
// If the downgrade is triggered by a member leaving the group or a static
1309+
// member replacement with a different subscription, a rebalance should be triggered.
13061310
if (joiningMember == null) {
1307-
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic.", classicGroup.groupId()));
1311+
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic for member leaving.", classicGroup.groupId()));
1312+
} else if (hasSubscriptionChanged) {
1313+
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic for static member replacement with different subscription.", classicGroup.groupId()));
13081314
}
13091315

13101316
log.info("[GroupId {}] Converted the consumer group to a classic group.", consumerGroup.groupId());
@@ -2397,6 +2403,10 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
23972403
);
23982404
}
23992405

2406+
ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(request.groupInstanceId());
2407+
boolean downgrade = existingStaticMemberOrNull != null &&
2408+
validateOnlineDowngradeWithReplacedMember(group, existingStaticMemberOrNull);
2409+
24002410
int groupEpoch = group.groupEpoch();
24012411
SubscriptionType subscriptionType = group.subscriptionType();
24022412
final ConsumerProtocolSubscription subscription = deserializeSubscription(protocols);
@@ -2443,49 +2453,61 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
24432453
subscriptionType = result.subscriptionType;
24442454
}
24452455

2446-
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The delta between
2447-
// the existing and the new target assignment is persisted to the partition.
2448-
final int targetAssignmentEpoch;
2449-
final Assignment targetAssignment;
2456+
if (downgrade) {
2457+
// 2. If the static member subscription hasn't changed, reconcile the member's assignment with the existing
2458+
// assignment if the member is not fully reconciled yet. If the static member subscription has changed, a
2459+
// rebalance will be triggered during downgrade anyway so we can skip the reconciliation.
2460+
if (!bumpGroupEpoch) {
2461+
updatedMember = maybeReconcile(
2462+
groupId,
2463+
updatedMember,
2464+
group::currentPartitionEpoch,
2465+
group.assignmentEpoch(),
2466+
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId()),
2467+
toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()),
2468+
records
2469+
);
2470+
}
24502471

2451-
if (groupEpoch > group.assignmentEpoch()) {
2452-
targetAssignment = updateTargetAssignment(
2472+
// 3. Downgrade the consumer group.
2473+
convertToClassicGroup(
24532474
group,
2454-
groupEpoch,
2455-
member,
2475+
Set.of(),
24562476
updatedMember,
2457-
subscriptionType,
2477+
bumpGroupEpoch,
24582478
records
24592479
);
2460-
targetAssignmentEpoch = groupEpoch;
24612480
} else {
2462-
targetAssignmentEpoch = group.assignmentEpoch();
2463-
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
2481+
// If no downgrade is triggered.
24642482

2465-
}
2483+
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch.
2484+
// The delta between the existing and the new target assignment is persisted to the partition.
2485+
final int targetAssignmentEpoch;
2486+
final Assignment targetAssignment;
24662487

2467-
// 3. Reconcile the member's assignment with the target assignment if the member is not
2468-
// fully reconciled yet.
2469-
updatedMember = maybeReconcile(
2470-
groupId,
2471-
updatedMember,
2472-
group::currentPartitionEpoch,
2473-
targetAssignmentEpoch,
2474-
targetAssignment,
2475-
toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()),
2476-
records
2477-
);
2488+
if (groupEpoch > group.assignmentEpoch()) {
2489+
targetAssignment = updateTargetAssignment(
2490+
group,
2491+
groupEpoch,
2492+
member,
2493+
updatedMember,
2494+
subscriptionType,
2495+
records
2496+
);
2497+
targetAssignmentEpoch = groupEpoch;
2498+
} else {
2499+
targetAssignmentEpoch = group.assignmentEpoch();
2500+
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
2501+
}
24782502

2479-
// 4. Maybe downgrade the consumer group if the last static member using the
2480-
// consumer protocol is replaced by the joining static member.
2481-
ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(request.groupInstanceId());
2482-
boolean downgrade = existingStaticMemberOrNull != null &&
2483-
validateOnlineDowngradeWithReplacedMember(group, existingStaticMemberOrNull);
2484-
if (downgrade) {
2485-
convertToClassicGroup(
2486-
group,
2487-
Set.of(),
2503+
// 3. Reconcile the member's assignment with the target assignment if the member is not fully reconciled yet.
2504+
updatedMember = maybeReconcile(
2505+
groupId,
24882506
updatedMember,
2507+
group::currentPartitionEpoch,
2508+
targetAssignmentEpoch,
2509+
targetAssignment,
2510+
toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()),
24892511
records
24902512
);
24912513
}
@@ -4058,7 +4080,7 @@ private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMembers(
40584080

40594081
List<CoordinatorRecord> records = new ArrayList<>();
40604082
if (validateOnlineDowngradeWithFencedMembers(group, members)) {
4061-
convertToClassicGroup(group, members, null, records);
4083+
convertToClassicGroup(group, members, null, false, records);
40624084
return new CoordinatorResult<>(records, response, null, false);
40634085
} else {
40644086
for (ConsumerGroupMember member : members) {

0 commit comments

Comments
 (0)