@@ -4,5 +4,6 @@
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface",
"https://github.com/apache/beam/pull/36631": "dofn lifecycle",
}
@@ -4,5 +4,6 @@
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface",
"https://github.com/apache/beam/pull/36631": "dofn lifecycle validation",
}
30 changes: 24 additions & 6 deletions runners/google-cloud-dataflow-java/build.gradle
@@ -205,7 +205,6 @@ def commonLegacyExcludeCategories = [
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesMultimapState',
'org.apache.beam.sdk.testing.UsesTestStream',
'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner
'org.apache.beam.sdk.testing.UsesMetricsPusher',
'org.apache.beam.sdk.testing.UsesBundleFinalizer',
'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', // Dataflow QM as of now does not support returning back BoundedTrie in metric result.
@@ -452,7 +451,17 @@ task validatesRunner {
excludedTests: [
// TODO(https://github.com/apache/beam/issues/21472)
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
]

// These tests use static state and don't work with remote execution.
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
]
))
}

@@ -474,7 +483,17 @@ task validatesRunnerStreaming {
// GroupIntoBatches.withShardedKey not supported on streaming runner v1
// https://github.com/apache/beam/issues/22592
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
]

// These tests use static state and don't work with remote execution.
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
]
))
}

@@ -543,8 +562,7 @@ task validatesRunnerV2 {
excludedTests: [
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',

// TODO(https://github.com/apache/beam/issues/18592): respect ParDo lifecycle.
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testFnCallSequenceStateful',
// These tests use static state and don't work with remote execution.
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
@@ -586,7 +604,7 @@ task validatesRunnerV2Streaming {
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
'org.apache.beam.sdk.transforms.GroupByKeyTest.testCombiningAccumulatingProcessingTime',

// TODO(https://github.com/apache/beam/issues/18592): respect ParDo lifecycle.
// These tests use static state and don't work with remote execution.
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
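The ParDoLifecycleTest cases excluded above are excluded for the reason given in the diff comments: they rely on static, in-process state to verify that teardown ran, and that state is only visible when the DoFn executes in the submitting JVM. The sketch below is a hypothetical illustration of that pattern (the class, field, and method bodies are invented, not the actual Beam test code); with a remote runner such as Dataflow, the lifecycle methods run in a worker JVM, so the static map consulted by the test's assertion never sees the calls.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical illustration only: lifecycle calls are recorded in a static map.
// When the pipeline executes on a remote Dataflow worker, setup/teardown run in a
// different JVM, so this map stays empty in the JVM that runs the JUnit assertion.
class LifecycleRecordingFn extends DoFn<String, String> {
  static final ConcurrentMap<String, Boolean> TEARDOWN_CALLS = new ConcurrentHashMap<>();

  @Setup
  public void setup() {
    throw new RuntimeException("simulated setup failure");
  }

  @ProcessElement
  public void process(@Element String element, OutputReceiver<String> out) {
    out.output(element);
  }

  @Teardown
  public void teardown() {
    // Visible only to code running in the same process as the DoFn.
    TEARDOWN_CALLS.put("teardown", true);
  }
}
```

Making such an assertion pass on a remote runner would require shipping worker-side state back to the test process, which is presumably why these cases are excluded here rather than reworked.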
@@ -105,11 +105,32 @@ public DataflowMapTaskExecutor create(
Networks.replaceDirectedNetworkNodes(
network, createOutputReceiversTransform(stageName, counterSet));

// Swap out all the ParallelInstruction nodes with Operation nodes
Networks.replaceDirectedNetworkNodes(
network,
createOperationTransformForParallelInstructionNodes(
stageName, network, options, readerFactory, sinkFactory, executionContext));
// Swap out all the ParallelInstruction nodes with Operation nodes. While updating the
// network, we keep track of the created Operations so that if an exception is encountered
// we can properly abort started operations.
ArrayList<Operation> createdOperations = new ArrayList<>();
try {
Networks.replaceDirectedNetworkNodes(
network,
createOperationTransformForParallelInstructionNodes(
stageName,
network,
options,
readerFactory,
sinkFactory,
executionContext,
createdOperations));
} catch (RuntimeException exn) {
for (Operation o : createdOperations) {
try {
o.abort();
[Review comment from a Contributor]
We are now evaluating teardown for all ParDos when one throws. Given the change, would it be straightforward to also fix the case where the DoFn finished normally (for example, changing the catch to a finally), or would that need further consideration?

[Reply from @scwhittle (Author), Nov 13, 2025]
This is when creating the executor; if there is no error creating it, then we don't want to abort here, because the executor is then returned and reused many times. If we want to ensure we tear down DoFns, I think we need some timeout on the internal cache of DoFns in DoFnInstanceManagers.java.
} catch (Exception exn2) {
exn.addSuppressed(exn2);
}
}
throw exn;
}

// Collect all the operations within the network and attach all the operations as receivers
// to preceding output receivers.
@@ -144,7 +165,8 @@ Function<Node, Node> createOperationTransformForParallelInstructionNodes(
final PipelineOptions options,
final ReaderFactory readerFactory,
final SinkFactory sinkFactory,
final DataflowExecutionContext<?> executionContext) {
final DataflowExecutionContext<?> executionContext,
final List<Operation> createdOperations) {

return new TypeSafeNodeFunction<ParallelInstructionNode>(ParallelInstructionNode.class) {
@Override
@@ -156,27 +178,31 @@ public Node typedApply(ParallelInstructionNode node) {
instruction.getOriginalName(),
instruction.getSystemName(),
instruction.getName());
OperationNode result;
try {
DataflowOperationContext context = executionContext.createOperationContext(nameContext);
if (instruction.getRead() != null) {
return createReadOperation(
network, node, options, readerFactory, executionContext, context);
result =
createReadOperation(
network, node, options, readerFactory, executionContext, context);
} else if (instruction.getWrite() != null) {
return createWriteOperation(node, options, sinkFactory, executionContext, context);
result = createWriteOperation(node, options, sinkFactory, executionContext, context);
} else if (instruction.getParDo() != null) {
return createParDoOperation(network, node, options, executionContext, context);
result = createParDoOperation(network, node, options, executionContext, context);
} else if (instruction.getPartialGroupByKey() != null) {
return createPartialGroupByKeyOperation(
network, node, options, executionContext, context);
result =
createPartialGroupByKeyOperation(network, node, options, executionContext, context);
} else if (instruction.getFlatten() != null) {
return createFlattenOperation(network, node, context);
result = createFlattenOperation(network, node, context);
} else {
throw new IllegalArgumentException(
String.format("Unexpected instruction: %s", instruction));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
createdOperations.add(result.getOperation());
return result;
}
};
}
@@ -328,7 +354,6 @@ public Node typedApply(InstructionOutputNode input) {
Coder<?> coder =
CloudObjects.coderFromCloudObject(CloudObject.fromSpec(cloudOutput.getCodec()));

@SuppressWarnings("unchecked")
ElementCounter outputCounter =
new DataflowOutputCounter(
cloudOutput.getName(),
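The review thread above hinges on where the abort loop lives. The self-contained sketch below (types and names are hypothetical, not worker code) restates the build-all-or-abort pattern the factory change introduces: operations already created are aborted only on the failure path, with abort failures attached as suppressed exceptions, while on success nothing is aborted because the operations are returned inside an executor that the caller reuses many times; that reuse is why a finally block would be wrong here.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained illustration with hypothetical types of the build-all-or-abort pattern.
final class BuildOrAbort {
  interface Op {
    void abort();
  }

  interface OpFactory {
    Op create();
  }

  static List<Op> buildAll(List<OpFactory> factories) {
    List<Op> created = new ArrayList<>();
    try {
      for (OpFactory factory : factories) {
        created.add(factory.create());
      }
      return created; // success: caller owns the live operations, nothing is aborted
    } catch (RuntimeException e) {
      for (Op op : created) {
        try {
          op.abort();
        } catch (Exception abortFailure) {
          e.addSuppressed(abortFailure); // keep the original failure primary
        }
      }
      throw e;
    }
  }
}
```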
@@ -18,8 +18,8 @@
package org.apache.beam.runners.dataflow.worker.util.common.worker;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
@@ -36,7 +36,9 @@ public class MapTaskExecutor implements WorkExecutor {
private static final Logger LOG = LoggerFactory.getLogger(MapTaskExecutor.class);

/** The operations in the map task, in execution order. */
public final List<Operation> operations;
public final ArrayList<Operation> operations;

private boolean closed = false;

private final ExecutionStateTracker executionStateTracker;

@@ -54,7 +56,7 @@ public MapTaskExecutor(
CounterSet counters,
ExecutionStateTracker executionStateTracker) {
this.counters = counters;
this.operations = operations;
this.operations = new ArrayList<>(operations);
this.executionStateTracker = executionStateTracker;
}

@@ -63,6 +65,7 @@ public CounterSet getOutputCounters() {
return counters;
}

/** May be reused if execute() returns without an exception being thrown. */
@Override
public void execute() throws Exception {
LOG.debug("Executing map task");
@@ -74,13 +77,11 @@ public void execute() throws Exception {
// Starting a root operation such as a ReadOperation does the work
// of processing the input dataset.
LOG.debug("Starting operations");
ListIterator<Operation> iterator = operations.listIterator(operations.size());
while (iterator.hasPrevious()) {
for (int i = operations.size() - 1; i >= 0; --i) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Worker aborted");
}
Operation op = iterator.previous();
op.start();
operations.get(i).start();
}

// Finish operations, in forward-execution-order, so that a
@@ -94,16 +95,13 @@
op.finish();
}
} catch (Exception | Error exn) {
LOG.debug("Aborting operations", exn);
for (Operation op : operations) {
try {
op.abort();
} catch (Exception | Error exn2) {
exn.addSuppressed(exn2);
if (exn2 instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
try {
closeInternal();
} catch (Exception closeExn) {
exn.addSuppressed(closeExn);
}
if (exn instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw exn;
}
@@ -164,6 +162,45 @@ public void abort() {
}
}

private void closeInternal() throws Exception {
if (closed) {
return;
}
LOG.debug("Aborting operations");
@Nullable Exception exn = null;
for (Operation op : operations) {
try {
op.abort();
} catch (Exception | Error exn2) {
if (exn2 instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (exn == null) {
if (exn2 instanceof Exception) {
exn = (Exception) exn2;
} else {
exn = new RuntimeException(exn2);
}
} else {
exn.addSuppressed(exn2);
}
}
}
closed = true;
if (exn != null) {
throw exn;
}
}

@Override
public void close() {
try {
closeInternal();
} catch (Exception e) {
LOG.error("Exception while closing MapTaskExecutor, ignoring", e);
}
}

@Override
public List<Integer> reportProducedEmptyOutput() {
List<Integer> emptyOutputSinkIndexes = Lists.newArrayList();
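With the new closed flag, closeInternal() runs its abort loop at most once, whether it is reached from a failed execute() or from close(). A hypothetical caller (the supplier interface and work-item loop below are assumptions for illustration, not Dataflow worker code) can therefore keep reusing the executor while execute() succeeds and unconditionally close() it at the end:

```java
import org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor;

// Illustrative caller only: the supplier and the work-item count are assumed, not PR code.
final class MapTaskExecutorUsageSketch {
  interface ExecutorSupplier {
    MapTaskExecutor get() throws Exception;
  }

  static void runMapTask(ExecutorSupplier supplier, int workItemCount) throws Exception {
    MapTaskExecutor executor = supplier.get();
    try {
      for (int i = 0; i < workItemCount; i++) {
        executor.execute(); // reusable as long as it returns without throwing
      }
    } finally {
      // A failed execute() already ran closeInternal(), making this a no-op;
      // otherwise close() aborts all operations and logs (rather than rethrows) failures.
      executor.close();
    }
  }
}
```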
@@ -415,18 +415,21 @@ private ExecuteWorkResult executeWork(

// Release the execution state for another thread to use.
computationState.releaseComputationWorkExecutor(computationWorkExecutor);
computationWorkExecutor = null;

work.setState(Work.State.COMMIT_QUEUED);
outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions(sampler));

return ExecuteWorkResult.create(
outputBuilder, stateReader.getBytesRead() + localSideInputStateFetcher.getBytesRead());
} catch (Throwable t) {
// If processing failed due to a thrown exception, close the executionState. Do not
// return/release the executionState back to computationState as that will lead to this
// executionState instance being reused.
LOG.debug("Invalidating executor after work item {} failed", workItem.getWorkToken(), t);
computationWorkExecutor.invalidate();
if (computationWorkExecutor != null) {
// If processing failed due to a thrown exception, close the executionState. Do not
// return/release the executionState back to computationState as that will lead to this
// executionState instance being reused.
LOG.debug("Invalidating executor after work item {} failed", workItem.getWorkToken(), t);
computationWorkExecutor.invalidate();
}

// Re-throw the exception, it will be caught and handled by workFailureProcessor downstream.
throw t;
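The executeWork change follows a release-then-clear pattern: after the executor is handed back to computationState, the local reference is set to null so the catch block cannot invalidate an executor that another thread may already be reusing. A self-contained sketch of that flow, with hypothetical types standing in for the worker classes:

```java
// Hypothetical illustration of the release-then-clear pattern applied in executeWork.
final class ReleaseThenClear {
  interface Executor {
    void process(String work) throws Exception;
    void invalidate();
  }

  interface Pool {
    Executor acquire();
    void release(Executor executor);
  }

  static void runOne(Pool pool, String work, Runnable commit) throws Exception {
    Executor executor = pool.acquire();
    try {
      executor.process(work);

      pool.release(executor); // ownership returns to the pool here
      executor = null;        // no longer ours: a later failure must not touch it

      commit.run();           // e.g. queueing the commit, as in executeWork
    } catch (Exception e) {
      if (executor != null) {
        executor.invalidate(); // only if the hand-off never happened
      }
      throw e;
    }
  }
}
```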