Skip to content

Commit 7ba7f5e

Browse files
authored
KAFKA-19546: Rebalance should be triggered by subscription change during group protocol downgrade (apache#20580)
Cherry pick KAFKA-19546 to 4.0. During online downgrade, when a static member using the consumer protocol which is also the last member using the consumer protocol is replaced by another static member using the classic protocol with the same instance id, the latter will take the assignment of the former and an online downgrade will be triggered. In the current implementation, if the replacing static member has a different subscription, no rebalance will be triggered when the downgrade happens. The patch checks whether the static member has changed subscription and triggers a rebalance when it does. Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
1 parent b31ce61 commit 7ba7f5e

File tree

2 files changed

+140
-90
lines changed

2 files changed

+140
-90
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 64 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,16 +1006,19 @@ private boolean validateOnlineDowngradeWithReplacedMember(
10061006
/**
10071007
* Creates a ClassicGroup corresponding to the given ConsumerGroup.
10081008
*
1009-
* @param consumerGroup The converted ConsumerGroup.
1010-
* @param leavingMembers The leaving member(s) that triggered the downgrade validation.
1011-
* @param joiningMember The newly joined member if the downgrade is triggered by static member replacement.
1012-
* When not null, must have an instanceId that matches an existing member.
1013-
* @param records The record list to which the conversion records are added.
1009+
* @param consumerGroup The converted ConsumerGroup.
1010+
* @param leavingMembers The leaving member(s) that triggered the downgrade validation.
1011+
* @param joiningMember The newly joined member if the downgrade is triggered by static member replacement.
1012+
* When not null, must have an instanceId that matches the replaced member.
1013+
* @param hasSubscriptionChanged The boolean indicating whether the joining member has a different subscription
1014+
* from the replaced member. Only used when joiningMember is set.
1015+
* @param records The record list to which the conversion records are added.
10141016
*/
10151017
private void convertToClassicGroup(
10161018
ConsumerGroup consumerGroup,
10171019
Set<ConsumerGroupMember> leavingMembers,
10181020
ConsumerGroupMember joiningMember,
1021+
boolean hasSubscriptionChanged,
10191022
List<CoordinatorRecord> records
10201023
) {
10211024
if (joiningMember == null) {
@@ -1056,9 +1059,12 @@ private void convertToClassicGroup(
10561059

10571060
classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
10581061

1059-
// If the downgrade is triggered by a member leaving the group, a rebalance should be triggered.
1062+
// If the downgrade is triggered by a member leaving the group or a static
1063+
// member replacement with a different subscription, a rebalance should be triggered.
10601064
if (joiningMember == null) {
1061-
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic.", classicGroup.groupId()));
1065+
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic for member leaving.", classicGroup.groupId()));
1066+
} else if (hasSubscriptionChanged) {
1067+
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic for static member replacement with different subscription.", classicGroup.groupId()));
10621068
}
10631069

10641070
log.info("[GroupId {}] Converted the consumer group to a classic group.", consumerGroup.groupId());
@@ -1883,6 +1889,10 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
18831889
);
18841890
}
18851891

1892+
ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(request.groupInstanceId());
1893+
boolean downgrade = existingStaticMemberOrNull != null &&
1894+
validateOnlineDowngradeWithReplacedMember(group, existingStaticMemberOrNull);
1895+
18861896
int groupEpoch = group.groupEpoch();
18871897
Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
18881898
SubscriptionType subscriptionType = group.subscriptionType();
@@ -1931,50 +1941,62 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
19311941
subscriptionType = result.subscriptionType;
19321942
}
19331943

1934-
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The delta between
1935-
// the existing and the new target assignment is persisted to the partition.
1936-
final int targetAssignmentEpoch;
1937-
final Assignment targetAssignment;
1944+
if (downgrade) {
1945+
// 2. If the static member subscription hasn't changed, reconcile the member's assignment with the existing
1946+
// assignment if the member is not fully reconciled yet. If the static member subscription has changed, a
1947+
// rebalance will be triggered during downgrade anyway so we can skip the reconciliation.
1948+
if (!bumpGroupEpoch) {
1949+
updatedMember = maybeReconcile(
1950+
groupId,
1951+
updatedMember,
1952+
group::currentPartitionEpoch,
1953+
group.assignmentEpoch(),
1954+
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId()),
1955+
toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()),
1956+
records
1957+
);
1958+
}
19381959

1939-
if (groupEpoch > group.assignmentEpoch()) {
1940-
targetAssignment = updateTargetAssignment(
1960+
// 3. Downgrade the consumer group.
1961+
convertToClassicGroup(
19411962
group,
1942-
groupEpoch,
1943-
member,
1963+
Collections.emptySet(),
19441964
updatedMember,
1945-
subscriptionMetadata,
1946-
subscriptionType,
1965+
bumpGroupEpoch,
19471966
records
19481967
);
1949-
targetAssignmentEpoch = groupEpoch;
19501968
} else {
1951-
targetAssignmentEpoch = group.assignmentEpoch();
1952-
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
1969+
// If no downgrade is triggered.
19531970

1954-
}
1971+
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch.
1972+
// The delta between the existing and the new target assignment is persisted to the partition.
1973+
final int targetAssignmentEpoch;
1974+
final Assignment targetAssignment;
19551975

1956-
// 3. Reconcile the member's assignment with the target assignment if the member is not
1957-
// fully reconciled yet.
1958-
updatedMember = maybeReconcile(
1959-
groupId,
1960-
updatedMember,
1961-
group::currentPartitionEpoch,
1962-
targetAssignmentEpoch,
1963-
targetAssignment,
1964-
toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()),
1965-
records
1966-
);
1976+
if (groupEpoch > group.assignmentEpoch()) {
1977+
targetAssignment = updateTargetAssignment(
1978+
group,
1979+
groupEpoch,
1980+
member,
1981+
updatedMember,
1982+
subscriptionMetadata,
1983+
subscriptionType,
1984+
records
1985+
);
1986+
targetAssignmentEpoch = groupEpoch;
1987+
} else {
1988+
targetAssignmentEpoch = group.assignmentEpoch();
1989+
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
1990+
}
19671991

1968-
// 4. Maybe downgrade the consumer group if the last static member using the
1969-
// consumer protocol is replaced by the joining static member.
1970-
ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(request.groupInstanceId());
1971-
boolean downgrade = existingStaticMemberOrNull != null &&
1972-
validateOnlineDowngradeWithReplacedMember(group, existingStaticMemberOrNull);
1973-
if (downgrade) {
1974-
convertToClassicGroup(
1975-
group,
1976-
Collections.emptySet(),
1992+
// 3. Reconcile the member's assignment with the target assignment if the member is not fully reconciled yet.
1993+
updatedMember = maybeReconcile(
1994+
groupId,
19771995
updatedMember,
1996+
group::currentPartitionEpoch,
1997+
targetAssignmentEpoch,
1998+
targetAssignment,
1999+
toTopicPartitions(subscription.ownedPartitions(), metadataImage.topics()),
19782000
records
19792001
);
19802002
}
@@ -3097,7 +3119,7 @@ private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMembers(
30973119

30983120
List<CoordinatorRecord> records = new ArrayList<>();
30993121
if (validateOnlineDowngradeWithFencedMembers(group, members)) {
3100-
convertToClassicGroup(group, members, null, records);
3122+
convertToClassicGroup(group, members, null, false, records);
31013123
return new CoordinatorResult<>(records, response, null, false);
31023124
} else {
31033125
for (ConsumerGroupMember member : members) {

0 commit comments

Comments
 (0)