Commit e1468a3

Ensure that when an exception is encountered during setup of DoFns, previously created DoFns are torn down.
Also ensure that when we invalidate a WorkExecutor, we close the MapTaskExecutor, which ends up calling teardown on the DoFns. In practice this second change is not strictly necessary, because the operations already tear themselves down if an exception occurs during processing, and SimpleParDoFn checks out and returns the actual DoFn only during bundle processing. Added test coverage for this.
1 parent 09f4963 commit e1468a3
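
To make the first change concrete, below is a minimal, self-contained sketch (not the commit's code) of the abort-on-partial-setup pattern that IntrinsicMapTaskExecutorFactory now applies: if creating one item in a batch fails, everything created so far is aborted before the original exception is rethrown. The Resource interface, createAll(), and createResource() are hypothetical stand-ins for Operation and the factory's node transform.

import java.util.ArrayList;
import java.util.List;

class PartialSetupSketch {
  // Hypothetical stand-in for Operation: something that can be aborted.
  interface Resource {
    void abort() throws Exception;
  }

  // Create a resource per spec; if any creation fails, abort everything
  // already created, then rethrow the original failure.
  static List<Resource> createAll(List<String> specs) {
    ArrayList<Resource> created = new ArrayList<>();
    try {
      for (String spec : specs) {
        created.add(createResource(spec)); // may throw part-way through the batch
      }
      return created;
    } catch (RuntimeException exn) {
      for (Resource r : created) {
        try {
          r.abort();
        } catch (Exception exn2) {
          // Keep the original exception primary; attach cleanup failures.
          exn.addSuppressed(exn2);
        }
      }
      throw exn;
    }
  }

  // Hypothetical creation step that can fail.
  static Resource createResource(String spec) {
    if (spec.isEmpty()) {
      throw new IllegalArgumentException("bad spec");
    }
    return () -> {}; // no-op abort for the sketch
  }
}

For example, createAll(Arrays.asList("a", "", "b")) aborts the Resource built for "a" before rethrowing the IllegalArgumentException triggered by the empty spec.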

File tree

9 files changed: +391 -61 lines changed


runners/google-cloud-dataflow-java/build.gradle

Lines changed: 2 additions & 4 deletions
@@ -205,7 +205,6 @@ def commonLegacyExcludeCategories = [
   'org.apache.beam.sdk.testing.UsesGaugeMetrics',
   'org.apache.beam.sdk.testing.UsesMultimapState',
   'org.apache.beam.sdk.testing.UsesTestStream',
-  'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner
   'org.apache.beam.sdk.testing.UsesMetricsPusher',
   'org.apache.beam.sdk.testing.UsesBundleFinalizer',
   'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', // Dataflow QM as of now does not support returning back BoundedTrie in metric result.
@@ -520,8 +519,7 @@ task validatesRunnerV2 {
       excludedTests: [
         'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',

-        // TODO(https://github.com/apache/beam/issues/18592): respect ParDo lifecycle.
-        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testFnCallSequenceStateful',
+        // These tests use static state and don't work with remote execution.
         'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
         'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
         'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
@@ -563,7 +561,7 @@ task validatesRunnerV2Streaming {
         'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
         'org.apache.beam.sdk.transforms.GroupByKeyTest.testCombiningAccumulatingProcessingTime',

-        // TODO(https://github.com/apache/beam/issues/18592): respect ParDo lifecycle.
+        // These tests use static state and don't work with remote execution.
         'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
         'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
         'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java

Lines changed: 39 additions & 14 deletions
@@ -105,11 +105,32 @@ public DataflowMapTaskExecutor create(
     Networks.replaceDirectedNetworkNodes(
         network, createOutputReceiversTransform(stageName, counterSet));

-    // Swap out all the ParallelInstruction nodes with Operation nodes
-    Networks.replaceDirectedNetworkNodes(
-        network,
-        createOperationTransformForParallelInstructionNodes(
-            stageName, network, options, readerFactory, sinkFactory, executionContext));
+    // Swap out all the ParallelInstruction nodes with Operation nodes. While updating the network,
+    // we keep track of the created Operations so that if an exception is encountered we can
+    // properly abort started operations.
+    ArrayList<Operation> createdOperations = new ArrayList<>();
+    try {
+      Networks.replaceDirectedNetworkNodes(
+          network,
+          createOperationTransformForParallelInstructionNodes(
+              stageName,
+              network,
+              options,
+              readerFactory,
+              sinkFactory,
+              executionContext,
+              createdOperations));
+    } catch (RuntimeException exn) {
+      for (Operation o : createdOperations) {
+        try {
+          o.abort();
+        } catch (Exception exn2) {
+          exn.addSuppressed(exn2);
+        }
+      }
+      throw exn;
+    }

     // Collect all the operations within the network and attach all the operations as receivers
     // to preceding output receivers.
@@ -144,7 +165,8 @@ Function<Node, Node> createOperationTransformForParallelInstructionNodes(
       final PipelineOptions options,
       final ReaderFactory readerFactory,
       final SinkFactory sinkFactory,
-      final DataflowExecutionContext<?> executionContext) {
+      final DataflowExecutionContext<?> executionContext,
+      final List<Operation> createdOperations) {

     return new TypeSafeNodeFunction<ParallelInstructionNode>(ParallelInstructionNode.class) {
       @Override
@@ -156,27 +178,31 @@ public Node typedApply(ParallelInstructionNode node) {
                 instruction.getOriginalName(),
                 instruction.getSystemName(),
                 instruction.getName());
+        OperationNode result;
         try {
           DataflowOperationContext context = executionContext.createOperationContext(nameContext);
           if (instruction.getRead() != null) {
-            return createReadOperation(
-                network, node, options, readerFactory, executionContext, context);
+            result =
+                createReadOperation(
+                    network, node, options, readerFactory, executionContext, context);
           } else if (instruction.getWrite() != null) {
-            return createWriteOperation(node, options, sinkFactory, executionContext, context);
+            result = createWriteOperation(node, options, sinkFactory, executionContext, context);
           } else if (instruction.getParDo() != null) {
-            return createParDoOperation(network, node, options, executionContext, context);
+            result = createParDoOperation(network, node, options, executionContext, context);
           } else if (instruction.getPartialGroupByKey() != null) {
-            return createPartialGroupByKeyOperation(
-                network, node, options, executionContext, context);
+            result =
+                createPartialGroupByKeyOperation(network, node, options, executionContext, context);
           } else if (instruction.getFlatten() != null) {
-            return createFlattenOperation(network, node, context);
+            result = createFlattenOperation(network, node, context);
           } else {
             throw new IllegalArgumentException(
                 String.format("Unexpected instruction: %s", instruction));
           }
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
+        createdOperations.add(result.getOperation());
+        return result;
       }
     };
   }
@@ -328,7 +354,6 @@ public Node typedApply(InstructionOutputNode input) {
         Coder<?> coder =
             CloudObjects.coderFromCloudObject(CloudObject.fromSpec(cloudOutput.getCodec()));

-        @SuppressWarnings("unchecked")
         ElementCounter outputCounter =
             new DataflowOutputCounter(
                 cloudOutput.getName(),

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java

Lines changed: 54 additions & 17 deletions
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.dataflow.worker.util.common.worker;

 import java.io.Closeable;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.ListIterator;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
@@ -36,7 +36,9 @@ public class MapTaskExecutor implements WorkExecutor {
   private static final Logger LOG = LoggerFactory.getLogger(MapTaskExecutor.class);

   /** The operations in the map task, in execution order. */
-  public final List<Operation> operations;
+  public final ArrayList<Operation> operations;
+
+  private boolean closed = false;

   private final ExecutionStateTracker executionStateTracker;

@@ -54,7 +56,7 @@ public MapTaskExecutor(
       CounterSet counters,
       ExecutionStateTracker executionStateTracker) {
     this.counters = counters;
-    this.operations = operations;
+    this.operations = new ArrayList<>(operations);
     this.executionStateTracker = executionStateTracker;
   }

@@ -63,6 +65,7 @@ public CounterSet getOutputCounters() {
     return counters;
   }

+  /** May be reused if execute() returns without an exception being thrown. */
   @Override
   public void execute() throws Exception {
     LOG.debug("Executing map task");
@@ -74,13 +77,11 @@ public void execute() throws Exception {
       // Starting a root operation such as a ReadOperation does the work
       // of processing the input dataset.
       LOG.debug("Starting operations");
-      ListIterator<Operation> iterator = operations.listIterator(operations.size());
-      while (iterator.hasPrevious()) {
+      for (int i = operations.size() - 1; i >= 0; --i) {
         if (Thread.currentThread().isInterrupted()) {
           throw new InterruptedException("Worker aborted");
         }
-        Operation op = iterator.previous();
-        op.start();
+        operations.get(i).start();
       }

       // Finish operations, in forward-execution-order, so that a
@@ -94,16 +95,13 @@ public void execute() throws Exception {
         op.finish();
       }
     } catch (Exception | Error exn) {
-      LOG.debug("Aborting operations", exn);
-      for (Operation op : operations) {
-        try {
-          op.abort();
-        } catch (Exception | Error exn2) {
-          exn.addSuppressed(exn2);
-          if (exn2 instanceof InterruptedException) {
-            Thread.currentThread().interrupt();
-          }
-        }
+      try {
+        closeInternal();
+      } catch (Exception closeExn) {
+        exn.addSuppressed(closeExn);
+      }
+      if (exn instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
       }
       throw exn;
     }
@@ -164,6 +162,45 @@ public void abort() {
     }
   }

+  private void closeInternal() throws Exception {
+    if (closed) {
+      return;
+    }
+    LOG.debug("Aborting operations");
+    @Nullable Exception exn = null;
+    for (Operation op : operations) {
+      try {
+        op.abort();
+      } catch (Exception | Error exn2) {
+        if (exn2 instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        if (exn == null) {
+          if (exn2 instanceof Exception) {
+            exn = (Exception) exn2;
+          } else {
+            exn = new RuntimeException(exn2);
+          }
+        } else {
+          exn.addSuppressed(exn2);
+        }
+      }
+    }
+    closed = true;
+    if (exn != null) {
+      throw exn;
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      closeInternal();
+    } catch (Exception e) {
+      LOG.error("Exception while closing MapTaskExecutor, ignoring", e);
+    }
+  }
+
   @Override
   public List<Integer> reportProducedEmptyOutput() {
     List<Integer> emptyOutputSinkIndexes = Lists.newArrayList();
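
As a usage note, here is a hedged sketch of the execute()/close() contract this change introduces: reuse the executor on success, close it exactly once on failure. The ExecutorLike interface and runOnce() helper are hypothetical and not part of the commit; only the success-reuse / failure-close shape mirrors the MapTaskExecutor change.

class ExecutorLifecycleSketch {
  // Hypothetical stand-in for a WorkExecutor with the new contract.
  interface ExecutorLike {
    void execute() throws Exception;

    void close(); // idempotent; aborts operations (and tears down DoFns) at most once
  }

  // Returns true if the executor may be returned to a pool and reused.
  static boolean runOnce(ExecutorLike executor) throws Exception {
    try {
      executor.execute(); // on failure, execute() already aborts its operations
      return true;        // success: safe to reuse for the next work item
    } catch (Exception e) {
      executor.close();   // failure: make sure teardown happens, then surface the error
      throw e;
    }
  }
}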

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java

Lines changed: 8 additions & 5 deletions
@@ -415,18 +415,21 @@ private ExecuteWorkResult executeWork(

       // Release the execution state for another thread to use.
       computationState.releaseComputationWorkExecutor(computationWorkExecutor);
+      computationWorkExecutor = null;

       work.setState(Work.State.COMMIT_QUEUED);
       outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions(sampler));

       return ExecuteWorkResult.create(
           outputBuilder, stateReader.getBytesRead() + localSideInputStateFetcher.getBytesRead());
     } catch (Throwable t) {
-      // If processing failed due to a thrown exception, close the executionState. Do not
-      // return/release the executionState back to computationState as that will lead to this
-      // executionState instance being reused.
-      LOG.debug("Invalidating executor after work item {} failed", workItem.getWorkToken(), t);
-      computationWorkExecutor.invalidate();
+      if (computationWorkExecutor != null) {
+        // If processing failed due to a thrown exception, close the executionState. Do not
+        // return/release the executionState back to computationState as that will lead to this
+        // executionState instance being reused.
+        LOG.debug("Invalidating executor after work item {} failed", workItem.getWorkToken(), t);
+        computationWorkExecutor.invalidate();
+      }

       // Re-throw the exception, it will be caught and handled by workFailureProcessor downstream.
       throw t;
