Skip to content

Commit 68f1da8

Browse files
authored
KAFKA-18185: remove internal.leave.group.on.close config (#19400)
JIRA: KAFKA-18185 This is a follow-up of #17614 The patch is to remove the `internal.leave.group.on.close` config. Reviewers: Sophie Blee-Goldman <[email protected]>, Chia-Ping Tsai <[email protected]>, Bill Bejeck <[email protected]>
1 parent 28e7803 commit 68f1da8

File tree

18 files changed

+62
-192
lines changed

18 files changed

+62
-192
lines changed

clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ public String toString() {
4646
public final Optional<String> rackId;
4747
public final long retryBackoffMs;
4848
public final long retryBackoffMaxMs;
49-
public final boolean leaveGroupOnClose;
5049

5150
public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) {
5251
this.sessionTimeoutMs = config.getInt(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG);
@@ -80,13 +79,6 @@ public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) {
8079

8180
this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
8281
this.retryBackoffMaxMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MAX_MS_CONFIG);
83-
84-
// Internal leave group config is only defined in Consumer.
85-
if (protocolType == ProtocolType.CONSUMER) {
86-
this.leaveGroupOnClose = config.getBoolean("internal.leave.group.on.close");
87-
} else {
88-
this.leaveGroupOnClose = true;
89-
}
9082
}
9183

