
Commit 5732086

Ensure that Operations are aborted when MapTaskExecutor is closed. Add tests around setup/teardown of DoFns (#36631)
1 parent c8d7ca0 commit 5732086

11 files changed: +415 −65 lines changed
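
The commit title says tests were added around setup/teardown of DoFns. For orientation, the lifecycle in question is the standard DoFn method sequence; the sketch below is a minimal, hypothetical DoFn (the class name and method bodies are illustrative, not part of this commit) showing the methods whose ordering the ParDoLifecycleTest methods referenced further down exercise, in particular that @Teardown still runs when an earlier method throws.

import org.apache.beam.sdk.transforms.DoFn;

// Illustrative only: a DoFn showing the lifecycle methods the new tests exercise.
class LifecycleIllustrationFn extends DoFn<String, String> {

  @Setup
  public void setup() {
    // Called once per DoFn instance, before any bundle is processed.
  }

  @StartBundle
  public void startBundle() {
    // Called before each bundle.
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(c.element());
  }

  @FinishBundle
  public void finishBundle() {
    // Called after each bundle.
  }

  @Teardown
  public void teardown() {
    // Expected to run even if setup, startBundle, processElement, or finishBundle throws.
  }
}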

.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json

Lines changed: 2 additions & 1 deletion

@@ -4,5 +4,6 @@
 "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
 "https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
 "https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test",
-"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
+"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface",
+"https://github.com/apache/beam/pull/36631": "dofn lifecycle",
 }

.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_V2.json

Lines changed: 2 additions & 1 deletion

@@ -4,5 +4,6 @@
 "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
 "https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
 "https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test",
-"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
+"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface",
+"https://github.com/apache/beam/pull/36631": "dofn lifecycle validation",
 }

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 24 additions & 6 deletions

@@ -205,7 +205,6 @@ def commonLegacyExcludeCategories = [
 'org.apache.beam.sdk.testing.UsesGaugeMetrics',
 'org.apache.beam.sdk.testing.UsesMultimapState',
 'org.apache.beam.sdk.testing.UsesTestStream',
-'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner
 'org.apache.beam.sdk.testing.UsesMetricsPusher',
 'org.apache.beam.sdk.testing.UsesBundleFinalizer',
 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', // Dataflow QM as of now does not support returning back BoundedTrie in metric result.

@@ -452,7 +451,17 @@ task validatesRunner {
 excludedTests: [
 // TODO(https://github.com/apache/beam/issues/21472)
 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
-]
+
+// These tests use static state and don't work with remote execution.
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful',
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup',
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
+]
 ))
 }

@@ -474,7 +483,17 @@ task validatesRunnerStreaming {
 // GroupIntoBatches.withShardedKey not supported on streaming runner v1
 // https://github.com/apache/beam/issues/22592
 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
-]
+
+// These tests use static state and don't work with remote execution.
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful',
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup',
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
+'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
+]
 ))
 }

@@ -543,8 +562,7 @@ task validatesRunnerV2 {
 excludedTests: [
 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',

-// TODO(https://github.com/apache/beam/issues/18592): respect ParDo lifecycle.
-'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testFnCallSequenceStateful',
+// These tests use static state and don't work with remote execution.
 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',

@@ -586,7 +604,7 @@ task validatesRunnerV2Streaming {
 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
 'org.apache.beam.sdk.transforms.GroupByKeyTest.testCombiningAccumulatingProcessingTime',

-// TODO(https://github.com/apache/beam/issues/18592): respect ParDo lifecycle.
+// These tests use static state and don't work with remote execution.
 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
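
The newly excluded ParDoLifecycleTest methods rely on static, per-JVM state to record which lifecycle callbacks ran, which is why they cannot be validated against a remote runner: the DoFn executes in a Dataflow worker JVM, so static state mutated there is invisible to the JVM running the test assertions. A rough sketch of that pattern follows (names and bodies are illustrative, not the actual test code).

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.transforms.DoFn;

// Illustrative only: lifecycle tests of this shape record events in static state,
// then assert on it after the pipeline finishes. With remote execution the DoFn
// runs in a worker JVM, so the test JVM never observes the recorded events.
class TeardownRecordingFn extends DoFn<String, String> {
  static final Set<String> EVENTS = ConcurrentHashMap.newKeySet();

  @ProcessElement
  public void processElement(ProcessContext c) {
    EVENTS.add("process");
    throw new RuntimeException("induced failure to force teardown");
  }

  @Teardown
  public void teardown() {
    EVENTS.add("teardown");
  }
}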

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java

Lines changed: 39 additions & 14 deletions

@@ -105,11 +105,32 @@ public DataflowMapTaskExecutor create(
 Networks.replaceDirectedNetworkNodes(
 network, createOutputReceiversTransform(stageName, counterSet));

-// Swap out all the ParallelInstruction nodes with Operation nodes
-Networks.replaceDirectedNetworkNodes(
-network,
-createOperationTransformForParallelInstructionNodes(
-stageName, network, options, readerFactory, sinkFactory, executionContext));
+// Swap out all the ParallelInstruction nodes with Operation nodes. While updating the network,
+// we keep track of
+// the created Operations so that if an exception is encountered we can properly abort started
+// operations.
+ArrayList<Operation> createdOperations = new ArrayList<>();
+try {
+Networks.replaceDirectedNetworkNodes(
+network,
+createOperationTransformForParallelInstructionNodes(
+stageName,
+network,
+options,
+readerFactory,
+sinkFactory,
+executionContext,
+createdOperations));
+} catch (RuntimeException exn) {
+for (Operation o : createdOperations) {
+try {
+o.abort();
+} catch (Exception exn2) {
+exn.addSuppressed(exn2);
+}
+}
+throw exn;
+}

 // Collect all the operations within the network and attach all the operations as receivers
 // to preceding output receivers.

@@ -144,7 +165,8 @@ Function<Node, Node> createOperationTransformForParallelInstructionNodes(
 final PipelineOptions options,
 final ReaderFactory readerFactory,
 final SinkFactory sinkFactory,
-final DataflowExecutionContext<?> executionContext) {
+final DataflowExecutionContext<?> executionContext,
+final List<Operation> createdOperations) {

 return new TypeSafeNodeFunction<ParallelInstructionNode>(ParallelInstructionNode.class) {
 @Override

@@ -156,27 +178,31 @@ public Node typedApply(ParallelInstructionNode node) {
 instruction.getOriginalName(),
 instruction.getSystemName(),
 instruction.getName());
+OperationNode result;
 try {
 DataflowOperationContext context = executionContext.createOperationContext(nameContext);
 if (instruction.getRead() != null) {
-return createReadOperation(
-network, node, options, readerFactory, executionContext, context);
+result =
+createReadOperation(
+network, node, options, readerFactory, executionContext, context);
 } else if (instruction.getWrite() != null) {
-return createWriteOperation(node, options, sinkFactory, executionContext, context);
+result = createWriteOperation(node, options, sinkFactory, executionContext, context);
 } else if (instruction.getParDo() != null) {
-return createParDoOperation(network, node, options, executionContext, context);
+result = createParDoOperation(network, node, options, executionContext, context);
 } else if (instruction.getPartialGroupByKey() != null) {
-return createPartialGroupByKeyOperation(
-network, node, options, executionContext, context);
+result =
+createPartialGroupByKeyOperation(network, node, options, executionContext, context);
 } else if (instruction.getFlatten() != null) {
-return createFlattenOperation(network, node, context);
+result = createFlattenOperation(network, node, context);
 } else {
 throw new IllegalArgumentException(
 String.format("Unexpected instruction: %s", instruction));
 }
 } catch (Exception e) {
 throw new RuntimeException(e);
 }
+createdOperations.add(result.getOperation());
+return result;
 }
 };
 }

@@ -328,7 +354,6 @@ public Node typedApply(InstructionOutputNode input) {
 Coder<?> coder =
 CloudObjects.coderFromCloudObject(CloudObject.fromSpec(cloudOutput.getCodec()));

-@SuppressWarnings("unchecked")
 ElementCounter outputCounter =
 new DataflowOutputCounter(
 cloudOutput.getName(),
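
The factory change above follows a build-or-abort shape: every Operation created while rewriting the network is recorded, and if construction fails part-way the already-created operations are aborted before the exception propagates. Below is a generic sketch of that pattern with stand-in types (Resource and its abort() method are placeholders, not the worker's Operation class).

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Generic build-or-abort sketch: construct a list of resources, and if any factory
// throws, abort everything built so far, attaching secondary failures as suppressed.
final class BuildOrAbort {

  interface Resource {
    void abort() throws Exception;
  }

  static List<Resource> buildAll(List<Callable<Resource>> factories) throws Exception {
    List<Resource> created = new ArrayList<>();
    try {
      for (Callable<Resource> factory : factories) {
        created.add(factory.call());
      }
      return created;
    } catch (Exception exn) {
      // Undo partially-completed construction so nothing leaks.
      for (Resource r : created) {
        try {
          r.abort();
        } catch (Exception exn2) {
          exn.addSuppressed(exn2);
        }
      }
      throw exn;
    }
  }
}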

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java

Lines changed: 54 additions & 17 deletions

@@ -18,8 +18,8 @@
 package org.apache.beam.runners.dataflow.worker.util.common.worker;

 import java.io.Closeable;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.ListIterator;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;

@@ -36,7 +36,9 @@ public class MapTaskExecutor implements WorkExecutor {
 private static final Logger LOG = LoggerFactory.getLogger(MapTaskExecutor.class);

 /** The operations in the map task, in execution order. */
-public final List<Operation> operations;
+public final ArrayList<Operation> operations;
+
+private boolean closed = false;

 private final ExecutionStateTracker executionStateTracker;

@@ -54,7 +56,7 @@ public MapTaskExecutor(
 CounterSet counters,
 ExecutionStateTracker executionStateTracker) {
 this.counters = counters;
-this.operations = operations;
+this.operations = new ArrayList<>(operations);
 this.executionStateTracker = executionStateTracker;
 }

@@ -63,6 +65,7 @@ public CounterSet getOutputCounters() {
 return counters;
 }

+/** May be reused if execute() returns without an exception being thrown. */
 @Override
 public void execute() throws Exception {
 LOG.debug("Executing map task");

@@ -74,13 +77,11 @@ public void execute() throws Exception {
 // Starting a root operation such as a ReadOperation does the work
 // of processing the input dataset.
 LOG.debug("Starting operations");
-ListIterator<Operation> iterator = operations.listIterator(operations.size());
-while (iterator.hasPrevious()) {
+for (int i = operations.size() - 1; i >= 0; --i) {
 if (Thread.currentThread().isInterrupted()) {
 throw new InterruptedException("Worker aborted");
 }
-Operation op = iterator.previous();
-op.start();
+operations.get(i).start();
 }

 // Finish operations, in forward-execution-order, so that a

@@ -94,16 +95,13 @@ public void execute() throws Exception {
 op.finish();
 }
 } catch (Exception | Error exn) {
-LOG.debug("Aborting operations", exn);
-for (Operation op : operations) {
-try {
-op.abort();
-} catch (Exception | Error exn2) {
-exn.addSuppressed(exn2);
-if (exn2 instanceof InterruptedException) {
-Thread.currentThread().interrupt();
-}
-}
+try {
+closeInternal();
+} catch (Exception closeExn) {
+exn.addSuppressed(closeExn);
+}
+if (exn instanceof InterruptedException) {
+Thread.currentThread().interrupt();
 }
 throw exn;
 }

@@ -164,6 +162,45 @@ public void abort() {
 }
 }

+private void closeInternal() throws Exception {
+if (closed) {
+return;
+}
+LOG.debug("Aborting operations");
+@Nullable Exception exn = null;
+for (Operation op : operations) {
+try {
+op.abort();
+} catch (Exception | Error exn2) {
+if (exn2 instanceof InterruptedException) {
+Thread.currentThread().interrupt();
+}
+if (exn == null) {
+if (exn2 instanceof Exception) {
+exn = (Exception) exn2;
+} else {
+exn = new RuntimeException(exn2);
+}
+} else {
+exn.addSuppressed(exn2);
+}
+}
+}
+closed = true;
+if (exn != null) {
+throw exn;
+}
+}
+
+@Override
+public void close() {
+try {
+closeInternal();
+} catch (Exception e) {
+LOG.error("Exception while closing MapTaskExecutor, ignoring", e);
+}
+}
+
 @Override
 public List<Integer> reportProducedEmptyOutput() {
 List<Integer> emptyOutputSinkIndexes = Lists.newArrayList();
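
The close path above is written to be idempotent: a closed flag ensures operations are aborted at most once, the first abort failure becomes the primary exception, and later failures are attached as suppressed. Here is a standalone sketch of that aggregation pattern using stand-in types (Child and CloseOnce are placeholders, not worker classes).

import java.util.List;

// Idempotent close sketch: abort each child exactly once and aggregate failures,
// keeping the first exception as primary and adding the rest as suppressed.
final class CloseOnce {

  interface Child {
    void abort() throws Exception;
  }

  private final List<Child> children;
  private boolean closed = false;

  CloseOnce(List<Child> children) {
    this.children = children;
  }

  void close() throws Exception {
    if (closed) {
      return; // A second close must not abort the children again.
    }
    Exception first = null;
    for (Child child : children) {
      try {
        child.abort();
      } catch (Exception e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    closed = true;
    if (first != null) {
      throw first;
    }
  }
}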

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java

Lines changed: 8 additions & 5 deletions

@@ -415,18 +415,21 @@ private ExecuteWorkResult executeWork(

 // Release the execution state for another thread to use.
 computationState.releaseComputationWorkExecutor(computationWorkExecutor);
+computationWorkExecutor = null;

 work.setState(Work.State.COMMIT_QUEUED);
 outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions(sampler));

 return ExecuteWorkResult.create(
 outputBuilder, stateReader.getBytesRead() + localSideInputStateFetcher.getBytesRead());
 } catch (Throwable t) {
-// If processing failed due to a thrown exception, close the executionState. Do not
-// return/release the executionState back to computationState as that will lead to this
-// executionState instance being reused.
-LOG.debug("Invalidating executor after work item {} failed", workItem.getWorkToken(), t);
-computationWorkExecutor.invalidate();
+if (computationWorkExecutor != null) {
+// If processing failed due to a thrown exception, close the executionState. Do not
+// return/release the executionState back to computationState as that will lead to this
+// executionState instance being reused.
+LOG.debug("Invalidating executor after work item {} failed", workItem.getWorkToken(), t);
+computationWorkExecutor.invalidate();
+}

 // Re-throw the exception, it will be caught and handled by workFailureProcessor downstream.
 throw t;
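
The scheduler fix nulls the local reference once the executor has been handed back to the computation state, so the catch block only invalidates an executor the method still owns. A minimal sketch of that ownership hand-off follows, using placeholder types rather than the worker's actual classes.

// Placeholder types: once a resource is released back to its pool, the local
// reference is cleared so a later failure does not invalidate a resource that
// another thread may already be using.
final class HandOffSketch {

  interface WorkUnit {
    void doWork() throws Exception;

    void invalidate();
  }

  interface Pool {
    void release(WorkUnit unit);
  }

  static void run(Pool pool, WorkUnit unit) throws Exception {
    try {
      unit.doWork();
      pool.release(unit);
      unit = null; // Ownership transferred; do not touch the unit on failure past this point.
      // ... further steps that may still throw ...
    } catch (Exception e) {
      if (unit != null) {
        unit.invalidate(); // Only invalidate while this method still owns the unit.
      }
      throw e;
    }
  }
}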

0 commit comments