Concurrently drain streaming fibers #552
base: master
Conversation
```ruby
Sync do |parent|
  queue = Async::Queue.new
  semaphore = Async::Semaphore.new(64)
```
Do we need an integer config flag for this, instead of the hardcoded default of 64?
Yes, please do. We are not yet sure whether it will impact performance or memory usage, so please find an appropriate default value. However, since this may vary depending on the project, it would be best to make it configurable.
If the user passes a capacity value below 1, should we coerce it to 1 (strict backpressure, but implicit) or fall back to our default value?
Or raise a config error?
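For illustration, a minimal sketch of the raise-on-invalid option, assuming a setter on the Pro configuration; the error class and message here are assumptions, not the PR's actual code:

```ruby
# Hedged sketch: validate at configuration time rather than coercing
# silently at stream time. ReactOnRailsPro::Error is an assumed error class.
def concurrent_component_streaming_buffer_size=(value)
  unless value.is_a?(Integer) && value >= 1
    raise ReactOnRailsPro::Error,
          "concurrent_component_streaming_buffer_size must be a positive " \
          "Integer, got #{value.inspect}"
  end
  @concurrent_component_streaming_buffer_size = value
end
```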
```ruby
    semaphore.acquire { queue.enqueue([idx, chunk]) }
  end
rescue StandardError => e
  error_msg = "<!-- stream error: #{e.class}: #{e.message} -->"
```
The original implementation does not handle errors that occur in this section. Errors should not occur here unless there is a bug in React on Rails or RORP. I believe it is better to throw an error than to fail silently or to return sensitive data to the client in chunks. It's important to note that in production the server does not display error data to the client, as it may contain sensitive information. I am open to any other suggestions you may have for handling these errors.
Yeah, I'll leave this to propagate up.
Generally, when doing concurrent work like this, if an error occurs in one component, should we somehow continue rendering the others or close the stream?
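For context, a minimal sketch of how propagation can behave under Async (names such as `fibers` and `response` are assumptions, and this is not the PR's exact code): waiting on a failed producer task re-raises its error, which unwinds the Sync block and stops the sibling writer, closing the stream rather than continuing to render the other components.

```ruby
require "async"
require "async/queue"

Sync do |parent|
  queue = Async::Queue.new

  producers = fibers.each_with_index.map do |fiber, idx|
    parent.async do
      while (chunk = fiber.resume)
        queue.enqueue([idx, chunk]) # tag chunks so per-component order is traceable
      end
    end
  end

  writer = parent.async do
    while (pair = queue.dequeue)
      _idx, chunk = pair
      response.stream.write(chunk)
    end
  end

  producers.each(&:wait) # Task#wait re-raises a producer's error here
  queue.enqueue(nil)     # end-of-stream sentinel so the writer exits
  writer.wait
end
```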
```ruby
begin
  chunk = fiber.resume
rescue FiberError
  break
```
It fails silently here as well.
This is not an error we're silencing; it's handling already-drained fibers after the first resume done in stream_react_component.
Did I get it wrong?
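A tiny standalone repro of that edge case: a component whose fiber yields only the shell chunk is already terminated by the time the drain loop resumes it again.

```ruby
# The shell-only case: one yield, then the fiber body returns.
fiber = Fiber.new { Fiber.yield "shell chunk" }

fiber.resume # => "shell chunk" (the first resume in stream_react_component)
fiber.resume # => nil (body returns; no more chunks)
fiber.resume # raises FiberError: attempt to resume a terminated fiber
```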
AbanoubGhadban left a comment:
Excellent implementation. I only have some small comments.
```ruby
Sync do |parent|
  queue = Async::Queue.new
  capacity = ReactOnRailsPro.configuration.concurrent_stream_queue_capacity
```
We can also remove this config and accept it as a parameter to the stream_view_containing_react_components function. @Judahmeek do you agree with me?
Capacity settings, such as worker threads, server_rendering_pools, etc., are usually decided in the configuration.
That said, if it makes testing easier, then having this be a parameter for this function is a good idea.
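A sketch of that compromise under assumed names (the `buffer_size:` keyword is illustrative, not part of the PR): configuration supplies the default, and the method accepts an optional override that tests can pass directly.

```ruby
# Hypothetical signature; only the config name matches the PR's direction.
def stream_view_containing_react_components(template:, buffer_size: nil, **opts)
  capacity = buffer_size ||
             ReactOnRailsPro.configuration.concurrent_component_streaming_buffer_size
  # ... drain component fibers through a queue bounded by `capacity` ...
end
```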
```ruby
  ReactOnRailsPro.configuration.concurrent_stream_drain = original
end

it "gates by config (sequential vs concurrent)" do
```
What do you mean by "gates by"?
"conditionally" stream concurrently or sequentially. we're testing if the config flag is working
This wouldn't mean anything anymore if you and @Judahmeek decide to make concurrency the default and only option for streaming.
Is there any benefit, performance or otherwise, to sequential streaming?
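For reference, a hedged sketch of what such a gating spec can look like; the helper and the internal method names (`drain_fibers_sequentially`, `drain_fibers_concurrently`) are illustrative, not the PR's actual internals:

```ruby
it "gates by config (sequential vs concurrent)" do
  ReactOnRailsPro.configuration.concurrent_stream_drain = false
  expect(helper).to receive(:drain_fibers_sequentially) # assumed internal
  helper.stream_view_containing_react_components(template: "show")

  ReactOnRailsPro.configuration.concurrent_stream_drain = true
  expect(helper).to receive(:drain_fibers_concurrently) # assumed internal
  helper.stream_view_containing_react_components(template: "show")
end
```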
Force-pushed 8c3b2e9 to b70a94f
Force-pushed b70a94f to 08f937a
Force-pushed a580626 to 8ae95e1
Commits:

- …ogs; behind config flag
  - Handle when a component fiber is already drained after the first resume (the shell).
  - Typically, streamed components will have more chunks besides the first chunk.
  - This is an edge case exposed while testing.
  - We're basically avoiding a FiberError: attempt to resume a terminated fiber.
- …, edge cases, and producer error
- …ming backpressure
- …start writer before producers; remove per-producer done markers
  - Propagate runtime errors instead of silencing them.
  - Keep gracefully handling already-terminated fibers on resume.
  - Properly handle cleanup/shutdown when errors occur.
- Replace custom test doubles with RSpec's instance_double for cleaner and more maintainable test isolation. This approach follows established mocking patterns and removes the dependency on custom streaming buffer implementations.
- …onents concurrently. This reverts commit 528ee15.
- …ing" This reverts commit 41f5eb9.
- …t_streaming_buffer_size for improved memory management and validation
- …roved clarity and performance
- …doesn't expect a :size keyword argument
- The exact version 15.0.0 was yanked from RubyGems, causing CI failures. Updated to 16.0.1.rc.4, which is available and stable.
- Keep JavaScript dependencies in sync with Ruby Gemfile versions.
- …on_rails 16.0. The using_packer? method was removed in react_on_rails 16.0 as part of the transition to Shakapacker-only support. Since Shakapacker is now assumed to always be in use, the check is no longer needed. Changes: remove the using_packer? check from Utils.bundle_file_name; remove the mock of the non-existent method from the spec; update comments to reference Shakapacker instead of webpacker. Fixes CI failures in utils_spec.rb.
Force-pushed 2d4c790 to c27d381
Implements #550

- Concurrent draining of streamed React components (Async tasks + single writer)
- Default backpressure via Async::Semaphore; handle client disconnects
- Specs for sequential vs concurrent, per-component ordering, edge cases, and backpressure

Example:

- With two stream component fibers, chunks interleave (first-ready wins) while each component's own chunk order is preserved.

## Summary by CodeRabbit

* **Improvements**
  * Concurrent streaming for React components replaces sequential processing, reducing latency and adding memory-bounded buffering for steadier throughput.
* **Added**
  * New configuration: concurrent_component_streaming_buffer_size (default: 64) to tune memory vs. performance.
  * Runtime dependency on the async gem (>= 2.6) to enable concurrent streaming.
* **Documentation**
  * Updated inline references to Shakapacker terminology.
* **Tests**
  * Expanded coverage for concurrent streaming, ordering, backpressure, and disconnect scenarios.

**Migrated from**: shakacode/react_on_rails_pro#552

Co-authored-by: Abanoub Ghadban <[email protected]>
Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com>
Co-authored-by: Ihab Adham <[email protected]>
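To make the ordering guarantee concrete, here is a toy, self-contained demo (not the PR's code) of two component fibers drained concurrently through one queue: the global interleaving is first-ready-wins, while each fiber's own chunk order is preserved.

```ruby
require "async"
require "async/queue"

# Two "components": A yields three chunks, B yields two.
fibers = {
  "A" => Fiber.new { Fiber.yield "A1"; Fiber.yield "A2"; "A3" },
  "B" => Fiber.new { Fiber.yield "B1"; "B2" }
}

Sync do |parent|
  queue = Async::Queue.new

  producers = fibers.map do |name, fiber|
    parent.async do
      loop do
        chunk = begin
          fiber.resume
        rescue FiberError
          break # fiber already terminated (shell-only component)
        end
        break if chunk.nil?
        queue.enqueue("#{name}:#{chunk}")
        sleep(rand / 100.0) # simulate uneven chunk readiness
      end
    end
  end

  parent.async do
    producers.each(&:wait)
    queue.enqueue(nil) # end-of-stream sentinel
  end

  while (line = queue.dequeue)
    puts line # e.g. A:A1, B:B1, B:B2, A:A2, A:A3 -- per-fiber order intact
  end
end
```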