@scwhittle scwhittle commented Oct 27, 2025

This PR began with adding unit testing to expose the reported problems with #18592.

The added tests did show that we are not calling Operation.abort in all cases:

  • in IntrinsicMapTaskExecutorFactory we want to abort all operations that were created and then abandoned if another operation had an issue
  • MapTaskExecutor did not implement WorkExecutor.close, so we were neglecting to call abort on operations when executors were closed due to cache eviction etc.
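
The two gaps listed above can be sketched in isolation. This is a minimal illustration, not the worker code: `Operation` here is a hypothetical one-method stand-in, and `Executor` and `create` are invented names that only model "abort what was already created if creation fails" and "abort owned operations on close".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class AbortOnAbandonSketch {
  /** Hypothetical stand-in for the worker's Operation; only abort() is modeled. */
  interface Operation {
    void abort() throws Exception;
  }

  /** Hypothetical executor: closing it aborts every operation it owns. */
  static final class Executor implements AutoCloseable {
    private final List<Operation> operations;

    Executor(List<Operation> operations) {
      this.operations = operations;
    }

    @Override
    public void close() throws Exception {
      Exception first = null;
      for (Operation o : operations) {
        try {
          o.abort();
        } catch (Exception e) {
          // Keep aborting the remaining operations; report everything at the end.
          if (first == null) {
            first = e;
          } else {
            first.addSuppressed(e);
          }
        }
      }
      if (first != null) {
        throw first;
      }
    }
  }

  /** Creates operations in order; if one factory throws, aborts those already created. */
  static Executor create(List<Supplier<Operation>> factories) {
    List<Operation> created = new ArrayList<>();
    try {
      for (Supplier<Operation> factory : factories) {
        created.add(factory.get());
      }
      return new Executor(created);
    } catch (RuntimeException exn) {
      for (Operation o : created) {
        try {
          o.abort();
        } catch (Exception abortExn) {
          exn.addSuppressed(abortExn);
        }
      }
      throw exn;
    }
  }

  public static void main(String[] args) {
    List<String> aborted = new ArrayList<>();
    List<Supplier<Operation>> factories = new ArrayList<>();
    factories.add(() -> () -> aborted.add("op1"));
    factories.add(() -> {
      throw new IllegalStateException("creation failed");
    });
    try {
      create(factories);
    } catch (IllegalStateException expected) {
      System.out.println("aborted after failed creation: " + aborted);
    }
  }
}
```

On the success path nothing is aborted inside `create`, because the returned executor is meant to be reused; abort then happens only when the executor is closed.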

After addressing these fixes, we call abort on Operations (including ParDoOperation) when they are abandoned. Additional tests were then added at the DoFn level to verify that teardown was called; surprisingly, they showed that we were still not calling teardown. Further investigation shows that this is because ParDoOperation contains a SimpleParDoFn, which lazily claims a DoFn from DoFnInstanceManager (which has an internal cache) only when a bundle begins processing, and releases the DoFn back to the cache in finishBundle. Since the cases where we were missing calls to ParDoOperation.abort (addressed by the changes in this PR) were not between startBundle and finishBundle, ParDoOperation.abort is a no-op in those cases.
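
The claim/release behavior described above can be modeled with a small sketch. All names here (`Fn`, `InstanceCache`, `LazyParDoFn`, `claim`, `release`, `discard`) are hypothetical and do not mirror the real SimpleParDoFn or DoFnInstanceManager signatures; the sketch only shows why abort outside a bundle has nothing to tear down.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

final class LazyDoFnClaimSketch {
  /** Hypothetical user function; only teardown is modeled. */
  static final class Fn {
    boolean tornDown = false;

    void teardown() {
      tornDown = true;
    }
  }

  /** Hypothetical instance manager that keeps released instances for reuse. */
  static final class InstanceCache {
    private final Deque<Fn> idle = new ArrayDeque<>();
    private final Supplier<Fn> factory;
    int created = 0;

    InstanceCache(Supplier<Fn> factory) {
      this.factory = factory;
    }

    /** Returns a cached instance if one exists, otherwise creates a new one. */
    Fn claim() {
      Fn fn = idle.poll();
      if (fn == null) {
        fn = factory.get();
        created++;
      }
      return fn;
    }

    /** Returns a healthy instance to the cache without tearing it down. */
    void release(Fn fn) {
      idle.push(fn);
    }

    /** Tears down an instance that must not be reused. */
    void discard(Fn fn) {
      fn.teardown();
    }
  }

  /** Holds an instance only between startBundle and finishBundle. */
  static final class LazyParDoFn {
    private final InstanceCache cache;
    private Fn claimed;

    LazyParDoFn(InstanceCache cache) {
      this.cache = cache;
    }

    void startBundle() {
      claimed = cache.claim();
    }

    void finishBundle() {
      cache.release(claimed);
      claimed = null;
    }

    void abort() {
      if (claimed == null) {
        return; // Outside a bundle nothing is held, so abort is a no-op.
      }
      cache.discard(claimed);
      claimed = null;
    }
  }

  public static void main(String[] args) {
    InstanceCache cache = new InstanceCache(Fn::new);
    LazyParDoFn op = new LazyParDoFn(cache);
    op.startBundle();
    op.finishBundle();
    op.abort(); // no-op: the instance is already back in the cache
    System.out.println("instances created: " + cache.created);
  }
}
```

In this model an instance is torn down only if abort arrives while a bundle is in flight; otherwise it stays cached and is handed to the next bundle.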

So these tests show that our expectations about when teardown is called were incorrect: we are not leaking DoFns but are instead caching them and reusing them on retry. The added StreamingDataflowWorker test verifies this behavior.



@scwhittle scwhittle force-pushed the teardown_on_invalidation branch 2 times, most recently from 1633d31 to 2ae939a Compare October 28, 2025 14:01
@scwhittle scwhittle force-pushed the teardown_on_invalidation branch 2 times, most recently from e1468a3 to 4bf5b7f Compare November 6, 2025 13:59
@scwhittle
Contributor Author

Error looks unrelated, sent out #36754

@scwhittle scwhittle marked this pull request as ready for review November 7, 2025 14:27
@scwhittle
Contributor Author

Run Java PreCommit

@github-actions
Contributor

github-actions bot commented Nov 7, 2025

Assigning reviewers:

R: @Abacn added as fallback since no labels match configuration


Contributor

@Abacn Abacn left a comment

Thanks, had a few comments. Would you mind updating the PR description noting which scenario this PR is supposed to fix?

Reading #18592 (comment), it seems this fixes teardown on a setup exception on the Dataflow streaming runner. Would it also apply to batch? However, I tested this PR with my old test case for #31381, and the behavior appears to remain the same as at HEAD. In particular, I have 3 fused DoFns and throw in DoFn2's setup.

For batch, only DoFn2's teardown is invoked. However, I also notice that the previously set-up DoFns are not set up again, e.g.:

@Setup3 null, 743 at 277fbd5a
@Setup2 null, 510 at 67875efd
@Teardown2 510 at 67875efd
Failure processing work item google.com:clouddfe;2025-11-12_09_44_38-12709838531006866747;5913875962390955537: Uncaught exception occurred during work unit execution. This will be retried.
Finished processing stage s01 with 1 errors in 0.87 seconds
Starting MapTask stage s01
@Setup2 null, 399 at 145c86d
@Setup null, 309 at 23ddbaee
@StartBundle3 743 at 277fbd5a
...

(job id 2025-11-12_09_44_38-12709838531006866747)

For streaming, the runner creates 20 DoFn instances at one time. One crashed in setup and its teardown was called, but a replacement DoFn2 was not set up (I still get a count of 20 for DoFn2 setup). (job id 2025-11-12_11_32_33-12848924584850197035)

    } catch (RuntimeException exn) {
      for (Operation o : createdOperations) {
        try {
          o.abort();
Contributor

We are now calling teardown for all ParDos when one throws. Given this change, would it be straightforward to also handle the case where the DoFn finishes normally (e.g. by changing catch to finally), or would that need further consideration?

Contributor Author

@scwhittle scwhittle Nov 13, 2025

This is when creating the executor. If there is no error creating it, then we don't want to abort here, because the executor is returned and reused many times. If we want to ensure we tear down DoFns, I think we need some timeout on the internal cache of DoFns in DoFnInstanceManagers.java.
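
One possible shape for such a timeout, as a rough sketch only: this is not what the PR implements and not the DoFnInstanceManagers API. `ExpiringInstanceCache`, its constructor parameters, and the injected clock are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical cache that tears down instances left idle for too long. */
final class ExpiringInstanceCache<T> {
  private static final class Idle<T> {
    final T instance;
    final long releasedAtMillis;

    Idle(T instance, long releasedAtMillis) {
      this.instance = instance;
      this.releasedAtMillis = releasedAtMillis;
    }
  }

  // Oldest release is at the head, newest at the tail.
  private final Deque<Idle<T>> idle = new ArrayDeque<>();
  private final Supplier<T> factory;
  private final Consumer<T> teardown;
  private final LongSupplier clockMillis;
  private final long maxIdleMillis;

  ExpiringInstanceCache(
      Supplier<T> factory, Consumer<T> teardown, LongSupplier clockMillis, long maxIdleMillis) {
    this.factory = factory;
    this.teardown = teardown;
    this.clockMillis = clockMillis;
    this.maxIdleMillis = maxIdleMillis;
  }

  /** Returns the most recently released instance, or a new one if none is cached. */
  synchronized T claim() {
    evictExpired();
    Idle<T> entry = idle.pollLast();
    return entry != null ? entry.instance : factory.get();
  }

  /** Returns an instance to the cache and records when it became idle. */
  synchronized void release(T instance) {
    evictExpired();
    idle.addLast(new Idle<>(instance, clockMillis.getAsLong()));
  }

  /** Tears down every instance that has been idle for at least maxIdleMillis. */
  private void evictExpired() {
    long now = clockMillis.getAsLong();
    while (!idle.isEmpty() && now - idle.peekFirst().releasedAtMillis >= maxIdleMillis) {
      teardown.accept(idle.pollFirst().instance);
    }
  }

  public static void main(String[] args) {
    long[] now = {0L}; // fake clock so the example is deterministic
    int[] created = {0};
    ExpiringInstanceCache<String> cache =
        new ExpiringInstanceCache<>(
            () -> "fn" + (++created[0]),
            fn -> System.out.println("teardown " + fn),
            () -> now[0],
            100L);
    cache.release(cache.claim()); // first instance is created, then cached at t=0
    now[0] = 200L;
    System.out.println("claimed " + cache.claim());
  }
}
```

Eviction in this sketch is lazy and runs only on claim or release, so an instance in a cache that is never touched again would not be torn down; a periodic sweep would be needed to cover that case.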

…hat previously created dofns are torn down.

Ensure that when we invalidate a WorkExecutor we close the MapTaskExecutor, which ends up calling teardown on the DoFns. This actually doesn't matter because the Operators already tear down if there is an exception during processing, and SimpleParDoFn checks out and returns the actual DoFn only during bundle processing. Added test coverage of this.
@scwhittle scwhittle changed the title Fix problems leading to Teardown not being called Ensure that Operations are aborted when MapTaskExecutor is closed Nov 13, 2025
@scwhittle scwhittle changed the title Ensure that Operations are aborted when MapTaskExecutor is closed Ensure that Operations are aborted when MapTaskExecutor is closed. Add tests around setup/teardown of DoFns Nov 13, 2025
@scwhittle
Contributor Author

Thanks, had a few comments. Would you mind updating the PR description noting which scenario this PR is supposed to fix?

Sorry, I clarified in the description what this PR does and how our expectations of when setup/teardown are called were incorrect. So this fixes some missing abort calls, but those calls ended up being no-ops for ParDoOperation (they may be needed for other operation types, though). This PR adds tests validating the reuse of DoFns.

Reading #18592 (comment), it seems this fixes teardown on a setup exception on the Dataflow streaming runner. Would it also apply to batch? However, I tested this PR with my old test case for #31381, and the behavior appears to remain the same as at HEAD. In particular, I have 3 fused DoFns and throw in DoFn2's setup.

For batch, only DoFn2's teardown is invoked. However, I also notice that the previously set-up DoFns are not set up again, e.g.:

@Setup3 null, 743 at 277fbd5a
@Setup2 null, 510 at 67875efd
@Teardown2 510 at 67875efd
Failure processing work item google.com:clouddfe;2025-11-12_09_44_38-12709838531006866747;5913875962390955537: Uncaught exception occurred during work unit execution. This will be retried.
Finished processing stage s01 with 1 errors in 0.87 seconds
Starting MapTask stage s01
@Setup2 null, 399 at 145c86d
@Setup null, 309 at 23ddbaee
@StartBundle3 743 at 277fbd5a
...

(job id 2025-11-12_09_44_38-12709838531006866747)

For streaming, the runner creates 20 DoFn instances at one time. One crashed in setup and its teardown was called, but a replacement DoFn2 was not set up (I still get a count of 20 for DoFn2 setup). (job id 2025-11-12_11_32_33-12848924584850197035)

Hopefully this is clarified by the updated PR description. This is actually the expected and desirable behavior of the code. We don't need to throw away DoFn2 in the case where another DoFn had an error in setup. The DoFns are independent at this point, and we haven't interacted with the DoFn2 instance in a way that prevents us from reusing it. We are still respecting the lifecycle of DoFn2: instead of teardown and re-setup, we just cache and reuse the DoFn2 instance when we create a new MapTaskExecutor.


@scwhittle scwhittle force-pushed the teardown_on_invalidation branch from 4bf5b7f to b7448da Compare November 13, 2025 09:49
@scwhittle scwhittle merged commit 5732086 into apache:master Nov 17, 2025
20 of 22 checks passed
@scwhittle scwhittle deleted the teardown_on_invalidation branch November 17, 2025 08:54