refactor(sinktools): LazySinkSource cleanup waker handling, use StreamExt::split, improve unit tests, fix #2336#2556
refactor(sinktools): LazySinkSource cleanup waker handling, use StreamExt::split, improve unit tests, fix #2336#2556MingweiSamuel wants to merge 3 commits intomainfrom
LazySinkSource cleanup waker handling, use StreamExt::split, improve unit tests, fix #2336#2556Conversation
LazySinkSource cleanup waker handling, improve unit testsLazySinkSource cleanup waker handling, improve unit tests, fix #2336
LazySinkSource cleanup waker handling, improve unit tests, fix #2336LazySinkSource cleanup waker handling, use StreamExt::split, improve unit tests, fix #2336
Deploying hydro with
|
| Latest commit: |
7f8c10f
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://5546180a.hydroflow.pages.dev |
| Branch Preview URL: | https://lazy-1.hydroflow.pages.dev |
There was a problem hiding this comment.
Pull request overview
Refactors sinktools’ LazySinkSource to simplify/optimize wakeup handling (addressing #2336) and modernize splitting by relying on StreamExt::split, alongside strengthening initialization-driving test coverage.
Changes:
- Replace the old multi-waker approach with a 2-slot
DualWakerbased onfutures_util::task::AtomicWaker. - Refactor
LazySinkSourceto hold state directly (removingRc<RefCell<...>>) and remove the custom split-half types in favor ofStreamExt::split. - Add/adjust unit tests for initialization behavior; update dev-deps to enable
tokio::time.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| sinktools/src/lazy_sink_source.rs | Refactors internal state + wakeup handling; switches to StreamExt::split; expands unit tests. |
| sinktools/Cargo.toml | Enables tokio’s time feature for dev-dependencies to support new tests. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| dual_waker.stream.register(cx.waker()); | ||
| let waker = Waker::from(Arc::clone(dual_waker)); | ||
|
|
||
| let mut new_context = Context::from_waker(&waker); | ||
|
|
There was a problem hiding this comment.
Each poll clones the Arc<DualWaker> to build a new Waker (Waker::from(Arc::clone(...))). If this is on a hot path, consider using futures_util::task::waker_ref (or similar) to avoid the extra Arc refcount bump per poll.
| panic!("LazySinkHalf in invalid state."); | ||
| } |
There was a problem hiding this comment.
This panic message still refers to LazySinkHalf, but the half types were removed and the trait impl is now on LazySinkSource. Update the panic text to match the current type to avoid confusing debugging output.
| panic!("LazySinkHalf not ready."); | ||
| } |
There was a problem hiding this comment.
This panic message still refers to LazySinkHalf, but the half types were removed and the trait impl is now on LazySinkSource. Update the panic text to match the current type to avoid confusing debugging output.
| panic!("LazySinkHalf in invalid state."); | ||
| } |
There was a problem hiding this comment.
This panic message still refers to LazySinkHalf, but the half types were removed and the trait impl is now on LazySinkSource. Update the panic text to match the current type to avoid confusing debugging output.
| panic!("LazySinkHalf in invalid state."); | ||
| } |
There was a problem hiding this comment.
This panic message still refers to LazySinkHalf, but the half types were removed and the trait impl is now on LazySinkSource. Update the panic text to match the current type to avoid confusing debugging output.
| .run_until(async { | ||
| let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||
| let addr = listener.local_addr().unwrap(); | ||
| println!("Listening on {}", addr); |
There was a problem hiding this comment.
Avoid println! in tests committed to the repo; it adds noise to CI logs. Prefer tracing (with test subscriber) or remove the output unless it’s needed for an assertion.
| println!("Listening on {}", addr); |
| .run_until(async { | ||
| let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||
| let addr = listener.local_addr().unwrap(); | ||
| println!("Listening on {}", addr); |
There was a problem hiding this comment.
Avoid println! in tests committed to the repo; it adds noise to CI logs. Prefer tracing (with test subscriber) or remove the output unless it’s needed for an assertion.
| println!("Listening on {}", addr); |
| dual_waker.sink.register(cx.waker()); | ||
| let waker = Waker::from(Arc::clone(dual_waker)); | ||
|
|
||
| let mut new_context = Context::from_waker(&waker); |
There was a problem hiding this comment.
Each poll clones the Arc<DualWaker> to build a new Waker (Waker::from(Arc::clone(...))). If this is on a hot path, consider using futures_util::task::waker_ref (or similar) to avoid the extra Arc refcount bump per poll.
Fix #2336