Skip to content

Commit cbea4f6

Browse files
authored
KAFKA-19546: Rebalance should be triggered by subscription change during group protocol downgrade (#20417)
During online downgrade, when a static member using the consumer protocol which is also the last member using the consumer protocol is replaced by another static member using the classic protocol with the same instance id, the latter will take the assignment of the former and an online downgrade will be triggered. In the current implementation, if the replacing static member has a different subscription, no rebalance will be triggered when the downgrade happens. The patch checks whether the static member has changed subscription and triggers a rebalance when it does. Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
1 parent 55020f9 commit cbea4f6

File tree

2 files changed

+144
-89
lines changed

2 files changed

+144
-89
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 63 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1257,16 +1257,19 @@ private boolean validateOnlineDowngradeWithReplacedMember(
12571257
/**
12581258
* Creates a ClassicGroup corresponding to the given ConsumerGroup.
12591259
*
1260-
* @param consumerGroup The converted ConsumerGroup.
1261-
* @param leavingMembers The leaving member(s) that triggered the downgrade validation.
1262-
* @param joiningMember The newly joined member if the downgrade is triggered by static member replacement.
1263-
* When not null, must have an instanceId that matches an existing member.
1264-
* @param records The record list to which the conversion records are added.
1260+
* @param consumerGroup The converted ConsumerGroup.
1261+
* @param leavingMembers The leaving member(s) that triggered the downgrade validation.
1262+
* @param joiningMember The newly joined member if the downgrade is triggered by static member replacement.
1263+
* When not null, must have an instanceId that matches the replaced member.
1264+
* @param hasSubscriptionChanged The boolean indicating whether the joining member has a different subscription
1265+
* from the replaced member. Only used when joiningMember is set.
1266+
* @param records The record list to which the conversion records are added.
12651267
*/
12661268
private void convertToClassicGroup(
12671269
ConsumerGroup consumerGroup,
12681270
Set<ConsumerGroupMember> leavingMembers,
12691271
ConsumerGroupMember joiningMember,
1272+
boolean hasSubscriptionChanged,
12701273
List<CoordinatorRecord> records
12711274
) {
12721275
if (joiningMember == null) {
@@ -1307,9 +1310,12 @@ private void convertToClassicGroup(
13071310

13081311
classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
13091312

1310-
// If the downgrade is triggered by a member leaving the group, a rebalance should be triggered.
1313+
// If the downgrade is triggered by a member leaving the group or a static
1314+
// member replacement with a different subscription, a rebalance should be triggered.
13111315
if (joiningMember == null) {
1312-
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic.", classicGroup.groupId()));
1316+
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic for member leaving.", classicGroup.groupId()));
1317+
} else if (hasSubscriptionChanged) {
1318+
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic for static member replacement with different subscription.", classicGroup.groupId()));
13131319
}
13141320

13151321
log.info("[GroupId {}] Converted the consumer group to a classic group.", consumerGroup.groupId());
@@ -2401,6 +2407,10 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
24012407
);
24022408
}
24032409

2410+
ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(request.groupInstanceId());
2411+
boolean downgrade = existingStaticMemberOrNull != null &&
2412+
validateOnlineDowngradeWithReplacedMember(group, existingStaticMemberOrNull);
2413+
24042414
int groupEpoch = group.groupEpoch();
24052415
SubscriptionType subscriptionType = group.subscriptionType();
24062416
final ConsumerProtocolSubscription subscription = deserializeSubscription(protocols);
@@ -2447,49 +2457,61 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
24472457
subscriptionType = result.subscriptionType;
24482458
}
24492459

2450-
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The delta between
2451-
// the existing and the new target assignment is persisted to the partition.
2452-
final int targetAssignmentEpoch;
2453-
final Assignment targetAssignment;
2460+
if (downgrade) {
2461+
// 2. If the static member subscription hasn't changed, reconcile the member's assignment with the existing
2462+
// assignment if the member is not fully reconciled yet. If the static member subscription has changed, a
2463+
// rebalance will be triggered during downgrade anyway so we can skip the reconciliation.
2464+
if (!bumpGroupEpoch) {
2465+
updatedMember = maybeReconcile(
2466+
groupId,
2467+
updatedMember,
2468+
group::currentPartitionEpoch,
2469+
group.assignmentEpoch(),
2470+
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId()),
2471+
toTopicPartitions(subscription.ownedPartitions(), metadataImage),
2472+
records
2473+
);
2474+
}
24542475

2455-
if (groupEpoch > group.assignmentEpoch()) {
2456-
targetAssignment = updateTargetAssignment(
2476+
// 3. Downgrade the consumer group.
2477+
convertToClassicGroup(
24572478
group,
2458-
groupEpoch,
2459-
member,
2479+
Set.of(),
24602480
updatedMember,
2461-
subscriptionType,
2481+
bumpGroupEpoch,
24622482
records
24632483
);
2464-
targetAssignmentEpoch = groupEpoch;
24652484
} else {
2466-
targetAssignmentEpoch = group.assignmentEpoch();
2467-
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
2485+
// If no downgrade is triggered.
24682486

2469-
}
2487+
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch.
2488+
// The delta between the existing and the new target assignment is persisted to the partition.
2489+
final int targetAssignmentEpoch;
2490+
final Assignment targetAssignment;
24702491

2471-
// 3. Reconcile the member's assignment with the target assignment if the member is not
2472-
// fully reconciled yet.
2473-
updatedMember = maybeReconcile(
2474-
groupId,
2475-
updatedMember,
2476-
group::currentPartitionEpoch,
2477-
targetAssignmentEpoch,
2478-
targetAssignment,
2479-
toTopicPartitions(subscription.ownedPartitions(), metadataImage),
2480-
records
2481-
);
2492+
if (groupEpoch > group.assignmentEpoch()) {
2493+
targetAssignment = updateTargetAssignment(
2494+
group,
2495+
groupEpoch,
2496+
member,
2497+
updatedMember,
2498+
subscriptionType,
2499+
records
2500+
);
2501+
targetAssignmentEpoch = groupEpoch;
2502+
} else {
2503+
targetAssignmentEpoch = group.assignmentEpoch();
2504+
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
2505+
}
24822506

2483-
// 4. Maybe downgrade the consumer group if the last static member using the
2484-
// consumer protocol is replaced by the joining static member.
2485-
ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(request.groupInstanceId());
2486-
boolean downgrade = existingStaticMemberOrNull != null &&
2487-
validateOnlineDowngradeWithReplacedMember(group, existingStaticMemberOrNull);
2488-
if (downgrade) {
2489-
convertToClassicGroup(
2490-
group,
2491-
Set.of(),
2507+
// 3. Reconcile the member's assignment with the target assignment if the member is not fully reconciled yet.
2508+
updatedMember = maybeReconcile(
2509+
groupId,
24922510
updatedMember,
2511+
group::currentPartitionEpoch,
2512+
targetAssignmentEpoch,
2513+
targetAssignment,
2514+
toTopicPartitions(subscription.ownedPartitions(), metadataImage),
24932515
records
24942516
);
24952517
}
@@ -4084,7 +4106,7 @@ private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMembers(
40844106

40854107
List<CoordinatorRecord> records = new ArrayList<>();
40864108
if (validateOnlineDowngradeWithFencedMembers(group, members)) {
4087-
convertToClassicGroup(group, members, null, records);
4109+
convertToClassicGroup(group, members, null, false, records);
40884110
return new CoordinatorResult<>(records, response, null, false);
40894111
} else {
40904112
for (ConsumerGroupMember member : members) {

0 commit comments

Comments
 (0)