
[Dataflow Streaming] Modify AbstractWindmillStream to separate logical stream from physical streams. #35327

Merged
scwhittle merged 7 commits into apache:master from scwhittle:substream
Jul 2, 2025


@scwhittle
Contributor

Currently the behavior should be equivalent, but the separate physical stream handlers will allow multiple distinct physical substreams when we want to cleanly terminate streams before they hit their deadlines.

Add tests for reconnection on stream errors (which already occurs). These could be committed separately if desired.

Also change the deadline used for the GetData parsing timeout to be consistent with the deadline used for the response observers. Currently this won't change much, but as we use a longer deadline for direct streams and update it in the future, this will avoid spurious timeout errors.
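To picture the split described above, here is a minimal sketch in which one long-lived logical stream owns a replaceable physical stream handler. All names (`LogicalStream`, `PhysicalStreamHandler`, `rotate`) are hypothetical and only illustrate the idea; they are not the actual Beam classes, and a real physical stream would wrap a gRPC call rather than a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one physical stream; records what was sent on it.
final class PhysicalStreamHandler {
  final int generation;
  final List<String> sent = new ArrayList<>();
  boolean halfClosed = false;

  PhysicalStreamHandler(int generation) {
    this.generation = generation;
  }

  void send(String request) {
    if (halfClosed) {
      throw new IllegalStateException("cannot send on a half-closed physical stream");
    }
    sent.add(request);
  }
}

// Hypothetical logical stream: callers only talk to this object, while the
// physical stream underneath can be replaced on error or before a deadline.
final class LogicalStream {
  private int nextGeneration = 0;
  private PhysicalStreamHandler current = newPhysicalStream();

  private PhysicalStreamHandler newPhysicalStream() {
    return new PhysicalStreamHandler(nextGeneration++);
  }

  synchronized void send(String request) {
    current.send(request);
  }

  // Half-closes the current physical stream and starts a new one; callers of
  // send() keep using the same logical stream object.
  synchronized PhysicalStreamHandler rotate() {
    PhysicalStreamHandler old = current;
    old.halfClosed = true;
    current = newPhysicalStream();
    return old;
  }

  synchronized PhysicalStreamHandler current() {
    return current;
  }
}
```

The point of the design is that rotation is internal to the logical stream, so code that sends requests does not need to know a new physical stream was started.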



@scwhittle
Contributor Author

R: @arunpandianp @m-trieu

@github-actions
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

failureHandler.throwIfNonEmpty();
@Override
public void appendHtml(PrintWriter writer) {
writer.format("CommitWorkStream: %d pending", pending.size());
Contributor

should we count only entries matching e.getValue().getKey() == this here?

Contributor Author

Done
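The reviewer's suggestion, counting only the pending entries owned by the handler rendering the status line, could look roughly like this. It assumes, as in the diff, that `pending` maps request ids to a (handler, request) pair; `Map.Entry` stands in for Beam's `KV`, and `PendingByHandler`, `Handler`, and `countPendingFor` are hypothetical names.

```java
import java.io.PrintWriter;
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class PendingByHandler {
  // Hypothetical physical stream handler; only its identity matters here.
  static final class Handler {
    final String name;

    Handler(String name) {
      this.name = name;
    }
  }

  // Request id -> (owning handler, request payload). Map.Entry stands in for KV.
  final ConcurrentMap<Long, Map.Entry<Handler, String>> pending = new ConcurrentHashMap<>();

  void add(long id, Handler owner, String request) {
    pending.put(id, new AbstractMap.SimpleImmutableEntry<>(owner, request));
  }

  // Counts only entries owned by this exact handler (reference equality,
  // mirroring e.getValue().getKey() == this).
  long countPendingFor(Handler handler) {
    return pending.values().stream().filter(e -> e.getKey() == handler).count();
  }

  void appendHtml(Handler handler, PrintWriter writer) {
    writer.format("CommitWorkStream: %d pending", countPendingFor(handler));
  }
}
```

Since the map is shared by all physical handlers of one logical stream, filtering by owner keeps each handler's status line from reporting requests that belong to a different substream.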

try {
BackOffUtils.next(Sleeper.DEFAULT, backoff);
} catch (IOException | InterruptedException ie) {
Thread.currentThread().interrupt();
Contributor

why interrupt on IOException?

I'm also not sure why BackOffUtils.next declares throws IOException; it doesn't look like any implementation throws it.

Contributor Author

Good point, separating them here, but I'll send you another PR to clean that up.
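Separating the two exception types means the interrupt flag is restored only when the thread was actually interrupted. To stay self-contained, the sketch below uses a hypothetical `BackoffSleeper` interface in place of Beam's `BackOffUtils.next(Sleeper, BackOff)`; treating an `IOException` as "retry without sleeping" is an assumption of this sketch, not necessarily what the PR does.

```java
import java.io.IOException;

final class BackoffExample {
  // Hypothetical stand-in for a backoff step that may sleep.
  interface BackoffSleeper {
    void sleepNext() throws IOException, InterruptedException;
  }

  // Returns true if the caller should retry, false if it should stop.
  static boolean backoffBeforeRetry(BackoffSleeper sleeper) {
    try {
      sleeper.sleepNext();
      return true;
    } catch (InterruptedException e) {
      // Only a real interrupt re-sets the thread's interrupt flag.
      Thread.currentThread().interrupt();
      return false;
    } catch (IOException e) {
      // An IOException says nothing about interruption, so the flag is left
      // alone; this sketch treats it as a zero-length backoff and retries.
      return true;
    }
  }
}
```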

private final Deque<QueuedBatch> batches;

private final Map<Long, AppendableInputStream> pending;
private final Supplier<Integer> batchesDebugSizeSupplier;
Contributor

thoughts on replacing this with the following?

`
private final ConcurrentLinkedDeque<QueuedBatch> batches;

@SuppressWarnings("GuardedBy")
private int getBatchesSize() {
  return batches.size();
}`

Contributor Author

I prefer doing it without requiring a suppression since it doesn't seem too complicated.
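One way to expose a debug-only size without reading a lock-guarded deque outside its lock (and so without `@SuppressWarnings("GuardedBy")`) is to keep a separate atomic counter that is updated under the lock and read lock-free. This is a hypothetical sketch (`BatchQueue`, `debugSize`, `debugSizeSupplier` are illustrative names), not necessarily how the merged code populates `batchesDebugSizeSupplier`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class BatchQueue<T> {
  private final Object lock = new Object();
  // Only accessed while holding lock.
  private final Deque<T> batches = new ArrayDeque<>();
  // Mirrors batches.size(); written under lock, readable without it.
  private final AtomicInteger debugSize = new AtomicInteger();

  void add(T batch) {
    synchronized (lock) {
      batches.addLast(batch);
      debugSize.set(batches.size());
    }
  }

  T poll() {
    synchronized (lock) {
      T batch = batches.pollFirst();
      debugSize.set(batches.size());
      return batch;
    }
  }

  // Lock-free, possibly slightly stale size intended for status pages only.
  Supplier<Integer> debugSizeSupplier() {
    return debugSize::get;
  }
}
```

The trade-off is one extra write per mutation in exchange for a status read that never contends with the send path.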

private static final long HEARTBEAT_REQUEST_ID = Long.MAX_VALUE;

private final ConcurrentMap<Long, PendingRequest> pending;
private final ConcurrentMap<Long, KV<CommitWorkPhysicalStreamHandler, PendingRequest>> pending =
Contributor

It looks like we move all pending requests to the new stream in onNewStream, so it is not clear to me whether multiple CommitWorkPhysicalStreamHandlers will be in use at the same time. Will that change in the following PRs?

Contributor Author

Yes, the goal of this PR is to set things up so that we can easily add the logic to half-close the physical stream before we hit the deadline-exceeded error, without requiring every user of the logical streams to do it themselves.

To avoid disrupting the sending of new requests, we will start a new physical substream at that point. In that case we don't want to resend the pending requests that are still active on the half-closing stream.
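The plan above (start a new physical substream while the old one half-closes, and resend only requests that are not still active on a half-closing stream) might be sketched as follows. `ResendPlanner`, `Handler`, `draining`, and `requestsToResend` are hypothetical names; the actual handler and `onNewStream` logic in Beam may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

final class ResendPlanner {
  // Hypothetical physical stream handler.
  static final class Handler {
    volatile boolean draining; // true once the stream has been half-closed
  }

  // Request id -> owning handler, sorted so resends keep id order.
  final SortedMap<Long, Handler> pending = new TreeMap<>();

  // Called when a new physical stream starts: reassigns to newHandler, and
  // returns, the ids of requests that must be resent on it.
  synchronized List<Long> requestsToResend(Handler newHandler) {
    List<Long> toResend = new ArrayList<>();
    for (Map.Entry<Long, Handler> e : pending.entrySet()) {
      Handler owner = e.getValue();
      if (owner == newHandler || owner.draining) {
        // Already on the new stream, or still active on a half-closing stream.
        continue;
      }
      toResend.add(e.getKey());
      e.setValue(newHandler); // not a structural change, safe during iteration
    }
    return toResend;
  }
}
```

Requests owned by a stream that failed (neither new nor draining) are moved and resent, while requests on a cleanly half-closing stream are left to finish there.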

@scwhittle
Contributor Author

Having trouble reproducing the failures locally, still investigating.

@m-trieu
Contributor

m-trieu commented Jun 29, 2025

Sorry for the delay! Will take a pass this week

@scwhittle scwhittle marked this pull request as ready for review June 30, 2025 18:15
} catch (RuntimeException e) {
// Catch possible exceptions to ensure that an exception for one commit does not prevent
// Catch possible exceptions to ensure that an exception for one commit does not
// prevent
Contributor

need to fix format

Contributor Author

done
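The diff comment above is cut off, but it appears to describe catching exceptions per commit so that one failing commit does not stop the others from being processed. A minimal, hypothetical illustration of that pattern (`CommitCallbacks`, `completeAll`, and the `Consumer<String>` callbacks are assumptions of this sketch):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class CommitCallbacks {
  // Runs every callback with the given status; a RuntimeException from one
  // callback is recorded and does not prevent the remaining callbacks from running.
  static List<RuntimeException> completeAll(List<Consumer<String>> callbacks, String status) {
    List<RuntimeException> failures = new ArrayList<>();
    for (Consumer<String> callback : callbacks) {
      try {
        callback.accept(status);
      } catch (RuntimeException e) {
        failures.add(e);
      }
    }
    return failures;
  }
}
```

Collecting the failures instead of rethrowing immediately lets the caller decide afterwards whether to log them or surface the first one.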

synchronized (this) {
if (!prepareForSend(id, pendingRequest)) {
if (isShutdown) {
pendingRequest.abort();
Contributor

should we return here?

Contributor Author

Oops, I didn't mean to remove that. I added more test coverage. It didn't matter before, since the trySend would just fail and, after shutdown, the entries in pending made no difference. But it's still good to fix.
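The restored early return can be shown in a reduced, hypothetical form: once a request is aborted because the stream is shut down, control must not continue to the send path. The names `prepareForSend`, `isShutdown`, and `abort` come from the diff; the surrounding `SendPath` class, and the rule that `prepareForSend` fails only after shutdown, are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class SendPath {
  static final class PendingRequest {
    boolean aborted;

    void abort() {
      aborted = true;
    }
  }

  boolean isShutdown;
  final List<PendingRequest> sent = new ArrayList<>();

  // Hypothetical: in this sketch registration fails only after shutdown.
  private boolean prepareForSend(long id, PendingRequest request) {
    return !isShutdown;
  }

  synchronized void send(long id, PendingRequest request) {
    if (!prepareForSend(id, request)) {
      if (isShutdown) {
        request.abort();
        // Return so the aborted request never reaches the physical stream.
        return;
      }
    }
    sent.add(request);
  }
}
```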

if (hasPendingRequests()) {
protected synchronized void sendHealthCheck() throws WindmillStreamShutdownException {
if (currentPhysicalStream != null && currentPhysicalStream.hasPendingRequests()) {
trySend(HEALTH_CHECK_REQUEST);
Contributor

not related to new code: will it be better to send health checks even when there are no pending requests?

Contributor Author

These custom health checks were mainly meant to make sure we weren't stuck waiting for responses on broken streams. Sending them all the time could help detect errors on idle streams, reducing latency when such a stream does need reconnecting, rather than waiting until a request is sent. But under load I don't think this case would occur, and always sending could increase service load, since I think we currently have a lot of idle streams (such as GetData streams that are never used). So I'd rather not change it unless we have more motivation.
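The trade-off described above (send health checks only while the current physical stream is waiting on responses, so idle streams add no service load) reduces to the guard below. `HealthChecker`, `PhysicalStream`, and the string request are hypothetical; only the condition mirrors the diff.

```java
import java.util.ArrayList;
import java.util.List;

final class HealthChecker {
  // Hypothetical physical stream that tracks outstanding requests.
  static final class PhysicalStream {
    int pendingRequests;
    final List<String> sent = new ArrayList<>();

    boolean hasPendingRequests() {
      return pendingRequests > 0;
    }
  }

  static final String HEALTH_CHECK_REQUEST = "health-check";

  PhysicalStream currentPhysicalStream; // may be null between reconnects

  // Sends a health check only when a current physical stream exists and is
  // waiting on responses; idle or absent streams are skipped.
  synchronized boolean sendHealthCheck() {
    PhysicalStream stream = currentPhysicalStream;
    if (stream != null && stream.hasPendingRequests()) {
      stream.sent.add(HEALTH_CHECK_REQUEST);
      return true;
    }
    return false;
  }
}
```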

scwhittle added 5 commits July 1, 2025 20:17
@scwhittle
Contributor Author

Run Java PreCommit

1 similar comment
@scwhittle
Contributor Author

Run Java PreCommit

@scwhittle scwhittle merged commit 13e5921 into apache:master Jul 2, 2025
14 of 15 checks passed
@scwhittle scwhittle deleted the substream branch July 2, 2025 19:06
3 participants