[Java Dataflow Streaming] Add support to AbstractWindmillStream to transition between physical streams within the same logical stream #35523

Merged
scwhittle merged 14 commits into apache:master from scwhittle:substream_handover on Aug 11, 2025

Conversation

@scwhittle
Contributor Author

R: @arunpandianp @m-trieu

Sending out for some comments since I'm going to be OOO for a little bit. I still need to add a lot of tests but the simple functionality test for GetDataStream passes.

@github-actions
Contributor

github-actions bot commented Jul 3, 2025

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment "assign set of reviewers".

// XXX figure out
}
try {
@NonNull PhysicalStreamHandler streamHandler = newResponseHandler();
Contributor

@arunpandianp commented Jul 14, 2025

can we call startStream() here instead?

Contributor Author

I didn't want to do it in the synchronized block. We also want to do it after handling the error, in case that triggers new messages to be sent on the new stream.
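
The ordering described in this reply can be sketched roughly as follows. This is a minimal illustration, not the code in the PR: the class `DeferredStartSketch`, its fields, and the `events` log are hypothetical names invented for the example. State is updated while holding the lock, the error is handled next, and only then is the replacement stream started, with the lock released.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch; none of these names come from AbstractWindmillStream. */
final class DeferredStartSketch {
  private final Object lock = new Object();
  private final List<String> events = Collections.synchronizedList(new ArrayList<>());
  private boolean streamActive = true;
  private boolean shutdown = false;

  /** Called when the current physical stream completes with an error. */
  void onStreamCompletion(String error) {
    boolean restart;
    synchronized (lock) {
      // Only bookkeeping happens while the lock is held.
      streamActive = false;
      restart = !shutdown;
    }
    // Handle the error first: it may queue messages meant for the new stream.
    handleError(error);
    if (restart) {
      // Start the replacement stream outside the synchronized block.
      startStream();
    }
  }

  private void handleError(String error) {
    events.add("handleError:" + error);
  }

  private void startStream() {
    // Record whether the caller still holds the lock (it should not).
    events.add("startStream lockHeld=" + Thread.holdsLock(lock));
    synchronized (lock) {
      streamActive = true;
    }
  }

  List<String> events() {
    return events;
  }
}
```

Calling `onStreamCompletion("boom")` on a fresh instance records the error handling before the stream start, and the start entry shows the lock was not held at that point.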

Comment on lines +540 to +544
clearPhysicalStreamForDebug();
currentPhysicalStream = null;
if (halfCloseFuture != null) {
halfCloseFuture.cancel(false);
halfCloseFuture = null;
Contributor

These steps are repeated in a few places; consider moving them into a function like clearPhysicalStream().

Contributor Author

done
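
For reference, the extraction suggested above might look something like the sketch below. It is an assumption-laden illustration: the class `ClearStreamSketch` and its accessors are hypothetical, the fields are simplified stand-ins for the ones quoted in the commented lines, and the debug-clearing call is omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/** Hypothetical sketch of gathering the repeated cleanup steps in one helper. */
final class ClearStreamSketch {
  // Stand-in for the current physical stream handler.
  private Object currentPhysicalStream = new Object();
  // Stand-in for the scheduled half-close task.
  private Future<?> halfCloseFuture = new CompletableFuture<Void>();

  /** Drops the current stream and cancels any pending half-close task. */
  synchronized void clearCurrentPhysicalStream() {
    currentPhysicalStream = null;
    if (halfCloseFuture != null) {
      // cancel(false): do not interrupt the task if it is already running.
      halfCloseFuture.cancel(false);
      halfCloseFuture = null;
    }
  }

  synchronized boolean hasCurrentStream() {
    return currentPhysicalStream != null;
  }

  synchronized Future<?> halfCloseFuture() {
    return halfCloseFuture;
  }
}
```

Because the helper null-checks the future, it is safe to call more than once.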

@scwhittle
Contributor Author

R: @arunpandianp If you could take another look, that would be great. I have finished the GrpcGetDataStream tests. I would still like to add more tests for the CommitStream.

@scwhittle changed the title from "WIP Add support to AbstractWindmillStream to transition between physical streams within the same logical stream" to "[Java Dataflow Streaming] Add support to AbstractWindmillStream to transition between physical streams within the same logical stream" on Aug 6, 2025
@scwhittle marked this pull request as ready for review on Aug 6, 2025, 13:22
@scwhittle requested a review from arunpandianp on Aug 6, 2025, 13:22
@arunpandianp
Contributor

ack, will take a look

@scwhittle
Contributor Author

Thanks! I finished the CommitStream tests, but I now see failures in some of the existing tests and am investigating.

@arunpandianp
Contributor

/gemini summary

@gemini-code-assist
Contributor

Summary of Changes

This pull request adds support for transitioning between physical streams within a single logical Dataflow streaming stream. The change lets the system manage the underlying gRPC connections gracefully, proactively cycle streams, and recover from transient failures by re-flushing pending requests onto new connections, which improves the reliability of long-running streaming jobs.

Highlights

  • Physical Stream Handover: Implemented a core mechanism within AbstractWindmillStream to gracefully transition between physical gRPC streams (e.g., half-closing an old connection and establishing a new one) within the same logical stream. This allows for proactive stream cycling and improved resilience for long-lived streaming connections.
  • Automatic Request Re-flushing: Enhanced the stream handling to automatically re-send any pending requests from a half-closed or failed physical stream onto the newly established physical stream, ensuring that no data is lost during transitions or transient connection issues.
  • Configurable Stream Management: Introduced new parameters in AbstractWindmillStream and GrpcWindmillStreamFactory to configure the duration after which a physical stream should be half-closed (halfClosePhysicalStreamAfter) and to provide a ScheduledExecutorService for managing these transitions, offering greater control over stream lifecycle.
  • Comprehensive Test Coverage: Added extensive new unit tests across GrpcCommitWorkStreamTest and GrpcGetDataStreamTest to validate the complex scenarios involving multiple physical streams, handovers, stream failures (both old and new streams), and proper shutdown/half-close behavior. A custom TriggeredScheduledExecutorService was introduced to enable deterministic testing of asynchronous stream events.
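
The handover and re-flush behavior described in the highlights can be modeled with a small sketch. This is a simplified illustration under stated assumptions, not the PR's implementation: `HandoverSketch` and all of its methods are hypothetical, physical streams are reduced to integer ids, and the scheduled timer that would invoke `onHalfCloseTimeout` is left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of one logical stream backed by successive physical streams. */
final class HandoverSketch {
  private int currentStream = 1;
  private final Set<Integer> closingStreams = new HashSet<>();
  // Request id -> id of the physical stream it was last sent on.
  private final Map<String, Integer> pending = new LinkedHashMap<>();

  /** Sends a request on the current physical stream. */
  synchronized void send(String requestId) {
    pending.put(requestId, currentStream);
  }

  /** Timer fired: half-close the current stream and open a new one. */
  synchronized void onHalfCloseTimeout() {
    // The old stream stays open for responses to requests already sent on it.
    closingStreams.add(currentStream);
    currentStream++;
  }

  /** A response arrived, so the request is no longer pending. */
  synchronized void onResponse(String requestId) {
    pending.remove(requestId);
  }

  /** A stream failed: re-send its unanswered requests on the current stream. */
  synchronized List<String> onStreamFailed(int stream) {
    if (stream == currentStream) {
      currentStream++; // The active stream failed, so move to a fresh one.
    } else {
      closingStreams.remove(stream);
    }
    List<String> resent = new ArrayList<>();
    for (Map.Entry<String, Integer> e : pending.entrySet()) {
      if (e.getValue() == stream) {
        e.setValue(currentStream);
        resent.add(e.getKey());
      }
    }
    return resent;
  }

  synchronized Integer streamOf(String requestId) {
    return pending.get(requestId);
  }
}
```

In this model, requests sent before a handover remain associated with the half-closed stream and are moved to the current stream only if that older stream fails before they are answered.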

Changelog
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
    • Changed executor field type from ExecutorService to ScheduledExecutorService to support scheduled tasks.
    • Added new fields: halfClosePhysicalStreamAfter (Duration) for configuring stream handover timing, halfCloseFuture (Future) to manage scheduled half-close tasks, and closingPhysicalStreams (Set) to track physical streams that are in the process of closing.
    • Modified the constructor to accept halfClosePhysicalStreamAfter and ScheduledExecutorService parameters.
    • Refactored onNewStream() into onFlushPending(boolean isNewStream) to handle both initial stream setup and re-flushing of pending requests onto new streams.
    • Updated startStream() logic to schedule onHalfClosePhysicalStreamTimeout when halfClosePhysicalStreamAfter is configured, enabling proactive stream cycling.
    • Modified halfClose() and shutdown() methods to correctly manage the state of multiple physical streams and ensure all pending work is handled before full termination.
    • Removed the maybeTearDownStream helper method, integrating its logic into onPhysicalStreamCompletion.
    • Introduced completeShutdown() to centralize the final steps of stream termination.
    • Added onHalfClosePhysicalStreamTimeout to handle the scheduled half-closing of physical streams.
    • Updated onPhysicalStreamCompletion to manage closingPhysicalStreams and trigger onFlushPending for active streams.
    • Added clearCurrentPhysicalStream helper method for consistent state cleanup.
    • Overrode equals and hashCode methods in PhysicalStreamHandler to use identity-based comparisons for stream instances.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java
    • Removed the default setBudget(long newItems, long newBytes) method from the GetWorkStream interface, simplifying the interface.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
    • Updated the constructor to pass the new halfClosePhysicalStreamAfter and ScheduledExecutorService parameters to the AbstractWindmillStream superclass.
    • Modified onFlushPending to prevent resending requests that are already active on physical streams that are in the process of being half-closed.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
    • Updated the constructor to pass the new halfClosePhysicalStreamAfter and ScheduledExecutorService parameters to the AbstractWindmillStream superclass.
    • Modified onFlushPending to only send initial requests when a new stream is established.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
    • Updated the constructor to pass the new halfClosePhysicalStreamAfter and ScheduledExecutorService parameters to the AbstractWindmillStream superclass.
    • Modified onFlushPending to only send initial requests when a new stream is established.
    • Enhanced shutdownInternal to ensure pending requests on all closingPhysicalStreams are properly cancelled.
    • Updated a comment in trySendBatch to reflect the new onFlushPending behavior for handling finalized batches.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
    • Updated the constructor to pass the new halfClosePhysicalStreamAfter and ScheduledExecutorService parameters to the AbstractWindmillStream superclass.
    • Modified onFlushPending to only send initial requests when a new stream is established.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java
    • Updated the constructor to pass the new halfClosePhysicalStreamAfter and ScheduledExecutorService parameters to the AbstractWindmillStream superclass.
    • Modified onFlushPending to only send initial requests when a new stream is established.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
    • Introduced DEFAULT_DIRECT_STREAMING_RPC_PHYSICAL_STREAM_HALF_CLOSE_AFTER constant for default handover duration.
    • Added a memoized DIRECT_STREAM_SCHEDULED_EXECUTOR_SERVICE (a shared thread pool) for direct streaming RPCs.
    • Added directStreamingRpcPhysicalStreamHalfCloseAfter and executor fields to the factory to manage new stream parameters.
    • Updated the factory constructor and create method to accept and propagate the new halfClosePhysicalStreamAfter and ScheduledExecutorService parameters.
    • Added setDirectStreamingRpcPhysicalStreamHalfCloseAfter and setScheduledExecutorService methods to the Builder for configuration.
    • Modified all create*Stream methods to pass the new Duration and Executor parameters to the respective stream constructors.
    • Introduced executorForDispatchedStreams and executorForDirectStreams helper methods to provide appropriate ScheduledExecutorService instances based on stream type.
    • Ensured direct stream stubs (createDirectGetWorkStream, createDirectGetDataStream, createDirectCommitWorkStream) use withDefaultDeadline.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
    • Reduced sleep/wait times in awaitTermination and waitForDroppedCommits for faster test execution and increased sensitivity to timing issues.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
    • Reduced sleep/wait times in testKeyCommitTooLargeException and testExceptions for faster test execution.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java
    • Updated TestStream constructor to accept Duration and ScheduledExecutorService to align with AbstractWindmillStream changes.
    • Modified onNewStream() to onFlushPending(boolean isNewStream) and updated its internal logic to reflect the new behavior.
    • Integrated TriggeredScheduledExecutorService into TestStream for controlled testing of scheduled events.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/TriggeredScheduledExecutorService.java
    • New file: Implements a custom ScheduledExecutorService designed for testing asynchronous operations. It allows test code to manually trigger scheduled tasks using unblockNextFuture(), providing deterministic control over timing-dependent logic.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/FakeWindmillGrpcService.java
    • Added failConnectionsAndWait method to simulate transient connection failures for testing purposes.
    • Introduced noMoreStreamsExpected and failedStreamConnectsRemaining fields to control and track expected stream connections during tests.
    • Enhanced StreamInfoObserver's onNext method to throw an IllegalStateException if requests are sent on a stream that has already been half-closed or fully closed, improving test robustness.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java
    • Added extensive new test cases (testCommitWorkItem_multiplePhysicalStreams, _OldStreamFails, _newStreamFailsWhileEmpty, _newStreamFailsWithRequests, _multipleHandovers, _shutdown, _halfClose) to thoroughly validate the new physical stream transition and request re-flushing logic under various conditions, including failures and graceful shutdowns.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java
    • Added extensive new test cases (testRequestKeyedData_multiplePhysicalStreams, _OldStreamFails, _newStreamFailsWhileEmpty, _newStreamFailsWithRequests, _multipleHandovers_allResponsesReceived, _OldStreamFailsWhileNewStreamInBackoff, _multipleHandovers_shutdown, _multipleHandovers_halfClose) to thoroughly validate the new physical stream transition and request re-flushing logic under various conditions, including failures and graceful shutdowns.
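
The idea behind a manually triggered executor for deterministic tests can be illustrated with a much smaller sketch. This is not the TriggeredScheduledExecutorService from the PR, whose full interface is not shown here: `ManualSchedulerSketch`, `schedule`, and `runNext` are hypothetical names, and the class does not implement ScheduledExecutorService. Tasks are queued instead of timed, and the test decides when each one runs.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.FutureTask;

/** Hypothetical sketch: scheduled tasks run only when the test asks for them. */
final class ManualSchedulerSketch {
  private final Queue<FutureTask<Void>> scheduled = new ArrayDeque<>();

  /** Records the task without running it; any delay is ignored in this sketch. */
  synchronized FutureTask<Void> schedule(Runnable task) {
    FutureTask<Void> future = new FutureTask<>(task, null);
    scheduled.add(future);
    return future;
  }

  /** Runs the oldest scheduled task. Returns false if nothing was scheduled. */
  boolean runNext() {
    FutureTask<Void> next;
    synchronized (this) {
      next = scheduled.poll();
    }
    if (next == null) {
      return false;
    }
    // FutureTask.run() does nothing if the task was cancelled beforehand.
    next.run();
    return true;
  }
}
```

A test can schedule a half-close callback, assert that nothing has happened yet, and then call `runNext()` at the exact point where the timeout should be observed.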

Activity
  • On July 3rd, scwhittle (author) opened the pull request and requested initial reviews from arunpandianp and m-trieu, noting that more tests were still needed.
  • The github-actions[bot] subsequently stopped reviewer notifications.
  • On August 5th, scwhittle re-requested a review from arunpandianp, indicating that GrpcGetDataStream tests were complete, but CommitStream tests were still in progress.
  • arunpandianp acknowledged the review request on August 6th.
  • On August 7th, scwhittle confirmed that CommitStream tests were finished but reported new failures in existing tests, which they were investigating.
  • During the review process, arunpandianp suggested using IdentityHashMap for stream registry, which scwhittle implemented.
  • A discussion took place regarding the placement of startStream() calls, where scwhittle provided a detailed explanation for the chosen approach.
  • arunpandianp also suggested extracting repeated code into a clearPhysicalStream() function, which scwhittle completed.

@arunpandianp
Contributor

just a few minor comments, the logic looks good to me

@scwhittle
Contributor Author

failure is unrelated

@scwhittle merged commit 932f078 into apache:master on Aug 11, 2025
14 of 15 checks passed
@scwhittle deleted the substream_handover branch on Aug 11, 2025, 18:14
parveensania pushed a commit to parveensania/beam-dp that referenced this pull request Aug 17, 2025
…ansition between physical streams within the same logical stream (apache#35523)

* fix stuck get data if no current stream
* improve html
DKER2 pushed a commit to DKER2/beam that referenced this pull request Aug 20, 2025
…ansition between physical streams within the same logical stream (apache#35523)

* fix stuck get data if no current stream
* improve html

2 participants