apollo_propeller: add concurrent streams support to handler#11072
Conversation
7d695a4 to
f2ab557
Compare
5fd25d2 to
6f85c8a
Compare
f2ab557 to
c27738e
Compare
sirandreww-starkware
left a comment
There was a problem hiding this comment.
@sirandreww-starkware made 13 comments.
Reviewable status: 0 of 1 files reviewed, 13 unresolved discussions (waiting on @noamsp-starkware and @ShahakShama).
crates/apollo_propeller/src/handler.rs line 46 at r1 (raw file):
Previously, ShahakShama wrote…
Add the words "and limiting the number of concurrent streams through yamux config"
Done.
crates/apollo_propeller/src/handler.rs line 145 at r1 (raw file):
Previously, ShahakShama wrote…
Exactly
Done.
crates/apollo_propeller/src/handler.rs line 103 at r7 (raw file):
Previously, ShahakShama wrote…
Is it possible to collect straight to
[_; CONCURRENT_STREAMS]?
Done.
crates/apollo_propeller/src/handler.rs line 139 at r7 (raw file):
Previously, ShahakShama wrote…
Comment what does None mean and why do you break
Done.
crates/apollo_propeller/src/handler.rs line 159 at r7 (raw file):
Previously, ShahakShama wrote…
add
use tracing::traceat the start of the file
Done.
crates/apollo_propeller/src/handler.rs line 185 at r7 (raw file):
Previously, ShahakShama wrote…
e -> err
Done.
crates/apollo_propeller/src/handler.rs line 300 at r7 (raw file):
Previously, ShahakShama wrote…
There are 2 ways to de-nest the code (remove tabs), early return and extract to a helper function
You chose early return here.
I actually think helper function will be a better choice here
Done.
crates/apollo_propeller/src/handler.rs line 305 at r7 (raw file):
Previously, ShahakShama wrote…
extract the active fields from the match above instead of panicking
not sure this remains relevant after extracting a function here
crates/apollo_propeller/src/handler.rs line 311 at r7 (raw file):
Previously, ShahakShama wrote…
I think you need some more helper functions in this area too
I'm not reviewing the rest so that I can review it after it is readable
this code did not change much from the previous PR in the stack. extracting a function should be done in another PR.
crates/apollo_propeller/src/handler.rs line 470 at r7 (raw file):
Previously, ShahakShama wrote…
I don't like this
I've been thinking, maybe instead of this complex mechanism we just store the active substreams and a counter for number of pending upgrades? This way, here you'll just decrease the number by one and in poll if the number is below the target by x you create x pending upgrades?
Done.
crates/apollo_propeller/src/handler.rs line 471 at r7 (raw file):
Previously, ShahakShama wrote…
You need to somehow trigger poll (maybe with a waker) so that it will try to create a new substream
Done.
ShahakShama
left a comment
There was a problem hiding this comment.
@ShahakShama reviewed 1 file and all commit messages, made 5 comments, and resolved 9 discussions.
Reviewable status: all files reviewed, 4 unresolved discussions (waiting on noamsp-starkware and sirandreww-starkware).
crates/apollo_propeller/src/handler.rs line 305 at r7 (raw file):
Previously, sirandreww-starkware (Andrew Luka) wrote…
not sure this remains relevant after extracting a function here
It is. Change the function from receiving OutboundSubstreamState to receiving the type that OutboundSubstreamState::Active has (after you accept my comment above)
crates/apollo_propeller/src/handler.rs line 311 at r7 (raw file):
Previously, sirandreww-starkware (Andrew Luka) wrote…
this code did not change much from the previous PR in the stack. extracting a function should be done in another PR.
Add a TODO then
Also, this case also uses early-return where it should use helper functions instead
crates/apollo_propeller/src/handler.rs line 304 at r8 (raw file):
} if let Some(event) = Self::poll_active_outbound_substream(
Move this to the body of the match arm of Active above, and erase some of the continue statements in the other match arms
crates/apollo_propeller/src/handler.rs line 343 at r8 (raw file):
if send_queue.is_empty() { if should_flush { match Sink::poll_flush(Pin::new(&mut substream), cx) {
You're creating a new future every poll instead of polling the existing future. You should hold the future of the flush instead of holding should_flush inside the state
This is true for poll_ready too
I'm ok with TODO for now
6f85c8a to
da097ef
Compare
sirandreww-starkware
left a comment
There was a problem hiding this comment.
@sirandreww-starkware made 4 comments.
Reviewable status: 0 of 1 files reviewed, 4 unresolved discussions (waiting on noamsp-starkware and ShahakShama).
crates/apollo_propeller/src/handler.rs line 305 at r7 (raw file):
Previously, ShahakShama wrote…
It is. Change the function from receiving OutboundSubstreamState to receiving the type that OutboundSubstreamState::Active has (after you accept my comment above)
I can't really do that, I need to be able to replace the state in its entirely
crates/apollo_propeller/src/handler.rs line 311 at r7 (raw file):
Previously, ShahakShama wrote…
Add a TODO then
Also, this case also uses early-return where it should use helper functions instead
Done.
crates/apollo_propeller/src/handler.rs line 304 at r8 (raw file):
Previously, ShahakShama wrote…
Move this to the body of the match arm of Active above, and erase some of the continue statements in the other match arms
Done.
crates/apollo_propeller/src/handler.rs line 343 at r8 (raw file):
Previously, ShahakShama wrote…
You're creating a new future every poll instead of polling the existing future. You should hold the future of the flush instead of holding should_flush inside the state
This is true for poll_ready too
I'm ok with TODO for now
This is done in gossipsub, I'm not sure it's a problem https://github.com/libp2p/rust-libp2p/blob/55e775a95b92438dece8b945ebcbab5c88a692db/protocols/gossipsub/src/handler.rs#L302
c27738e to
27fc3d4
Compare
da097ef to
bfe9644
Compare
27fc3d4 to
deb81c6
Compare
bfe9644 to
5a11585
Compare
ShahakShama
left a comment
There was a problem hiding this comment.
@ShahakShama reviewed 1 file and all commit messages, made 1 comment, and resolved 3 discussions.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on noamsp-starkware and sirandreww-starkware).
crates/apollo_propeller/src/handler.rs line 305 at r7 (raw file):
Previously, sirandreww-starkware (Andrew Luka) wrote…
I can't really do that, I need to be able to replace the state in its entirely
Then describe that in a comment on top of the unreachable! statement
5a11585 to
6e2db0d
Compare
deb81c6 to
80640d5
Compare
sirandreww-starkware
left a comment
There was a problem hiding this comment.
@sirandreww-starkware made 1 comment and resolved 1 discussion.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on noamsp-starkware and ShahakShama).
crates/apollo_propeller/src/handler.rs line 305 at r7 (raw file):
Previously, ShahakShama wrote…
Then describe that in a comment on top of the unreachable! statement
Done.
ShahakShama
left a comment
There was a problem hiding this comment.
@ShahakShama reviewed 1 file and all commit messages, made 1 comment, and resolved 1 discussion.
Reviewable status:complete! all files reviewed, all discussions resolved (waiting on noamsp-starkware).
6e2db0d to
22d1677
Compare
Merge activity
|

Note
Medium Risk
Touches core libp2p handler state machines for stream negotiation, polling, and error handling; subtle indexing/state bugs could cause message loss, stalled sends, or incorrect substream resets even though concurrency is currently set to 1.
Overview
Updates the Propeller
ConnectionHandlerto support multiple long-lived substream slots by switching inbound/outbound state from single values to fixed-size arrays keyed by anOutboundOpenInfo = usizeindex (currentlyCONCURRENT_STREAMS = 1).Inbound negotiation now claims the first free slot (or rejects extra inbound streams), and polling is refactored into per-substream helpers that iterate all inbound slots and emit received units through a passed event queue.
Outbound negotiation requests are now issued per-slot, outbound send/flush logic is extracted into
poll_active_outbound_substream, andDialUpgradeErrorhandling resets only the failed pending slot instead of wiping all outbound state; logging is also adjusted (moretrace/error).Written by Cursor Bugbot for commit 22d1677. This will update automatically on new commits. Configure here.