Skip to content

Commit ebae768

Browse files
authored
KAFKA-18193 Refactor Kafka Streams CloseOptions to Fluent API Style (#19955)
In Kafka Streams, configuration classes typically follow a fluent API pattern to ensure a consistent and intuitive developer experience. However, the current implementation of `org.apache.kafka.streams.KafkaStreams$CloseOptions` deviates from this convention by exposing a public constructor, breaking the uniformity expected across the API. To address this inconsistency, we propose introducing a new `CloseOptions` class that adheres to the fluent API style, replacing the existing implementation. The new class will retain the existing `timeout(Duration)` and `leaveGroup(boolean)` methods but will enforce fluent instantiation and configuration. Given the design shift, we will not maintain backward compatibility with the current class. This change aligns with the goal of standardizing configuration objects across Kafka Streams, offering developers a more cohesive and predictable API. Reviewers: Bill Bejeck<[email protected]>
1 parent fa2496b commit ebae768

File tree

10 files changed

+334
-162
lines changed

10 files changed

+334
-162
lines changed

docs/upgrade.html

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,13 @@ <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4
189189
A new metric <code>AvgIdleRatio</code> has been added to the <code>ControllerEventManager</code> group. This metric measures the average idle ratio of the controller event queue thread,
190190
providing visibility into how much time the controller spends waiting for events versus processing them. The metric value ranges from 0.0 (always busy) to 1.0 (always idle).
191191
</li>
192+
<li>
193+
Deprecated <code>org.apache.kafka.streams.KafkaStreams$CloseOptions</code> and its related methods, such as
194+
<code>KafkaStreams#close(org.apache.kafka.streams.KafkaStreams$CloseOptions)</code>.
195+
As a replacement, please use <code>org.apache.kafka.streams.CloseOptions</code> and
196+
<code>KafkaStreams#close(org.apache.kafka.streams.CloseOptions)</code>.
197+
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/QAq9F">KIP-1153</a>.
198+
</li>
192199
</ul>
193200

194201
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
import org.apache.kafka.common.utils.MockTime;
2929
import org.apache.kafka.common.utils.Utils;
3030
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
31+
import org.apache.kafka.streams.CloseOptions;
3132
import org.apache.kafka.streams.KafkaStreams;
32-
import org.apache.kafka.streams.KafkaStreams.CloseOptions;
3333
import org.apache.kafka.streams.KeyValue;
3434
import org.apache.kafka.streams.StreamsBuilder;
3535
import org.apache.kafka.streams.StreamsConfig;
@@ -159,7 +159,7 @@ public void testCloseOptions() throws Exception {
159159
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
160160
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
161161

162-
streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
162+
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP).withTimeout(Duration.ofSeconds(30)));
163163
waitForEmptyConsumerGroup(adminClient, streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
164164
}
165165

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams;
18+
19+
import java.time.Duration;
20+
import java.util.Objects;
21+
import java.util.Optional;
22+
23+
public class CloseOptions {
24+
/**
25+
* Enum to specify the group membership operation upon closing the Kafka Streams application.
26+
*
27+
* <ul>
28+
* <li><b>{@code LEAVE_GROUP}</b>: means the consumer leave the group.</li>
29+
* <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in the group.</li>
30+
* </ul>
31+
*/
32+
public enum GroupMembershipOperation {
33+
LEAVE_GROUP,
34+
REMAIN_IN_GROUP
35+
}
36+
37+
/**
38+
* Specifies the group membership operation upon shutdown.
39+
* By default, {@code GroupMembershipOperation.REMAIN_IN_GROUP} will be applied, which follows the KafkaStreams default behavior.
40+
*/
41+
protected GroupMembershipOperation operation = GroupMembershipOperation.REMAIN_IN_GROUP;
42+
43+
/**
44+
* Specifies the maximum amount of time to wait for the close process to complete.
45+
* This allows users to define a custom timeout for gracefully stopping the KafkaStreams.
46+
*/
47+
protected Optional<Duration> timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE));
48+
49+
private CloseOptions() {
50+
}
51+
52+
protected CloseOptions(final CloseOptions closeOptions) {
53+
this.operation = closeOptions.operation;
54+
this.timeout = closeOptions.timeout;
55+
}
56+
57+
/**
58+
* Static method to create a {@code CloseOptions} with a custom timeout.
59+
*
60+
* @param timeout the maximum time to wait for the KafkaStreams to close.
61+
* @return a new {@code CloseOptions} instance with the specified timeout.
62+
*/
63+
public static CloseOptions timeout(final Duration timeout) {
64+
return new CloseOptions().withTimeout(timeout);
65+
}
66+
67+
/**
68+
* Static method to create a {@code CloseOptions} with a specified group membership operation.
69+
*
70+
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
71+
* @return a new {@code CloseOptions} instance with the specified group membership operation.
72+
*/
73+
public static CloseOptions groupMembershipOperation(final GroupMembershipOperation operation) {
74+
return new CloseOptions().withGroupMembershipOperation(operation);
75+
}
76+
77+
/**
78+
* Fluent method to set the timeout for the close process.
79+
*
80+
* @param timeout the maximum time to wait for the KafkaStreams to close. If {@code null}, the default timeout will be used.
81+
* @return this {@code CloseOptions} instance.
82+
*/
83+
public CloseOptions withTimeout(final Duration timeout) {
84+
this.timeout = Optional.ofNullable(timeout);
85+
return this;
86+
}
87+
88+
/**
89+
* Fluent method to set the group membership operation upon shutdown.
90+
*
91+
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
92+
* @return this {@code CloseOptions} instance.
93+
*/
94+
public CloseOptions withGroupMembershipOperation(final GroupMembershipOperation operation) {
95+
this.operation = Objects.requireNonNull(operation, "operation should not be null");
96+
return this;
97+
}
98+
}

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
5050
import org.apache.kafka.streams.errors.UnknownStateStoreException;
5151
import org.apache.kafka.streams.internals.ClientInstanceIdsImpl;
52+
import org.apache.kafka.streams.internals.CloseOptionsInternal;
5253
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
5354
import org.apache.kafka.streams.internals.metrics.StreamsClientMetricsDelegatingReporter;
5455
import org.apache.kafka.streams.processor.StandbyUpdateListener;
@@ -488,7 +489,7 @@ private void replaceStreamThread(final Throwable throwable) {
488489
closeToError();
489490
}
490491
final StreamThread deadThread = (StreamThread) Thread.currentThread();
491-
deadThread.shutdown(false);
492+
deadThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
492493
addStreamThread();
493494
if (throwable instanceof RuntimeException) {
494495
throw (RuntimeException) throwable;
@@ -765,7 +766,7 @@ private void throwOnFatalException(final Exception fatalUserException,
765766

766767
@Override
767768
public void onUpdateStart(final TopicPartition topicPartition,
768-
final String storeName,
769+
final String storeName,
769770
final long startingOffset) {
770771
if (userStandbyListener != null) {
771772
try {
@@ -1136,7 +1137,7 @@ public Optional<String> addStreamThread() {
11361137
return Optional.of(streamThread.getName());
11371138
} else {
11381139
log.warn("Terminating the new thread because the Kafka Streams client is in state {}", state);
1139-
streamThread.shutdown(true);
1140+
streamThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
11401141
threads.remove(streamThread);
11411142
final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads());
11421143
log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
@@ -1200,7 +1201,7 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
12001201
final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
12011202
if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) {
12021203
log.info("Removing StreamThread {}", streamThread.getName());
1203-
streamThread.shutdown(true);
1204+
streamThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
12041205
if (callingThreadIsNotCurrentStreamThread) {
12051206
final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
12061207
if (remainingTimeMs <= 0 || !streamThread.waitOnThreadState(StreamThread.State.DEAD, remainingTimeMs)) {
@@ -1418,15 +1419,18 @@ public synchronized void start() throws IllegalStateException, StreamsException
14181419
/**
14191420
* Class that handles options passed in case of {@code KafkaStreams} instance scale down
14201421
*/
1422+
@Deprecated(since = "4.2")
14211423
public static class CloseOptions {
14221424
private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
14231425
private boolean leaveGroup = false;
14241426

1427+
@Deprecated(since = "4.2")
14251428
public CloseOptions timeout(final Duration timeout) {
14261429
this.timeout = timeout;
14271430
return this;
14281431
}
14291432

1433+
@Deprecated(since = "4.2")
14301434
public CloseOptions leaveGroup(final boolean leaveGroup) {
14311435
this.leaveGroup = leaveGroup;
14321436
return this;
@@ -1438,10 +1442,14 @@ public CloseOptions leaveGroup(final boolean leaveGroup) {
14381442
* This will block until all threads have stopped.
14391443
*/
14401444
public void close() {
1441-
close(Optional.empty(), false);
1445+
close(Optional.empty(), org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
14421446
}
14431447

1444-
private Thread shutdownHelper(final boolean error, final long timeoutMs, final boolean leaveGroup) {
1448+
private Thread shutdownHelper(
1449+
final boolean error,
1450+
final long timeoutMs,
1451+
final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation
1452+
) {
14451453
stateDirCleaner.shutdownNow();
14461454
if (rocksDBMetricsRecordingService != null) {
14471455
rocksDBMetricsRecordingService.shutdownNow();
@@ -1453,7 +1461,9 @@ private Thread shutdownHelper(final boolean error, final long timeoutMs, final b
14531461
return new Thread(() -> {
14541462
// notify all the threads to stop; avoid deadlocks by stopping any
14551463
// further state reports from the thread since we're shutting down
1456-
int numStreamThreads = processStreamThread(streamThread -> streamThread.shutdown(leaveGroup));
1464+
int numStreamThreads = processStreamThread(
1465+
streamThread -> streamThread.shutdown(operation)
1466+
);
14571467

14581468
log.info("Shutting down {} stream threads", numStreamThreads);
14591469

@@ -1513,7 +1523,7 @@ private Thread shutdownHelper(final boolean error, final long timeoutMs, final b
15131523
}, clientId + "-CloseThread");
15141524
}
15151525

1516-
private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
1526+
private boolean close(final Optional<Long> timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
15171527
final long timeoutMs;
15181528
if (timeout.isPresent()) {
15191529
timeoutMs = timeout.get();
@@ -1544,7 +1554,7 @@ private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
15441554
+ "PENDING_SHUTDOWN, PENDING_ERROR, ERROR, or NOT_RUNNING");
15451555
}
15461556

1547-
final Thread shutdownThread = shutdownHelper(false, timeoutMs, leaveGroup);
1557+
final Thread shutdownThread = shutdownHelper(false, timeoutMs, operation);
15481558

15491559
shutdownThread.setDaemon(true);
15501560
shutdownThread.start();
@@ -1562,7 +1572,7 @@ private void closeToError() {
15621572
if (!setState(State.PENDING_ERROR)) {
15631573
log.info("Skipping shutdown since we are already in {}", state());
15641574
} else {
1565-
final Thread shutdownThread = shutdownHelper(true, -1, false);
1575+
final Thread shutdownThread = shutdownHelper(true, -1, org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
15661576

15671577
shutdownThread.setDaemon(true);
15681578
shutdownThread.start();
@@ -1588,28 +1598,50 @@ public synchronized boolean close(final Duration timeout) throws IllegalArgument
15881598
throw new IllegalArgumentException("Timeout can't be negative.");
15891599
}
15901600

1591-
return close(Optional.of(timeoutMs), false);
1601+
return close(Optional.of(timeoutMs), org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
15921602
}
15931603

15941604
/**
15951605
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
15961606
* threads to join.
1607+
* This method is deprecated and replaced by {@link #close(org.apache.kafka.streams.CloseOptions)}.
15971608
* @param options contains timeout to specify how long to wait for the threads to shut down, and a flag leaveGroup to
15981609
* trigger consumer leave call
15991610
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
16001611
* before all threads stopped
16011612
* Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
16021613
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
16031614
*/
1615+
@Deprecated(since = "4.2")
16041616
public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException {
1617+
final org.apache.kafka.streams.CloseOptions closeOptions = org.apache.kafka.streams.CloseOptions.timeout(options.timeout)
1618+
.withGroupMembershipOperation(options.leaveGroup ?
1619+
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP :
1620+
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
1621+
return close(closeOptions);
1622+
}
1623+
1624+
/**
1625+
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
1626+
* threads to join.
1627+
* @param options contains timeout to specify how long to wait for the threads to shut down,
1628+
* and a {@link org.apache.kafka.streams.CloseOptions.GroupMembershipOperation}
1629+
* to trigger consumer leave call or remain in the group
1630+
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
1631+
* before all threads stopped
1632+
* Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
1633+
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
1634+
*/
1635+
public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException {
16051636
Objects.requireNonNull(options, "options cannot be null");
1606-
final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
1607-
final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
1637+
final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options);
1638+
final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");
1639+
final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix);
16081640
if (timeoutMs < 0) {
16091641
throw new IllegalArgumentException("Timeout can't be negative.");
16101642
}
16111643

1612-
return close(Optional.of(timeoutMs), options.leaveGroup);
1644+
return close(Optional.of(timeoutMs), optionsInternal.operation());
16131645
}
16141646

16151647
/**
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.internals;
18+
19+
import org.apache.kafka.streams.CloseOptions;
20+
21+
import java.time.Duration;
22+
import java.util.Optional;
23+
24+
public class CloseOptionsInternal extends CloseOptions {
25+
26+
public CloseOptionsInternal(final CloseOptions options) {
27+
super(options);
28+
}
29+
30+
public GroupMembershipOperation operation() {
31+
return operation;
32+
}
33+
34+
public Optional<Duration> timeout() {
35+
return timeout;
36+
}
37+
}

0 commit comments

Comments
 (0)