Skip to content

Commit c48c50d

Browse files
KAFKA-19831: Improved error handling in DefaultStateUpdater. (#20767)
- Improved error handling in DefaultStateUpdater to take potential failures in Task#maybeCheckpoint into account. - Improved TaskManager#shutdownStateUpdater to not hang indefinitely if the State Updater thread is dead. Reviewers: Matthias J. Sax <[email protected]>, Lucas Brutschy <[email protected]> --------- Co-authored-by: Matthias J. Sax <[email protected]>
1 parent 99fafc4 commit c48c50d

File tree

5 files changed

+377
-46
lines changed

5 files changed

+377
-46
lines changed
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.integration;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

import java.io.IOException;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Integration test for error handling in the state updater: a state store whose
 * {@code flush()} throws (i.e. a failure inside {@code Task#maybeCheckpoint}) while the
 * stream thread is already shutting down must not cause
 * {@code TaskManager#shutdownStateUpdater} — and therefore {@link KafkaStreams#close} —
 * to hang.
 */
public class StateUpdaterFailureIntegrationTest {

    private static final int NUM_BROKERS = 1;
    protected static final String INPUT_TOPIC_NAME = "input-topic";
    // 6 partitions with 2 stream threads -> 3 tasks per thread; removing one thread
    // re-initializes its 3 stores on the remaining thread (6 + 3 = 9 inits total).
    private static final int NUM_PARTITIONS = 6;

    private final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);

    private Properties streamsConfiguration;
    private final MockTime mockTime = cluster.time;
    private KafkaStreams streams;

    /**
     * Starts the embedded cluster, creates the input topic, and builds a Streams
     * configuration with two stream threads (required so a thread can be removed later).
     */
    @BeforeEach
    public void before(final TestInfo testInfo) throws InterruptedException, IOException {
        cluster.start();
        cluster.createTopic(INPUT_TOPIC_NAME, NUM_PARTITIONS, 1);
        streamsConfiguration = new Properties();
        final String safeTestName = safeUniqueTestName(testInfo);
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

    }

    @AfterEach
    public void after() {
        // NOTE(review): the cluster is stopped before streams.close() — confirm that
        // closing against an already-stopped cluster cannot block for the full timeout.
        cluster.stop();
        if (streams != null) {
            streams.close(Duration.ofSeconds(30));
        }
    }

    /**
     * The conditions that we need to meet:
     * <p><ul>
     * <li>We have an unhandled task in {@link org.apache.kafka.streams.processor.internals.DefaultStateUpdater}</li>
     * <li>StreamThread is not running, so {@link org.apache.kafka.streams.processor.internals.TaskManager#handleExceptionsFromStateUpdater} is not called anymore</li>
     * <li>The task throws exception in {@link org.apache.kafka.streams.processor.internals.Task#maybeCheckpoint(boolean)} while being processed by {@code DefaultStateUpdater}</li>
     * <li>{@link org.apache.kafka.streams.processor.internals.TaskManager#shutdownStateUpdater} tries to clean up all tasks that are left in the {@code DefaultStateUpdater}</li>
     * </ul><p>
     * If all conditions are met, {@code TaskManager} needs to be able to handle the failed task from the {@code DefaultStateUpdater} correctly and not hang.
     */
    @Test
    public void correctlyHandleFlushErrorsDuringRebalance() throws Exception {
        // Counts every store init; reaches 9 once the removed thread's 3 tasks were
        // re-initialized on the surviving thread (see NUM_PARTITIONS comment above).
        final AtomicInteger numberOfStoreInits = new AtomicInteger();
        // Released only when the application reaches PENDING_SHUTDOWN, so the faulty
        // flush() below fires during shutdown rather than while the thread is running.
        final CountDownLatch pendingShutdownLatch = new CountDownLatch(1);

        final StoreBuilder<KeyValueStore<Object, Object>> storeBuilder = new AbstractStoreBuilder<>("testStateStore", Serdes.Integer(), Serdes.ByteArray(), new MockTime()) {

            @Override
            public KeyValueStore<Object, Object> build() {
                return new MockKeyValueStore(name, false) {

                    @Override
                    public void init(final StateStoreContext stateStoreContext, final StateStore root) {
                        super.init(stateStoreContext, root);
                        numberOfStoreInits.incrementAndGet();
                    }

                    @Override
                    public void flush() {
                        // We want to throw the ProcessorStateException here only once the rebalance
                        // has finished (the 3 tasks from the removed thread were reassigned to the
                        // existing thread). We block on the latch until the state is
                        // PENDING_SHUTDOWN, so the stream thread no longer handles the exception
                        // and it reaches TaskManager#shutdownStateUpdater instead.
                        if (numberOfStoreInits.get() == 9) {
                            try {
                                pendingShutdownLatch.await();
                            } catch (final InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                            throw new ProcessorStateException("flush");
                        }
                    }
                };
            }
        };

        // Minimal stateful topology: source -> processor with the faulty store attached.
        final TopologyWrapper topology = new TopologyWrapper();
        topology.addSource("ingest", INPUT_TOPIC_NAME);
        topology.addProcessor("my-processor", new MockApiProcessorSupplier<>(), "ingest");
        topology.addStateStore(storeBuilder, "my-processor");

        streams = new KafkaStreams(topology, streamsConfiguration);
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.PENDING_SHUTDOWN) {
                pendingShutdownLatch.countDown();
            }
        });
        streams.start();

        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, "Streams never reached RUNNING state");

        streams.removeStreamThread();

        // Before shutting down, we want the tasks to be reassigned
        TestUtils.waitForCondition(() -> numberOfStoreInits.get() == 9, "Streams never reinitialized the store enough times");

        // close() must return true (clean shutdown within the timeout) even though a
        // task's checkpoint/flush fails inside the state updater during shutdown.
        assertTrue(streams.close(Duration.ofSeconds(60)));
    }
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -345,23 +345,26 @@ private void handleTaskCorruptedException(final TaskCorruptedException taskCorru
345345
// TODO: we can let the exception encode the actual corrupted changelog partitions and only
346346
// mark those instead of marking all changelogs
347347
private void removeCheckpointForCorruptedTask(final Task task) {
348-
task.markChangelogAsCorrupted(task.changelogPartitions());
348+
try {
349+
task.markChangelogAsCorrupted(task.changelogPartitions());
349350

350-
// we need to enforce a checkpoint that removes the corrupted partitions
351-
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
351+
// we need to enforce a checkpoint that removes the corrupted partitions
352+
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
353+
} catch (final StreamsException swallow) {
354+
log.warn("Checkpoint failed for corrupted task {}", task.id(), swallow);
355+
}
352356
}
353357

354358
private void handleStreamsException(final StreamsException streamsException) {
355359
log.info("Encountered streams exception: ", streamsException);
356360
if (streamsException.taskId().isPresent()) {
357-
handleStreamsExceptionWithTask(streamsException);
361+
handleStreamsExceptionWithTask(streamsException, streamsException.taskId().get());
358362
} else {
359363
handleStreamsExceptionWithoutTask(streamsException);
360364
}
361365
}
362366

363-
private void handleStreamsExceptionWithTask(final StreamsException streamsException) {
364-
final TaskId failedTaskId = streamsException.taskId().get();
367+
private void handleStreamsExceptionWithTask(final StreamsException streamsException, final TaskId failedTaskId) {
365368
if (updatingTasks.containsKey(failedTaskId)) {
366369
addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(
367370
new ExceptionAndTask(streamsException, updatingTasks.get(failedTaskId))
@@ -514,7 +517,7 @@ private void removeTask(final TaskId taskId, final CompletableFuture<RemovedTask
514517
+ " own this task.", taskId);
515518
}
516519
} catch (final StreamsException streamsException) {
517-
handleStreamsException(streamsException);
520+
handleStreamsExceptionWithTask(streamsException, taskId);
518521
future.completeExceptionally(streamsException);
519522
} catch (final RuntimeException runtimeException) {
520523
handleRuntimeException(runtimeException);
@@ -606,14 +609,19 @@ private boolean removeFailedTask(final TaskId taskId, final CompletableFuture<Re
606609
private void pauseTask(final Task task) {
607610
final TaskId taskId = task.id();
608611
// do not need to unregister changelog partitions for paused tasks
609-
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
610-
pausedTasks.put(taskId, task);
611-
updatingTasks.remove(taskId);
612-
if (task.isActive()) {
613-
transitToUpdateStandbysIfOnlyStandbysLeft();
612+
try {
613+
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
614+
pausedTasks.put(taskId, task);
615+
updatingTasks.remove(taskId);
616+
if (task.isActive()) {
617+
transitToUpdateStandbysIfOnlyStandbysLeft();
618+
}
619+
log.info((task.isActive() ? "Active" : "Standby")
620+
+ " task " + task.id() + " was paused from the updating tasks and added to the paused tasks.");
621+
622+
} catch (final StreamsException streamsException) {
623+
handleStreamsExceptionWithTask(streamsException, taskId);
614624
}
615-
log.info((task.isActive() ? "Active" : "Standby")
616-
+ " task " + task.id() + " was paused from the updating tasks and added to the paused tasks.");
617625
}
618626

619627
private void resumeTask(final Task task) {
@@ -640,11 +648,15 @@ private void maybeCompleteRestoration(final StreamTask task,
640648
final Set<TopicPartition> restoredChangelogs) {
641649
final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
642650
if (restoredChangelogs.containsAll(changelogPartitions)) {
643-
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
644-
changelogReader.unregister(changelogPartitions);
645-
addToRestoredTasks(task);
646-
log.info("Stateful active task " + task.id() + " completed restoration");
647-
transitToUpdateStandbysIfOnlyStandbysLeft();
651+
try {
652+
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
653+
changelogReader.unregister(changelogPartitions);
654+
addToRestoredTasks(task);
655+
log.info("Stateful active task " + task.id() + " completed restoration");
656+
transitToUpdateStandbysIfOnlyStandbysLeft();
657+
} catch (final StreamsException streamsException) {
658+
handleStreamsExceptionWithTask(streamsException, task.id());
659+
}
648660
}
649661
}
650662

@@ -676,8 +688,12 @@ private void maybeCheckpointTasks(final long now) {
676688

677689
measureCheckpointLatency(() -> {
678690
for (final Task task : updatingTasks.values()) {
679-
// do not enforce checkpointing during restoration if its position has not advanced much
680-
task.maybeCheckpoint(false);
691+
try {
692+
// do not enforce checkpointing during restoration if its position has not advanced much
693+
task.maybeCheckpoint(false);
694+
} catch (final StreamsException streamsException) {
695+
handleStreamsExceptionWithTask(streamsException, task.id());
696+
}
681697
}
682698
});
683699

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import java.util.TreeSet;
6666
import java.util.concurrent.CompletableFuture;
6767
import java.util.concurrent.ExecutionException;
68+
import java.util.concurrent.TimeUnit;
6869
import java.util.concurrent.atomic.AtomicReference;
6970
import java.util.stream.Collectors;
7071
import java.util.stream.Stream;
@@ -772,7 +773,7 @@ private StateUpdater.RemovedTaskResult waitForFuture(final TaskId taskId,
772773
final CompletableFuture<StateUpdater.RemovedTaskResult> future) {
773774
final StateUpdater.RemovedTaskResult removedTaskResult;
774775
try {
775-
removedTaskResult = future.get();
776+
removedTaskResult = future.get(5, TimeUnit.MINUTES);
776777
if (removedTaskResult == null) {
777778
throw new IllegalStateException("Task " + taskId + " was not found in the state updater. "
778779
+ BUG_ERROR_MESSAGE);
@@ -787,6 +788,10 @@ private StateUpdater.RemovedTaskResult waitForFuture(final TaskId taskId,
787788
Thread.currentThread().interrupt();
788789
log.error(INTERRUPTED_ERROR_MESSAGE, shouldNotHappen);
789790
throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, shouldNotHappen);
791+
} catch (final java.util.concurrent.TimeoutException timeoutException) {
792+
log.warn("The state updater wasn't able to remove task {} in time. The state updater thread may be dead. "
793+
+ BUG_ERROR_MESSAGE, taskId, timeoutException);
794+
return null;
790795
}
791796
}
792797

@@ -1576,6 +1581,12 @@ void shutdown(final boolean clean) {
15761581

15771582
private void shutdownStateUpdater() {
15781583
if (stateUpdater != null) {
1584+
// If there are failed tasks handling them first
1585+
for (final StateUpdater.ExceptionAndTask exceptionAndTask : stateUpdater.drainExceptionsAndFailedTasks()) {
1586+
final Task failedTask = exceptionAndTask.task();
1587+
closeTaskDirty(failedTask, false);
1588+
}
1589+
15791590
final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new LinkedHashMap<>();
15801591
for (final Task task : stateUpdater.tasks()) {
15811592
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(task.id());
@@ -1584,24 +1595,31 @@ private void shutdownStateUpdater() {
15841595
final Set<Task> tasksToCloseClean = new HashSet<>();
15851596
final Set<Task> tasksToCloseDirty = new HashSet<>();
15861597
addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
1587-
stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
1598+
// at this point we removed all tasks, so the shutdown should not take a lot of time
1599+
stateUpdater.shutdown(Duration.ofMinutes(1L));
15881600

15891601
for (final Task task : tasksToCloseClean) {
15901602
tasks.addTask(task);
15911603
}
15921604
for (final Task task : tasksToCloseDirty) {
15931605
closeTaskDirty(task, false);
15941606
}
1607+
// Handling all failures that occurred during the remove process
15951608
for (final StateUpdater.ExceptionAndTask exceptionAndTask : stateUpdater.drainExceptionsAndFailedTasks()) {
15961609
final Task failedTask = exceptionAndTask.task();
15971610
closeTaskDirty(failedTask, false);
15981611
}
1612+
1613+
// If there is anything left unhandled due to timeouts, handling now
1614+
for (final Task task : stateUpdater.tasks()) {
1615+
closeTaskDirty(task, false);
1616+
}
15991617
}
16001618
}
16011619

16021620
private void shutdownSchedulingTaskManager() {
16031621
if (schedulingTaskManager != null) {
1604-
schedulingTaskManager.shutdown(Duration.ofMillis(Long.MAX_VALUE));
1622+
schedulingTaskManager.shutdown(Duration.ofMinutes(5L));
16051623
}
16061624
}
16071625

0 commit comments

Comments
 (0)