9284
// For testing purpose.
@@ -97,8 +89,7 @@ public GroupRebalanceConfig(final int sessionTimeoutMs,
9789
Optional<String> groupInstanceId,
9890
String rackId,
9991
long retryBackoffMs,
100-
long retryBackoffMaxMs,
101-
boolean leaveGroupOnClose) {
92+
long retryBackoffMaxMs) {
10293
this.sessionTimeoutMs = sessionTimeoutMs;
10394
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
10495
this.heartbeatIntervalMs = heartbeatIntervalMs;
@@ -107,6 +98,5 @@ public GroupRebalanceConfig(final int sessionTimeoutMs,
10798
this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);
10899
this.retryBackoffMs = retryBackoffMs;
109100
this.retryBackoffMaxMs = retryBackoffMaxMs;
110-
this.leaveGroupOnClose = leaveGroupOnClose;
111101
}
112102
}

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -330,17 +330,6 @@ public class ConsumerConfig extends AbstractConfig {
330330
"be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.";
331331
public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
332332

333-
/**
334-
* <code>internal.leave.group.on.close</code>
335-
* Whether or not the consumer should leave the group on close. If set to <code>false</code> then a rebalance
336-
* won't occur until <code>session.timeout.ms</code> expires.
337-
*
338-
* <p>
339-
* Note: this is an internal configuration and could be changed in the future in a backward incompatible way
340-
*
341-
*/
342-
static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
343-
344333
/**
345334
* <code>internal.throw.on.fetch.stable.offset.unsupported</code>
346335
* Whether or not the consumer should throw when the new stable offset feature is supported.
@@ -634,10 +623,6 @@ public class ConsumerConfig extends AbstractConfig {
634623
DEFAULT_EXCLUDE_INTERNAL_TOPICS,
635624
Importance.MEDIUM,
636625
EXCLUDE_INTERNAL_TOPICS_DOC)
637-
.defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
638-
Type.BOOLEAN,
639-
true,
640-
Importance.LOW)
641626
.defineInternal(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED,
642627
Type.BOOLEAN,
643628
false,

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1170,7 +1170,7 @@ protected void handlePollTimeoutExpiry() {
11701170
public synchronized RequestFuture<Void> maybeLeaveGroup(CloseOptions.GroupMembershipOperation membershipOperation, String leaveReason) {
11711171
RequestFuture<Void> future = null;
11721172

1173-
if (rebalanceConfig.leaveGroupOnClose && shouldSendLeaveGroupRequest(membershipOperation)) {
1173+
if (shouldSendLeaveGroupRequest(membershipOperation)) {
11741174
log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
11751175
generation.memberId, coordinator, leaveReason);
11761176
LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
331331
groupInstanceId,
332332
rackId,
333333
retryBackoffMs,
334-
retryBackoffMaxMs,
335-
true
334+
retryBackoffMaxMs
336335
);
337336
this.coordinator = new ConsumerCoordinator(
338337
rebalanceConfig,

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ private void logPausedPartitionsBeingRevoked(Set<TopicPartition> partitionsToRev
421421
@Override
422422
public boolean isLeavingGroup() {
423423
CloseOptions.GroupMembershipOperation leaveGroupOperation = leaveGroupOperation();
424-
if (REMAIN_IN_GROUP == leaveGroupOperation) {
424+
if (REMAIN_IN_GROUP == leaveGroupOperation && groupInstanceId.isEmpty()) {
425425
return false;
426426
}
427427

@@ -432,7 +432,8 @@ public boolean isLeavingGroup() {
432432
boolean hasLeaveOperation = DEFAULT == leaveGroupOperation ||
433433
// Leave operation: both static and dynamic consumers will send a leave heartbeat
434434
LEAVE_GROUP == leaveGroupOperation ||
435-
// Remain in group: only static consumers will send a leave heartbeat, while dynamic members will not
435+
// Remain in group: static consumers will send a leave heartbeat with -2 epoch to reflect that a member using the given
436+
// instance id decided to leave the group and would be back within the session timeout.
436437
groupInstanceId().isPresent();
437438

438439
return isLeavingState && hasLeaveOperation;

clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,7 @@ private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs) {
135135
Optional.empty(), Optional.empty());
136136
}
137137

138-
139138
private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs, int rebalanceTimeoutMs, Optional<String> groupInstanceId, Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier) {
140-
setupCoordinator(retryBackoffMs, retryBackoffMaxMs, rebalanceTimeoutMs, groupInstanceId, heartbeatThreadSupplier, groupInstanceId.isEmpty());
141-
}
142-
143-
private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs, int rebalanceTimeoutMs, Optional<String> groupInstanceId, Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier, boolean leaveOnClose) {
144139
LogContext logContext = new LogContext();
145140
this.mockTime = new MockTime();
146141
ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, retryBackoffMaxMs, 60 * 60 * 1000L,
@@ -168,8 +163,7 @@ false, false, new SubscriptionState(logContext, AutoOffsetResetStrategy.EARLIEST
168163
groupInstanceId,
169164
null,
170165
retryBackoffMs,
171-
retryBackoffMaxMs,
172-
leaveOnClose);
166+
retryBackoffMaxMs);
173167
this.coordinator = new DummyCoordinator(rebalanceConfig,
174168
consumerClient,
175169
metrics,
@@ -1109,7 +1103,7 @@ public void testLeaveGroupSentWithGroupInstanceIdUnSet() {
11091103
@ParameterizedTest
11101104
@MethodSource("groupInstanceIdAndMembershipOperationMatrix")
11111105
public void testLeaveGroupSentWithGroupInstanceIdUnSetAndDifferentGroupMembershipOperation(Optional<String> groupInstanceId, CloseOptions.GroupMembershipOperation operation) {
1112-
checkLeaveGroupRequestSent(groupInstanceId, operation, Optional.empty(), true);
1106+
checkLeaveGroupRequestSent(groupInstanceId, operation, Optional.empty());
11131107
}
11141108

11151109
private static Stream<Arguments> groupInstanceIdAndMembershipOperationMatrix() {
@@ -1124,11 +1118,11 @@ private static Stream<Arguments> groupInstanceIdAndMembershipOperationMatrix() {
11241118
}
11251119

11261120
private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId) {
1127-
checkLeaveGroupRequestSent(groupInstanceId, CloseOptions.GroupMembershipOperation.DEFAULT, Optional.empty(), groupInstanceId.isEmpty());
1121+
checkLeaveGroupRequestSent(groupInstanceId, CloseOptions.GroupMembershipOperation.DEFAULT, Optional.empty());
11281122
}
11291123

1130-
private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId, CloseOptions.GroupMembershipOperation operation, Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier, boolean leaveOnClose) {
1131-
setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, Integer.MAX_VALUE, groupInstanceId, heartbeatThreadSupplier, leaveOnClose);
1124+
private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId, CloseOptions.GroupMembershipOperation operation, Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier) {
1125+
setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, Integer.MAX_VALUE, groupInstanceId, heartbeatThreadSupplier);
11321126

11331127
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
11341128
mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE));

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,7 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstance
224224
groupInstanceId,
225225
rackId,
226226
retryBackoffMs,
227-
retryBackoffMaxMs,
228-
groupInstanceId.isEmpty());
227+
retryBackoffMaxMs);
229228
}
230229

231230
@AfterEach

clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ public void setUp() {
4747
Optional.empty(),
4848
null,
4949
retryBackoffMs,
50-
retryBackoffMaxMs,
51-
true);
50+
retryBackoffMaxMs);
5251
heartbeat = new Heartbeat(rebalanceConfig, time);
5352
}
5453

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,7 @@ public void init(ConnectProtocolCompatibility compatibility) {
149149
Optional.empty(),
150150
null,
151151
retryBackoffMs,
152-
retryBackoffMaxMs,
153-
true);
152+
retryBackoffMaxMs);
154153
this.coordinator = new WorkerCoordinator(rebalanceConfig,
155154
loggerFactory,
156155
consumerClient,

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,7 @@ public void setup(ConnectProtocolCompatibility compatibility) {
140140
Optional.empty(),
141141
null,
142142
retryBackoffMs,
143-
retryBackoffMaxMs,
144-
true);
143+
retryBackoffMaxMs);
145144
this.coordinator = new WorkerCoordinator(rebalanceConfig,
146145
logContext,
147146
consumerClient,

0 commit comments

Comments
 (0)