Skip to content

Commit 88ada9d

Browse files
fix silent failures in dispatch loop from stalling the pipeline (#32922)
* use ExecutorService instead of ScheduledExecutorService which swallows exceptions into futures that were not examined Co-authored-by: Arun Pandian <[email protected]>
1 parent 8b1ca21 commit 88ada9d

File tree

10 files changed

+136
-20
lines changed

10 files changed

+136
-20
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ public static <T extends DataflowWorkerHarnessOptions> T initializeGlobalStateAn
8282

8383
@SuppressWarnings("Slf4jIllegalPassedClass")
8484
public static void initializeLogging(Class<?> workerHarnessClass) {
85-
/* Set up exception handling tied to the workerHarnessClass. */
85+
// Set up exception handling for raw Threads tied to the workerHarnessClass.
86+
// Does NOT handle exceptions thrown by threads created by
87+
// ScheduledExecutors/ScheduledExecutorServices.
8688
Thread.setDefaultUncaughtExceptionHandler(
8789
new WorkerUncaughtExceptionHandler(LoggerFactory.getLogger(workerHarnessClass)));
8890

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ private StreamingDataflowWorker(
175175
StreamingCounters streamingCounters,
176176
MemoryMonitor memoryMonitor,
177177
GrpcWindmillStreamFactory windmillStreamFactory,
178-
Function<String, ScheduledExecutorService> executorSupplier,
178+
ScheduledExecutorService activeWorkRefreshExecutorFn,
179179
ConcurrentMap<String, StageInfo> stageInfoMap) {
180180
// Register standard file systems.
181181
FileSystems.setDefaultPipelineOptions(options);
@@ -285,7 +285,7 @@ private StreamingDataflowWorker(
285285
stuckCommitDurationMillis,
286286
computationStateCache::getAllPresentComputations,
287287
sampler,
288-
executorSupplier.apply("RefreshWork"),
288+
activeWorkRefreshExecutorFn,
289289
getDataMetricTracker::trackHeartbeats);
290290

291291
this.statusPages =
@@ -347,10 +347,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
347347
.setSizeMb(options.getWorkerCacheMb())
348348
.setSupportMapViaMultimap(options.isEnableStreamingEngine())
349349
.build();
350-
Function<String, ScheduledExecutorService> executorSupplier =
351-
threadName ->
352-
Executors.newSingleThreadScheduledExecutor(
353-
new ThreadFactoryBuilder().setNameFormat(threadName).build());
350+
354351
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder =
355352
createGrpcwindmillStreamFactoryBuilder(options, clientId);
356353

@@ -417,7 +414,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
417414
streamingCounters,
418415
memoryMonitor,
419416
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
420-
executorSupplier,
417+
Executors.newSingleThreadScheduledExecutor(
418+
new ThreadFactoryBuilder().setNameFormat("RefreshWork").build()),
421419
stageInfo);
422420
}
423421

@@ -595,7 +593,7 @@ static StreamingDataflowWorker forTesting(
595593
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
596594
.build()
597595
: windmillStreamFactory.build(),
598-
executorSupplier,
596+
executorSupplier.apply("RefreshWork"),
599597
stageInfo);
600598
}
601599

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,16 @@
2828
* This uncaught exception handler logs the {@link Throwable} to the logger, {@link System#err} and
2929
* exits the application with status code 1.
3030
*/
31-
class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler {
31+
public final class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler {
32+
@VisibleForTesting public static final int JVM_TERMINATED_STATUS_CODE = 1;
3233
private final JvmRuntime runtime;
3334
private final Logger logger;
3435

35-
WorkerUncaughtExceptionHandler(Logger logger) {
36+
public WorkerUncaughtExceptionHandler(Logger logger) {
3637
this(JvmRuntime.INSTANCE, logger);
3738
}
3839

39-
@VisibleForTesting
40-
WorkerUncaughtExceptionHandler(JvmRuntime runtime, Logger logger) {
40+
public WorkerUncaughtExceptionHandler(JvmRuntime runtime, Logger logger) {
4141
this.runtime = runtime;
4242
this.logger = logger;
4343
}
@@ -59,7 +59,7 @@ public void uncaughtException(Thread thread, Throwable e) {
5959
t.printStackTrace(originalStdErr);
6060
}
6161
} finally {
62-
runtime.halt(1);
62+
runtime.halt(JVM_TERMINATED_STATUS_CODE);
6363
}
6464
}
6565
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private FanOutStreamingEngineWorkerHarness(
137137
Executors.newCachedThreadPool(
138138
new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME).build());
139139
this.workerMetadataConsumer =
140-
Executors.newSingleThreadScheduledExecutor(
140+
Executors.newSingleThreadExecutor(
141141
new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build());
142142
this.getWorkBudgetDistributor = getWorkBudgetDistributor;
143143
this.totalGetWorkBudget = totalGetWorkBudget;

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
8282
this.waitForResources = waitForResources;
8383
this.computationStateFetcher = computationStateFetcher;
8484
this.workProviderExecutor =
85-
Executors.newSingleThreadScheduledExecutor(
85+
Executors.newSingleThreadExecutor(
8686
new ThreadFactoryBuilder()
8787
.setDaemon(true)
8888
.setPriority(Thread.MIN_PRIORITY)

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private StreamingApplianceWorkCommitter(
5757
WeightedBoundedQueue.create(
5858
MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize()));
5959
this.commitWorkers =
60-
Executors.newSingleThreadScheduledExecutor(
60+
Executors.newSingleThreadExecutor(
6161
new ThreadFactoryBuilder()
6262
.setDaemon(true)
6363
.setPriority(Thread.MAX_PRIORITY)

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public GetWorkBudgetRefresher(
5151
Supplier<Boolean> isBudgetRefreshPaused, Runnable redistributeBudget) {
5252
this.budgetRefreshTrigger = new AdvancingPhaser(1);
5353
this.budgetRefreshExecutor =
54-
Executors.newSingleThreadScheduledExecutor(
54+
Executors.newSingleThreadExecutor(
5555
new ThreadFactoryBuilder()
5656
.setNameFormat(BUDGET_REFRESH_THREAD)
5757
.setUncaughtExceptionHandler(

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
*/
7171
@Internal
7272
@ThreadSafe
73-
public final class StreamingWorkScheduler {
73+
public class StreamingWorkScheduler {
7474
private static final Logger LOG = LoggerFactory.getLogger(StreamingWorkScheduler.class);
7575

7676
private final DataflowWorkerHarnessOptions options;

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747

4848
@RunWith(JUnit4.class)
4949
public class StreamingEngineComputationConfigFetcherTest {
50-
5150
private final WorkUnitClient mockDataflowServiceClient =
5251
mock(WorkUnitClient.class, new Returns(Optional.empty()));
5352
private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow.worker.streaming.harness;
19+
20+
import static com.google.common.truth.Truth.assertThat;
21+
import static org.junit.Assert.assertTrue;
22+
import static org.mockito.Mockito.mock;
23+
24+
import java.util.Optional;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.function.Function;
28+
import org.apache.beam.runners.dataflow.worker.WorkerUncaughtExceptionHandler;
29+
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
30+
import org.apache.beam.runners.dataflow.worker.util.common.worker.JvmRuntime;
31+
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
32+
import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
33+
import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
34+
import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
35+
import org.junit.Test;
36+
import org.junit.runner.RunWith;
37+
import org.junit.runners.JUnit4;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
40+
41+
@RunWith(JUnit4.class)
42+
public class SingleSourceWorkerHarnessTest {
43+
private static final Logger LOG = LoggerFactory.getLogger(SingleSourceWorkerHarnessTest.class);
44+
private final WorkCommitter workCommitter = mock(WorkCommitter.class);
45+
private final GetDataClient getDataClient = mock(GetDataClient.class);
46+
private final HeartbeatSender heartbeatSender = mock(HeartbeatSender.class);
47+
private final Runnable waitForResources = () -> {};
48+
private final Function<String, Optional<ComputationState>> computationStateFetcher =
49+
ignored -> Optional.empty();
50+
private final StreamingWorkScheduler streamingWorkScheduler = mock(StreamingWorkScheduler.class);
51+
52+
private SingleSourceWorkerHarness createWorkerHarness(
53+
SingleSourceWorkerHarness.GetWorkSender getWorkSender, JvmRuntime runtime) {
54+
// In non-test scenario this is set in DataflowWorkerHarnessHelper.initializeLogging(...).
55+
Thread.setDefaultUncaughtExceptionHandler(new WorkerUncaughtExceptionHandler(runtime, LOG));
56+
return SingleSourceWorkerHarness.builder()
57+
.setWorkCommitter(workCommitter)
58+
.setGetDataClient(getDataClient)
59+
.setHeartbeatSender(heartbeatSender)
60+
.setWaitForResources(waitForResources)
61+
.setStreamingWorkScheduler(streamingWorkScheduler)
62+
.setComputationStateFetcher(computationStateFetcher)
63+
.setGetWorkSender(getWorkSender)
64+
.build();
65+
}
66+
67+
@Test
68+
public void testDispatchLoop_unexpectedFailureKillsJvm_appliance() {
69+
SingleSourceWorkerHarness.GetWorkSender getWorkSender =
70+
SingleSourceWorkerHarness.GetWorkSender.forAppliance(
71+
() -> {
72+
throw new RuntimeException("something bad happened");
73+
});
74+
75+
FakeJvmRuntime fakeJvmRuntime = new FakeJvmRuntime();
76+
createWorkerHarness(getWorkSender, fakeJvmRuntime).start();
77+
assertTrue(fakeJvmRuntime.waitForRuntimeDeath(5, TimeUnit.SECONDS));
78+
fakeJvmRuntime.assertJvmTerminated();
79+
}
80+
81+
@Test
82+
public void testDispatchLoop_unexpectedFailureKillsJvm_streamingEngine() {
83+
SingleSourceWorkerHarness.GetWorkSender getWorkSender =
84+
SingleSourceWorkerHarness.GetWorkSender.forStreamingEngine(
85+
workItemReceiver -> {
86+
throw new RuntimeException("something bad happened");
87+
});
88+
89+
FakeJvmRuntime fakeJvmRuntime = new FakeJvmRuntime();
90+
createWorkerHarness(getWorkSender, fakeJvmRuntime).start();
91+
assertTrue(fakeJvmRuntime.waitForRuntimeDeath(5, TimeUnit.SECONDS));
92+
fakeJvmRuntime.assertJvmTerminated();
93+
}
94+
95+
private static class FakeJvmRuntime implements JvmRuntime {
96+
private final CountDownLatch haltedLatch = new CountDownLatch(1);
97+
private volatile int exitStatus = 0;
98+
99+
@Override
100+
public void halt(int status) {
101+
exitStatus = status;
102+
haltedLatch.countDown();
103+
}
104+
105+
public boolean waitForRuntimeDeath(long timeout, TimeUnit unit) {
106+
try {
107+
return haltedLatch.await(timeout, unit);
108+
} catch (InterruptedException e) {
109+
return false;
110+
}
111+
}
112+
113+
private void assertJvmTerminated() {
114+
assertThat(exitStatus).isEqualTo(WorkerUncaughtExceptionHandler.JVM_TERMINATED_STATUS_CODE);
115+
}
116+
}
117+
}

0 commit comments

Comments
 (0)