Refactor: separate SplittableTruncateSizedRestrictions #35021

Merged
kennknowles merged 1 commit into apache:master from kennknowles:FnApiDoFnRunner-TruncateRestriction
Jul 18, 2025

@kennknowles
Member

This follows #34919 and splits out calls to @TruncateRestriction. This is the first Fn API transform that deals with responding to splits, so it is a bit more complex than the prior refactors, which only touched @GetSize, @GetInitialRestriction, and @SplitRestriction, none of which respond to split requests.

As I understand it, this responds to split requests by delegating them downstream, as it is expected to be fused with final element+restriction processing.
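
To make the delegation idea concrete, here is a minimal sketch, assuming a made-up `SplitDelegate` interface and toy integer restrictions. None of these names come from Beam; the real harness types and split protocol differ. The truncate stage holds no splittable state of its own, so it simply forwards the request to the fused downstream stage.

```java
import java.util.Optional;

/** Hypothetical interface for illustration; not a Beam API. */
interface SplitDelegate {
  /** Tries to give up part of the remaining work; empty if nothing can be split off. */
  Optional<String> trySplit(double fractionOfRemainder);
}

/** Stand-in for the fused element+restriction processing stage. */
final class DownstreamProcessSketch implements SplitDelegate {
  private final int position;
  private int end;

  DownstreamProcessSketch(int position, int end) {
    this.position = position;
    this.end = end;
  }

  @Override
  public Optional<String> trySplit(double fractionOfRemainder) {
    int remaining = end - position;
    int splitPoint = position + (int) (remaining * fractionOfRemainder);
    if (splitPoint >= end) {
      return Optional.empty(); // nothing left over to hand back
    }
    String residual = "[" + splitPoint + ", " + end + ")";
    end = splitPoint; // keep the primary, return the residual
    return Optional.of(residual);
  }
}

/** Stand-in for the truncate stage: it only forwards split requests. */
final class TruncateStageSketch implements SplitDelegate {
  private final SplitDelegate downstream;

  TruncateStageSketch(SplitDelegate downstream) {
    this.downstream = downstream;
  }

  @Override
  public Optional<String> trySplit(double fractionOfRemainder) {
    return downstream.trySplit(fractionOfRemainder);
  }
}
```

In this toy version the downstream stage owns the position and the end of the restriction, which is why the truncate stage has nothing to decide for itself.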


@github-actions github-actions bot added the java label May 21, 2025
@kennknowles kennknowles force-pushed the FnApiDoFnRunner-TruncateRestriction branch 5 times, most recently from b5b739b to 5178d01 Compare June 16, 2025 22:21
@kennknowles kennknowles marked this pull request as ready for review June 16, 2025 22:22
@kennknowles
Member Author

R: @scwhittle

@github-actions
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment "assign set of reviewers".

@kennknowles
Member Author

I was spending my time on #34902 but honestly these refactors make that a lot more manageable. So I put an hour into this one today to fix the last remaining harness test failure. CC @stankiewicz @talatuyarer

@kennknowles kennknowles force-pushed the FnApiDoFnRunner-TruncateRestriction branch 2 times, most recently from 300dc36 to bc12475 Compare June 17, 2025 17:00
@kennknowles kennknowles force-pushed the FnApiDoFnRunner-TruncateRestriction branch from bc12475 to 89fbea4 Compare June 17, 2025 18:11
@kennknowles
Member Author

Please do review. All the failing tests pass locally, and I believe they are flaking. I will keep kicking them and trying to repro locally, but so far none are actually broken.

@kennknowles
Member Author

Green! Huzzah!

// Note that the assumption here is the fullInputCoder of the Truncate transform should be the
// same as the SDF/Process transform.
Coder<WindowedValue<?>> fullInputCoder =
    (Coder<WindowedValue<?>>) (Object) WindowedValues.getFullCoder(inputCoder, windowCoder);
Contributor

can this be enforced earlier during construction?

Member Author

Hmm. I don't really understand this comment TBH.

I thought having the coders line up is an invariant on the ProcessBundleDescriptor. I had actually thought that it would be up to the descriptor to say what coder to use, and that in the Fn API the fact that the data plane is using a WindowedValueCoder is explicit in the instruction graph. But I do see that we pull the coder from the proto and it is not a WindowedValueCoder, so I guess we have left that implicit? That actually seems like a problem we may have to fix...

For now I actually think just removing this comment is fair, no? This is just the same process that is used in all the variants of the FnApiDoFnRunner. I think the TODO would be "just use the coder the runner tells you to"? Like... we want to be able to tell the harness to use coders that may or may not be windowed value coders? As in, the harness would be a UDF server that just does as it is told with as few assumptions as possible.

Contributor

I was wondering if this cast could ever fail and, if it could, whether we should create the fullInputCoder during the construction of this class so we fail earlier during setup, since I believe we have inputCoder and windowCoder available then.

Member Author

The cast can't fail because it is actually just forgetting some actual type parameters for type compatibility with other code. It converts FullWindowedValueCoder<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> to Coder<WindowedValue<?>>.

  • Because the erased type is Coder in all cases it literally cannot fail.
  • It can be incorrect, but we don't have a way to catch it until it goes weirdly wrong later (as with all such "unchecked" casts)
  • The wacky cast "through" Object avoids raw types, which would require suppression. This used to be a cast through the raw type Coder.

But it can, indeed, be constructed earlier. I've moved it to the constructor.
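
The points about the cast can be illustrated with a small sketch. It uses plain `java.util.List` as a stand-in for the coder types, and the class and method names are hypothetical rather than anything in this PR. The cast through `Object` compiles without raw types and is never verified at runtime, because both sides erase to the same class. A wrong cast therefore also goes through and only surfaces when an element is used.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: java.util types stand in for Beam's coder classes. */
final class UncheckedCastSketch {
  private final List<Object> erasedView;

  /** Performs the cast once, at construction, rather than on each use. */
  @SuppressWarnings("unchecked")
  UncheckedCastSketch(List<String> specific) {
    // Casting via Object avoids raw types. Nothing is verified at runtime,
    // because the erased type (List) is the same on both sides.
    this.erasedView = (List<Object>) (Object) specific;
  }

  int size() {
    return erasedView.size();
  }

  /** A wrong cast also goes through; the error appears only when an element is used. */
  @SuppressWarnings("unchecked")
  static boolean wrongCastFailsOnlyOnUse() {
    List<String> strings = new ArrayList<>();
    strings.add("not a number");
    List<Integer> wrong = (List<Integer>) (Object) strings; // no exception here
    try {
      Integer first = wrong.get(0); // the compiler-inserted check fails here
      return first == null; // not reached
    } catch (ClassCastException expected) {
      return true;
    }
  }
}
```

Doing the cast in the constructor does not make it any safer; it only means the field is built once during setup instead of on each use.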

@kennknowles kennknowles force-pushed the FnApiDoFnRunner-TruncateRestriction branch from 89fbea4 to f577796 Compare June 20, 2025 18:20
Contributor

@scwhittle scwhittle left a comment

Just some minor comments

 * check values for null to determine presence/absence. Instead you can store a {@code @Nullable
 * Holder<T>}.
 */
public class Holder<T> {
Contributor

mark Internal?

Member Author

Technically, all of sdk/util is "internal". Not sure whether that or an annotation will stop a user from complaining if we break it though :-). I added the annotation.
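
For readers unfamiliar with the pattern the Javadoc fragment above refers to, here is a minimal sketch. It assumes the holder exists to separate "no value stored yet" from "a stored value that happens to be null"; the Javadoc is truncated, so that reading is an assumption. `SketchHolder` and `CachedLookupSketch` are hypothetical names, and this is not the `Holder` class added in this PR.

```java
/** Hypothetical sketch of a holder; not the class under review. */
final class SketchHolder<T> {
  private final T value; // may legitimately be null

  private SketchHolder(T value) {
    this.value = value;
  }

  static <T> SketchHolder<T> of(T value) {
    return new SketchHolder<>(value);
  }

  T get() {
    return value;
  }
}

/** Caches a computation whose result may be null. */
final class CachedLookupSketch {
  // null field: nothing computed yet. Non-null holder: computed, even if the result is null.
  private SketchHolder<String> cached = null;
  private int computations = 0;

  String lookup() {
    if (cached == null) {
      computations++;
      cached = SketchHolder.of(computeResultThatMayBeNull());
    }
    return cached.get();
  }

  int computations() {
    return computations;
  }

  private String computeResultThatMayBeNull() {
    return null; // a valid result in this sketch
  }
}
```

Without the wrapper, a cached `null` result would be indistinguishable from "not computed", and the computation would run again on every lookup.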

import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

/** Miscellaneous methods for working with progress. */
public abstract class ProgressUtils {
Contributor

Internal?

Member Author

Technically, users should never have a compile-time dep on org.apache.beam.fn.harness. The SDK harness is a runtime-only, non-user-facing library. I added the annotation anyhow, though I worry that it suggests other classes in the package are not internal.

@kennknowles kennknowles force-pushed the FnApiDoFnRunner-TruncateRestriction branch from f577796 to 5b4c05e Compare July 1, 2025 19:38
Member Author

@kennknowles kennknowles left a comment

Thanks for the thorough review! I will get the tests green, run our internal tests, then merge.

@kennknowles kennknowles merged commit 8eba198 into apache:master Jul 18, 2025
25 of 27 checks passed
@kennknowles kennknowles deleted the FnApiDoFnRunner-TruncateRestriction branch July 18, 2025 21:51
@Abacn
Contributor

Abacn commented Jul 24, 2025

This PR breaks the PubsubIO Load Test: specifically, a use case of UnboundedSyntheticSource stopped producing data. See #35194 (comment) for details.
