Skip to content

Commit 55bae7e

Browse files
KAFKA-19831: Improved error handling in DefaultStateUpdater. (apache#20767)
- Improved error handling in DefaultStateUpdater to take potential failures in Task#maybeCheckpoint into account.
- Improved TaskManager#shutdownStateUpdater to not hang indefinitely if the State Updater thread is dead.

Reviewers: Matthias J. Sax <[email protected]>, Lucas Brutschy <[email protected]>

---------

Co-authored-by: Matthias J. Sax <[email protected]>
1 parent 6b035a9 commit 55bae7e

File tree

5 files changed

+373
-42
lines changed

5 files changed

+373
-42
lines changed
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.integration;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerConfig;
20+
import org.apache.kafka.common.serialization.Serdes;
21+
import org.apache.kafka.common.utils.MockTime;
22+
import org.apache.kafka.streams.KafkaStreams;
23+
import org.apache.kafka.streams.StreamsConfig;
24+
import org.apache.kafka.streams.TopologyWrapper;
25+
import org.apache.kafka.streams.errors.ProcessorStateException;
26+
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
27+
import org.apache.kafka.streams.processor.StateStore;
28+
import org.apache.kafka.streams.processor.StateStoreContext;
29+
import org.apache.kafka.streams.state.KeyValueStore;
30+
import org.apache.kafka.streams.state.StoreBuilder;
31+
import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;
32+
import org.apache.kafka.test.MockApiProcessorSupplier;
33+
import org.apache.kafka.test.MockKeyValueStore;
34+
import org.apache.kafka.test.TestUtils;
35+
36+
import org.junit.jupiter.api.AfterEach;
37+
import org.junit.jupiter.api.BeforeEach;
38+
import org.junit.jupiter.api.Test;
39+
import org.junit.jupiter.api.TestInfo;
40+
41+
import java.io.IOException;
42+
import java.time.Duration;
43+
import java.util.Properties;
44+
import java.util.concurrent.CountDownLatch;
45+
import java.util.concurrent.atomic.AtomicInteger;
46+
47+
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
48+
import static org.junit.jupiter.api.Assertions.assertTrue;
49+
50+
public class StateUpdaterFailureIntegrationTest {
51+
52+
private static final int NUM_BROKERS = 1;
53+
protected static final String INPUT_TOPIC_NAME = "input-topic";
54+
private static final int NUM_PARTITIONS = 6;
55+
56+
private final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
57+
58+
private Properties streamsConfiguration;
59+
private final MockTime mockTime = cluster.time;
60+
private KafkaStreams streams;
61+
62+
@BeforeEach
63+
public void before(final TestInfo testInfo) throws InterruptedException, IOException {
64+
cluster.start();
65+
cluster.createTopic(INPUT_TOPIC_NAME, NUM_PARTITIONS, 1);
66+
streamsConfiguration = new Properties();
67+
final String safeTestName = safeUniqueTestName(testInfo);
68+
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
69+
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
70+
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
71+
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
72+
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
73+
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
74+
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
75+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
76+
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
77+
78+
}
79+
80+
@AfterEach
81+
public void after() {
82+
cluster.stop();
83+
if (streams != null) {
84+
streams.close(Duration.ofSeconds(30));
85+
}
86+
}
87+
88+
/**
89+
* The conditions that we need to meet:
90+
* <p><ul>
91+
* <li>We have an unhandled task in {@link org.apache.kafka.streams.processor.internals.DefaultStateUpdater}</li>
92+
* <li>StreamThread is not running, so {@link org.apache.kafka.streams.processor.internals.TaskManager#handleExceptionsFromStateUpdater} is not called anymore</li>
93+
* <li>The task throws exception in {@link org.apache.kafka.streams.processor.internals.Task#maybeCheckpoint(boolean)} while being processed by {@code DefaultStateUpdater}</li>
94+
* <li>{@link org.apache.kafka.streams.processor.internals.TaskManager#shutdownStateUpdater} tries to clean up all tasks that are left in the {@code DefaultStateUpdater}</li>
95+
* </ul><p>
96+
* If all conditions are met, {@code TaskManager} needs to be able to handle the failed task from the {@code DefaultStateUpdater} correctly and not hang.
97+
*/
98+
@Test
99+
public void correctlyHandleFlushErrorsDuringRebalance() throws Exception {
100+
final AtomicInteger numberOfStoreInits = new AtomicInteger();
101+
final CountDownLatch pendingShutdownLatch = new CountDownLatch(1);
102+
103+
final StoreBuilder<KeyValueStore<Object, Object>> storeBuilder = new AbstractStoreBuilder<>("testStateStore", Serdes.Integer(), Serdes.ByteArray(), new MockTime()) {
104+
105+
@Override
106+
public KeyValueStore<Object, Object> build() {
107+
return new MockKeyValueStore(name, false) {
108+
109+
@Override
110+
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
111+
super.init(stateStoreContext, root);
112+
numberOfStoreInits.incrementAndGet();
113+
}
114+
115+
@Override
116+
public void flush() {
117+
// we want to throw the ProcessorStateException here only when the rebalance finished(we reassigned the 3 tasks from the removed thread to the existing thread)
118+
// we use waitForCondition to wait until the current state is PENDING_SHUTDOWN to make sure the Stream Thread will not handle the exception and we can get to in TaskManager#shutdownStateUpdater
119+
if (numberOfStoreInits.get() == 9) {
120+
try {
121+
pendingShutdownLatch.await();
122+
} catch (final InterruptedException e) {
123+
throw new RuntimeException(e);
124+
}
125+
throw new ProcessorStateException("flush");
126+
}
127+
}
128+
};
129+
}
130+
};
131+
132+
final TopologyWrapper topology = new TopologyWrapper();
133+
topology.addSource("ingest", INPUT_TOPIC_NAME);
134+
topology.addProcessor("my-processor", new MockApiProcessorSupplier<>(), "ingest");
135+
topology.addStateStore(storeBuilder, "my-processor");
136+
137+
streams = new KafkaStreams(topology, streamsConfiguration);
138+
streams.setStateListener((newState, oldState) -> {
139+
if (newState == KafkaStreams.State.PENDING_SHUTDOWN) {
140+
pendingShutdownLatch.countDown();
141+
}
142+
});
143+
streams.start();
144+
145+
TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "Streams never reached RUNNING state");
146+
147+
streams.removeStreamThread();
148+
149+
// Before shutting down, we want the tasks to be reassigned
150+
TestUtils.waitForCondition(() -> numberOfStoreInits.get() == 9, "Streams never reinitialized the store enough times");
151+
152+
assertTrue(streams.close(Duration.ofSeconds(60)));
153+
}
154+
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -349,23 +349,26 @@ private void handleTaskCorruptedException(final TaskCorruptedException taskCorru
349349
// TODO: we can let the exception encode the actual corrupted changelog partitions and only
350350
// mark those instead of marking all changelogs
351351
private void removeCheckpointForCorruptedTask(final Task task) {
352-
task.markChangelogAsCorrupted(task.changelogPartitions());
352+
try {
353+
task.markChangelogAsCorrupted(task.changelogPartitions());
353354

354-
// we need to enforce a checkpoint that removes the corrupted partitions
355-
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
355+
// we need to enforce a checkpoint that removes the corrupted partitions
356+
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
357+
} catch (final StreamsException swallow) {
358+
log.warn("Checkpoint failed for corrupted task {}", task.id(), swallow);
359+
}
356360
}
357361

358362
private void handleStreamsException(final StreamsException streamsException) {
359363
log.info("Encountered streams exception: ", streamsException);
360364
if (streamsException.taskId().isPresent()) {
361-
handleStreamsExceptionWithTask(streamsException);
365+
handleStreamsExceptionWithTask(streamsException, streamsException.taskId().get());
362366
} else {
363367
handleStreamsExceptionWithoutTask(streamsException);
364368
}
365369
}
366370

367-
private void handleStreamsExceptionWithTask(final StreamsException streamsException) {
368-
final TaskId failedTaskId = streamsException.taskId().get();
371+
private void handleStreamsExceptionWithTask(final StreamsException streamsException, final TaskId failedTaskId) {
369372
if (updatingTasks.containsKey(failedTaskId)) {
370373
addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(
371374
new ExceptionAndTask(streamsException, updatingTasks.get(failedTaskId))
@@ -518,7 +521,7 @@ private void removeTask(final TaskId taskId, final CompletableFuture<RemovedTask
518521
+ " own this task.", taskId);
519522
}
520523
} catch (final StreamsException streamsException) {
521-
handleStreamsException(streamsException);
524+
handleStreamsExceptionWithTask(streamsException, taskId);
522525
future.completeExceptionally(streamsException);
523526
} catch (final RuntimeException runtimeException) {
524527
handleRuntimeException(runtimeException);
@@ -637,14 +640,19 @@ private void removeTask(final TaskId taskId) {
637640
private void pauseTask(final Task task) {
638641
final TaskId taskId = task.id();
639642
// do not need to unregister changelog partitions for paused tasks
640-
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
641-
pausedTasks.put(taskId, task);
642-
updatingTasks.remove(taskId);
643-
if (task.isActive()) {
644-
transitToUpdateStandbysIfOnlyStandbysLeft();
643+
try {
644+
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
645+
pausedTasks.put(taskId, task);
646+
updatingTasks.remove(taskId);
647+
if (task.isActive()) {
648+
transitToUpdateStandbysIfOnlyStandbysLeft();
649+
}
650+
log.info((task.isActive() ? "Active" : "Standby")
651+
+ " task " + task.id() + " was paused from the updating tasks and added to the paused tasks.");
652+
653+
} catch (final StreamsException streamsException) {
654+
handleStreamsExceptionWithTask(streamsException, taskId);
645655
}
646-
log.info((task.isActive() ? "Active" : "Standby")
647-
+ " task " + task.id() + " was paused from the updating tasks and added to the paused tasks.");
648656
}
649657

650658
private void resumeTask(final Task task) {
@@ -671,11 +679,15 @@ private void maybeCompleteRestoration(final StreamTask task,
671679
final Set<TopicPartition> restoredChangelogs) {
672680
final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
673681
if (restoredChangelogs.containsAll(changelogPartitions)) {
674-
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
675-
changelogReader.unregister(changelogPartitions);
676-
addToRestoredTasks(task);
677-
log.info("Stateful active task " + task.id() + " completed restoration");
678-
transitToUpdateStandbysIfOnlyStandbysLeft();
682+
try {
683+
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
684+
changelogReader.unregister(changelogPartitions);
685+
addToRestoredTasks(task);
686+
log.info("Stateful active task " + task.id() + " completed restoration");
687+
transitToUpdateStandbysIfOnlyStandbysLeft();
688+
} catch (final StreamsException streamsException) {
689+
handleStreamsExceptionWithTask(streamsException, task.id());
690+
}
679691
}
680692
}
681693

@@ -707,8 +719,12 @@ private void maybeCheckpointTasks(final long now) {
707719

708720
measureCheckpointLatency(() -> {
709721
for (final Task task : updatingTasks.values()) {
710-
// do not enforce checkpointing during restoration if its position has not advanced much
711-
task.maybeCheckpoint(false);
722+
try {
723+
// do not enforce checkpointing during restoration if its position has not advanced much
724+
task.maybeCheckpoint(false);
725+
} catch (final StreamsException streamsException) {
726+
handleStreamsExceptionWithTask(streamsException, task.id());
727+
}
712728
}
713729
});
714730

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import java.util.TreeSet;
6666
import java.util.concurrent.CompletableFuture;
6767
import java.util.concurrent.ExecutionException;
68+
import java.util.concurrent.TimeUnit;
6869
import java.util.concurrent.atomic.AtomicReference;
6970
import java.util.stream.Collectors;
7071
import java.util.stream.Stream;
@@ -704,7 +705,7 @@ private StateUpdater.RemovedTaskResult waitForFuture(final TaskId taskId,
704705
final CompletableFuture<StateUpdater.RemovedTaskResult> future) {
705706
final StateUpdater.RemovedTaskResult removedTaskResult;
706707
try {
707-
removedTaskResult = future.get();
708+
removedTaskResult = future.get(5, TimeUnit.MINUTES);
708709
if (removedTaskResult == null) {
709710
throw new IllegalStateException("Task " + taskId + " was not found in the state updater. "
710711
+ BUG_ERROR_MESSAGE);
@@ -719,6 +720,10 @@ private StateUpdater.RemovedTaskResult waitForFuture(final TaskId taskId,
719720
Thread.currentThread().interrupt();
720721
log.error(INTERRUPTED_ERROR_MESSAGE, shouldNotHappen);
721722
throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, shouldNotHappen);
723+
} catch (final java.util.concurrent.TimeoutException timeoutException) {
724+
log.warn("The state updater wasn't able to remove task {} in time. The state updater thread may be dead. "
725+
+ BUG_ERROR_MESSAGE, taskId, timeoutException);
726+
return null;
722727
}
723728
}
724729

@@ -1499,6 +1504,12 @@ void shutdown(final boolean clean) {
14991504

15001505
private void shutdownStateUpdater() {
15011506
if (stateUpdater != null) {
1507+
// If there are failed tasks, handle them first
1508+
for (final StateUpdater.ExceptionAndTask exceptionAndTask : stateUpdater.drainExceptionsAndFailedTasks()) {
1509+
final Task failedTask = exceptionAndTask.task();
1510+
closeTaskDirty(failedTask, false);
1511+
}
1512+
15021513
final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new LinkedHashMap<>();
15031514
for (final Task task : stateUpdater.tasks()) {
15041515
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(task.id());
@@ -1507,24 +1518,31 @@ private void shutdownStateUpdater() {
15071518
final Set<Task> tasksToCloseClean = new HashSet<>();
15081519
final Set<Task> tasksToCloseDirty = new HashSet<>();
15091520
addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
1510-
stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
1521+
// at this point we removed all tasks, so the shutdown should not take a lot of time
1522+
stateUpdater.shutdown(Duration.ofMinutes(1L));
15111523

15121524
for (final Task task : tasksToCloseClean) {
15131525
tasks.addTask(task);
15141526
}
15151527
for (final Task task : tasksToCloseDirty) {
15161528
closeTaskDirty(task, false);
15171529
}
1530+
// Handling all failures that occurred during the remove process
15181531
for (final StateUpdater.ExceptionAndTask exceptionAndTask : stateUpdater.drainExceptionsAndFailedTasks()) {
15191532
final Task failedTask = exceptionAndTask.task();
15201533
closeTaskDirty(failedTask, false);
15211534
}
1535+
1536+
// If anything is left unhandled due to timeouts, handle it now
1537+
for (final Task task : stateUpdater.tasks()) {
1538+
closeTaskDirty(task, false);
1539+
}
15221540
}
15231541
}
15241542

15251543
private void shutdownSchedulingTaskManager() {
15261544
if (schedulingTaskManager != null) {
1527-
schedulingTaskManager.shutdown(Duration.ofMillis(Long.MAX_VALUE));
1545+
schedulingTaskManager.shutdown(Duration.ofMinutes(5L));
15281546
}
15291547
}
15301548

0 commit comments

Comments
 (0)