Streamline non-cached state backed iterable. #34746
Avoids all the bookkeeping logic of creating and caching blocks of elements. In addition, this allows decoded elements to be ephemerally decoded and consumed one at a time rather than stored in large lists even when actual caching is disabled.
FYI @priyansndesai
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
    return new UncachedStateIterable<>(beamFnStateClient, stateRequestForFirstChunk, valueCoder);
  }

  static class UncachedStateIterable<T> extends PrefetchableIterables.Default<T> {
If this isn't used for state anywhere (I don't think it is), we probably don't even need to use the Prefetchable* interfaces at all, simplifying things further.
It is used for BagUserState, MultimapState, etc.: https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L76
I meant to refer to the StateBackedIterable class itself. StateFetchingIterators are definitely used elsewhere.
Sorry, I'm still not sure I understand. Are you suggesting just inlining this class in StateBackedIterable to avoid having it use the interfaces?
I meant that, e.g., we could probably change the declaration at https://github.com/apache/beam/blob/release-2.60.0/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java#L77 to be just an Iterable, change the constructor for StateBackedIterable[Coder] to not take a Cache at all, etc. But it's also not too much work to keep it as a Prefetchable iterable, given that the underlying iterable is one.
Force-pushed from 3484504 to 877f111.
Run Java PreCommit
The BoundedQueueExecutor test failure is an unrelated flake. The Spark test failures involved side inputs, so I'm not sure whether they are related; rerunning tests.
scwhittle left a comment:
Looks good as long as the test failure is unrelated.
Do we want to capability protect this?
This isn't really a capability: the interaction with the runner remains the same. I thought about guarding this with an experiment, but there isn't really a good way to plumb it down.
Avoids all the bookkeeping logic of creating and caching blocks of elements. In addition to reducing total work and simplifying the code paths, this allows decoded elements to be ephemerally decoded and consumed one at a time rather than stored in large lists even when actual caching is disabled, which should play much better with the garbage collector.
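To illustrate the "decode and consume one at a time" idea, here is a minimal sketch, not the code in this PR. `LazyChunkedIterable`, `ChunkFetcher`, and the `Function<byte[], T>` decoder are hypothetical stand-ins for the state client, the chunked state request, and the value coder; the real harness classes differ. The iterator holds only the current chunk of encoded bytes and decodes an element when `next()` is called, so no list of decoded elements is ever built.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Hypothetical sketch: lazily decodes elements chunk by chunk, keeping no cache. */
final class LazyChunkedIterable<T> implements Iterable<T> {

  /** Assumed stand-in for a state fetch: returns chunk i, or null once no chunks remain. */
  interface ChunkFetcher {
    List<byte[]> fetch(int chunkIndex);
  }

  private final ChunkFetcher fetcher;
  private final Function<byte[], T> decoder; // assumed stand-in for a value coder

  LazyChunkedIterable(ChunkFetcher fetcher, Function<byte[], T> decoder) {
    this.fetcher = fetcher;
    this.decoder = decoder;
  }

  @Override
  public Iterator<T> iterator() {
    // Every call starts a fresh pass over the chunks; nothing is remembered between passes.
    return new Iterator<T>() {
      private int nextChunk = 0;
      private Iterator<byte[]> current = null;
      private boolean exhausted = false;

      @Override
      public boolean hasNext() {
        // Advance past finished (or empty) chunks until an element or the end is found.
        while (!exhausted && (current == null || !current.hasNext())) {
          List<byte[]> chunk = fetcher.fetch(nextChunk++);
          if (chunk == null) {
            exhausted = true;
            current = null;
          } else {
            current = chunk.iterator();
          }
        }
        return !exhausted;
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        // Decode exactly one element on demand; it becomes garbage once the caller drops it.
        return decoder.apply(current.next());
      }
    };
  }
}
```

In this sketch the only state that outlives a call to `next()` is the current chunk and a chunk index, which is the property that should make short-lived decoded values cheap for the garbage collector.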
Note that this disables caching of the tail of GBK iterables, which will result in more side input reads and possible performance degradation for those GBKs whose value iterables are too large to fit over the Data API but small enough to be substantially cached, and that are re-iterated. IMHO this is a reasonable tradeoff, as (1) large values like this are the antithesis of what one wants to place in the cache (they can evict all other values over the course of iteration) and (2) this provides increased performance (and in many cases avoids outright failure, e.g. due to GC memory thrashing) for the common use case of no re-iteration (including writing lots of data to fixed shards/dynamic destinations). The most common case of re-iteration (CoGBK) does its own caching anyway.
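The re-iteration tradeoff can be sketched as follows. This is an illustrative model under assumptions, not harness code: `ReiterationCost`, `uncached`, and `materialize` are hypothetical names, and the `Supplier` stands in for whatever fetch backs the iterable. Each pass over an uncached iterable repeats the fetch, whereas a caller that knows it will re-iterate can copy the values once and loop over its own copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical sketch of the cost of re-iterating an iterable that keeps no cache. */
final class ReiterationCost {

  /** Every iterator() call re-runs the (assumed expensive) fetch; nothing is retained. */
  static <T> Iterable<T> uncached(Supplier<List<T>> fetch) {
    return () -> fetch.get().iterator();
  }

  /** Caller-side caching: one pass over the source, then any number of local passes. */
  static <T> List<T> materialize(Iterable<T> values) {
    List<T> copy = new ArrayList<>();
    for (T value : values) {
      copy.add(value);
    }
    return copy;
  }

  /** A trivial consumer used to drive one full pass over the values. */
  static int sum(Iterable<Integer> values) {
    int total = 0;
    for (int value : values) {
      total += value;
    }
    return total;
  }
}
```

Calling `sum` twice on the result of `uncached(...)` triggers two fetches, while calling it twice on `materialize(...)` of the same iterable triggers one. Materializing is only sensible when the values fit in memory, which is exactly the case the description argues is not the one worth optimizing for by default.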