Conversation

@teor2345 teor2345 commented Apr 25, 2025

Description

In the kad substream handler, outbound substreams have a 10s timeout, but inbound substreams don't have any timeout.

This results in large numbers of warnings under specific heavy load conditions, which we have encountered in subspace:

2025-04-08T06:24:27.293722Z WARN Consensus: libp2p_kad::handler: New inbound substream to peer exceeds inbound substream limit. No older substream waiting to be reused. Dropping new substream. peer=PeerId("12D3KooWN6kFp2Ev181UGq3BUDfk1jfjaNu6sDTqxCZUBpmp8kRQ")

After this fix, if a substream times out, it is dropped from the set. The existing reusable substream behaviour is preserved: idle substreams are replaced when the substream limit is reached.
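
To make the intended behaviour concrete, here is a minimal std-only sketch of the policy described above. It is a simplified model, not the handler code: `Substream`, `Admission`, `admit`, and `drop_timed_out` are hypothetical names, and the limit of 32 is taken from the review discussion further down.

```rust
/// Limit on inbound substreams (32 is the value mentioned in this thread).
const MAX_NUM_STREAMS: usize = 32;

/// Simplified stand-in for the handler's per-substream state.
#[derive(Debug, Clone, PartialEq)]
enum Substream {
    /// Idle: has served a request and is waiting to be reused.
    Reusable(u32),
    /// Busy: still reading, answering, or closing.
    Busy(u32),
}

/// What happened to a newly opened inbound substream.
#[derive(Debug, PartialEq)]
enum Admission {
    Added,
    ReplacedIdle,
    DroppedNew,
}

/// Adds `new`, replacing an idle substream if the set is already full.
fn admit(set: &mut Vec<Substream>, new: Substream) -> Admission {
    if set.len() < MAX_NUM_STREAMS {
        set.push(new);
        return Admission::Added;
    }
    // At the limit: overwrite an idle substream in place, if there is one.
    match set.iter_mut().find(|s| matches!(s, Substream::Reusable(_))) {
        Some(slot) => {
            *slot = new;
            Admission::ReplacedIdle
        }
        None => Admission::DroppedNew,
    }
}

/// Drops every substream whose id is in `timed_out`, freeing its slot.
fn drop_timed_out(set: &mut Vec<Substream>, timed_out: &[u32]) {
    set.retain(|s| match s {
        Substream::Reusable(id) | Substream::Busy(id) => !timed_out.contains(id),
    });
}
```

Without the timeout step, a set full of busy substreams that never finish stays full forever, and every new substream ends up in the `DroppedNew` case that produces the warning above.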

Fixes #5981

Cleanups

The substreams are pinned during iteration, so we can't remove them from the set. But we can replace them with a new substream. Since reusable streams are replaced directly, the Cancelled state is no longer needed.

This PR also includes an upgrade to futures-bounded 0.3.0, in a separate commit.

Notes & open questions

Should the substream be closed on a timeout?
The existing code doesn't close them on (most) substream errors, so this PR handles timeouts the same way, by dropping the substream without closing it.

I have been unable to replicate the specific conditions leading to this bug in a test or my local subspace node, but we've confirmed the fix works on multiple nodes in the subspace network.

Change checklist

  • I have performed a self-review of my own code
  • I have made corresponding changes to the documentation
  • I have added tests that prove my fix is effective or that my feature works
  • A changelog entry has been made in the appropriate crates

@teor2345 teor2345 changed the title from "fix(kad): enforce an inbound substream timeout in the kad substream handler" to "fix(kad): enforce a timeout for inbound substreams" Apr 25, 2025
@teor2345 teor2345 force-pushed the kad-in-subst-timeout-upstream branch from 925b8ac to 36ce9b5 April 28, 2025 23:05
@elenaf9
Member

elenaf9 commented Apr 30, 2025

Thank you for the PR @teor2345!

For outbound substreams, the timeout is implemented by using the futures_bounded::FuturesTupleSet (or just futures_bounded::FuturesSet) that limits the number of futures in the set and implements a timeout for each individual future.
Can we not just use that for inbound substreams as well?

That would also match the implementation in other protocols, as you already stated in #5981:

Here is how other protocols implement matching inbound and outbound timeouts:

inbound_workers: futures_bounded::FuturesSet::new(

@teor2345
Author

teor2345 commented May 1, 2025

> For outbound substreams, the timeout is implemented by using the futures_bounded::FuturesTupleSet (or just futures_bounded::FuturesSet) that limits the number of futures in the set and implements a timeout for each individual future. Can we not just use that for inbound substreams as well?

Unfortunately not (or not without a larger refactor). FuturesTupleSet holds Futures, but inbound substreams are implemented as Streams in kad:

impl futures::Stream for InboundSubstreamState {
    type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent>;

It would be possible to hold the streams in a FuturesTupleSet by calling StreamExt::into_future() on them.

But we can't iterate through a FuturesTupleSet to find reusable inbound substreams. Which makes this inbound substream reuse code difficult to implement:

if self.inbound_substreams.len() == MAX_NUM_STREAMS {
    if let Some(s) = self.inbound_substreams.iter_mut().find(|s| {
        matches!(
            s,
            // An inbound substream waiting to be reused.
            InboundSubstreamState::WaitingMessage {
                first_request_timeout: None,
                ..
            }
        )
    }) {
        *s = InboundSubstreamState::Cancelled;
        tracing::debug!(
            peer=?self.remote_peer_id,
            "New inbound substream to peer exceeds inbound substream limit. \
             Removed older substream waiting to be reused."
        )
    } else {
        tracing::warn!(
            peer=?self.remote_peer_id,
            "New inbound substream to peer exceeds inbound substream limit. \
             No older substream waiting to be reused. Dropping new substream."
        );
        return;
    }
}

So the only solution I could find is to add a timeout field to some of the inbound substream states. (Cancelled and Poisoned never need timeouts, and WaitingMessage only needs a timeout for the first request. So we save a bit on timers there.)
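
As a rough illustration of the "timeout field per state" idea, the sketch below models the states with plain `Instant` deadlines. This is an assumption-laden simplification: the variants and fields are illustrative rather than the real `InboundSubstreamState` definition, the `Pending` variant stands in for all in-progress states, and the real handler would poll a timer future instead of comparing timestamps.

```rust
use std::time::Instant;

/// Simplified stand-in for kad's `InboundSubstreamState` (illustrative only).
#[derive(Debug)]
enum InboundState {
    /// Waiting for a request. Only the first request has a deadline; an idle,
    /// reusable substream (`None`) carries no timer.
    WaitingMessage { first_request_timeout: Option<Instant> },
    /// Hypothetical catch-all for states that are processing or answering a request.
    Pending { deadline: Instant },
    /// Terminal states that never need a timer.
    Cancelled,
    Poisoned,
}

impl InboundState {
    /// The deadline that applies to this state, if any.
    fn deadline(&self) -> Option<Instant> {
        match self {
            InboundState::WaitingMessage { first_request_timeout } => *first_request_timeout,
            InboundState::Pending { deadline } => Some(*deadline),
            InboundState::Cancelled | InboundState::Poisoned => None,
        }
    }

    /// True if this state has a deadline and `now` has reached it.
    fn is_timed_out(&self, now: Instant) -> bool {
        self.deadline().map_or(false, |d| now >= d)
    }
}

/// Removes timed-out substreams and returns how many were dropped.
fn drop_timed_out(states: &mut Vec<InboundState>, now: Instant) -> usize {
    let before = states.len();
    states.retain(|s| !s.is_timed_out(now));
    before - states.len()
}
```

Keeping the deadline optional is what lets idle, reusable substreams sit in the set without a running timer.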

The underlying issue is that the code couples the state of the substream with the stream of items from it. A refactor could put KadInStreamSink::into_future() into a FuturesTupleSet, and store the rest of the state as the associated data. We'd also need a separate list of substreams which can/can't be reused for the substream reuse check. And a way of dropping a substream from the FuturesTupleSet when it gets reused, probably via a oneshot.

Is this a change that would be acceptable in a bug fix? Particularly one that other users might want backported?

@dariusc93
Member

dariusc93 commented May 1, 2025

> Unfortunately not (or not without a larger refactor). FuturesTupleSet holds Futures, but inbound substreams are implemented as Streams in kad:

Could we not use StreamSet from futures-bounded?

EDIT: Or maybe poll the stream in the future for the set?

@teor2345
Author

teor2345 commented May 2, 2025

> > Unfortunately not (or not without a larger refactor). FuturesTupleSet holds Futures, but inbound substreams are implemented as Streams in kad:
>
> Could we not use StreamSet from futures-bounded?
>
> EDIT: Or maybe poll the stream in the future for the set?

Sure, but that doesn't deal with substream reuse, because the futures_bounded types do not allow iteration to find reusable substreams (they're a wrapper for SelectAll).

There are two pieces of functionality that this code needs:

  • work out when a substream becomes available for reuse
  • when a new substream is created, and the substream limit has been reached, replace a reusable substream with the new substream (or drop the new substream if no substreams are reusable)

Here's one possible way to implement that:

  • use a futures_bounded::StreamSet to poll the next item from each substream
  • on a timeout in the WaitingMessage state, mark the substream as available for reuse using a oneshot (in any other state, the substream can't be used further, so just end the stream and it will get dropped)
  • when a new substream is created, find a substream that's marked for reuse, and replace the original substream with the new substream

This will involve some quite weird types, like Oneshot<Oneshot<InboundSubstreamState>>. The outer Oneshot is for substream reuse availability, and the inner is for sending back the replacement substream. But it should work, and the changes might be less complicated than the existing PR.
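
For readers trying to picture the nested-channel handshake, here is a small std-only sketch. It is only a model of the idea: `std::sync::mpsc` channels stand in for the async oneshots, and `Substream`, `Slot`, `ReuseOffer`, and `place_new` are hypothetical names that do not exist in libp2p or futures-bounded.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Stand-in for an inbound substream state.
#[derive(Debug, PartialEq)]
struct Substream(u32);

/// Sent by an idle slot: "you may replace me, send the replacement here".
/// This plays the role of the inner oneshot.
type ReuseOffer = Sender<Substream>;

/// One entry in the set, as seen from inside its own stream.
struct Slot {
    current: Substream,
    /// Where the handler delivers the replacement, once reuse was offered.
    replacement_rx: Option<Receiver<Substream>>,
}

impl Slot {
    /// Called when the slot becomes reusable: advertise availability to the handler.
    fn offer_reuse(&mut self, offers: &Sender<ReuseOffer>) {
        let (tx, rx) = channel();
        self.replacement_rx = Some(rx);
        // Ignore the error if the handler has gone away.
        let _ = offers.send(tx);
    }

    /// Called when the slot is next polled: swap in a replacement if one arrived.
    fn apply_replacement(&mut self) -> bool {
        let replacement = self
            .replacement_rx
            .as_ref()
            .and_then(|rx| rx.try_recv().ok());
        match replacement {
            Some(new) => {
                self.current = new;
                self.replacement_rx = None;
                true
            }
            None => false,
        }
    }
}

/// Handler side: hand `new` to a slot that offered itself, or give it back.
fn place_new(offers: &Receiver<ReuseOffer>, new: Substream) -> Result<(), Substream> {
    match offers.try_recv() {
        Ok(reply) => reply.send(new).map_err(|e| e.0),
        Err(_) => Err(new),
    }
}
```

When `place_new` returns `Err`, no slot has offered itself, which corresponds to the "drop the new substream" case.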

Is this a change that would be acceptable in a bug fix? Particularly one that other users might want backported?

I also can't guarantee I'll have time to work on this any time soon, because the current PR code works, and fixes our downstream bug.

@elenaf9
Member

elenaf9 commented May 5, 2025

Thank you for the follow-ups and detailed explanation @teor2345.

> Sure, but that doesn't deal with substream reuse, because the futures_bounded types do not allow iteration to find reusable substreams (they're a wrapper for SelectAll).

Opened thomaseizinger/rust-futures-bounded#8 to see if we can implement iterator for StreamSet. If that PR won't be merged then I'd go with your current solution.

mergify bot pushed a commit that referenced this pull request Jun 26, 2025
#6015 made the timeout for outbound substreams in kademlia configurable by introducing an `outbound_substreams_timeout` config option.
There is currently an open PR #6009 that plans to also apply that timeout to inbound substreams, in which case we probably want to rename the config option to `substreams_timeout` (without the prefix).
Because we plan to release soon and might not get #6009 in in time, I propose that we already do the renaming now, to avoid breaking the API again with the next release.

Pull-Request: #6076.
@jxs
Member

jxs commented Jul 22, 2025

friendly ping @teor2345, I saw thomaseizinger/rust-futures-bounded#10 (comment) but just to confirm, do you plan on continuing this?
Cheers!

@teor2345
Author

> friendly ping @teor2345, I saw thomaseizinger/rust-futures-bounded#10 (comment) but just to confirm, do you plan on continuing this? Cheers!

I think that thomaseizinger/rust-futures-bounded#10 will be a better alternative once it's released.
But we're also waiting on thomaseizinger/rust-futures-bounded#12 to merge.

Contributor

mergify bot commented Jul 29, 2025

This pull request has merge conflicts. Could you please resolve them @teor2345? 🙏

@teor2345 teor2345 force-pushed the kad-in-subst-timeout-upstream branch from 5377558 to 5fd5603 July 29, 2025 03:54
@teor2345 teor2345 marked this pull request as draft July 29, 2025 03:55
@teor2345 teor2345 marked this pull request as ready for review July 29, 2025 03:57
@teor2345 teor2345 marked this pull request as draft July 29, 2025 03:57
Author

@teor2345 teor2345 left a comment

@jxs @elenaf9 this is now ready for review again!

Most of the changes are for the futures-bounded 0.3.0 upgrade, happy to split them out into a separate PR if that would help.

I've marked it as draft because the kad/StreamSet changes depend on an unreleased version of futures-bounded (they are waiting for us to test these changes).

I can test this branch with subspace, but are there any other tests you'd like to do?

> This pull request has merge conflicts. Could you please resolve them @teor2345? 🙏

It really doesn't 🤣

)
})
{
*s.get_mut() = new_substream;
Author

@teor2345 teor2345 Jul 29, 2025

Because StreamSet only allows try_push(), we can't go over the stream limit, even temporarily. So the new stream has to replace the old one immediately. This also means we don't need the Cancelled state.

This code is more robust, because the old code could have subtle bugs. For example, if we added two streams over the limit in a row (so we had 33 streams), the equality check (== 32) would not match, so we could keep adding streams beyond the limit of 32.
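
The difference between the two guards is easy to check in isolation. The following is a tiny std-only model with hypothetical helper names (`at_limit_eq`, `at_limit_ge`, `grow`). It only counts streams and does not model reuse.

```rust
const MAX_NUM_STREAMS: usize = 32;

/// Old-style guard: only fires when the set is exactly at the limit.
fn at_limit_eq(len: usize) -> bool {
    len == MAX_NUM_STREAMS
}

/// Guard used after the change: fires at or above the limit.
fn at_limit_ge(len: usize) -> bool {
    len >= MAX_NUM_STREAMS
}

/// Number of streams after `attempts` pushes starting from `len`,
/// where a push is refused whenever `guard` fires.
fn grow(mut len: usize, attempts: usize, guard: fn(usize) -> bool) -> usize {
    for _ in 0..attempts {
        if !guard(len) {
            len += 1;
        }
    }
    len
}
```

Starting from 33 streams, `grow(33, 5, at_limit_eq)` ends at 38, while `grow(33, 5, at_limit_ge)` stays at 33. Once the count is past the limit, the equality guard never fires again.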

if self.inbound_substreams.len() >= MAX_NUM_STREAMS {
    if let Some(s) = self
        .inbound_substreams
        .iter_mut_of_type::<InboundSubstreamState>()
Author

This weird construct is required because the underlying API (in SelectAll) doesn't allow typed iteration.
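
To show why a type parameter is needed at all, here is a std-only sketch of typed iteration over type-erased storage. It is not the futures-bounded implementation: it assumes items are stored as `Box<dyn Any>`, and the free function below merely borrows the `iter_mut_of_type` name for illustration.

```rust
use std::any::Any;

/// Yields mutable references to the items whose concrete type is `T`,
/// skipping everything else. The caller has to name `T` because the
/// container itself no longer knows the concrete types it holds.
fn iter_mut_of_type<'a, T: Any>(
    items: &'a mut [Box<dyn Any>],
) -> impl Iterator<Item = &'a mut T> + 'a {
    items
        .iter_mut()
        .filter_map(|item| item.downcast_mut::<T>())
}
```

Asking for a type that no item has simply yields an empty iterator, so a mismatched type parameter fails silently rather than at compile time.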

Comment on lines -766 to 783
- if let Poll::Ready(Some(event)) = self.inbound_substreams.poll_next_unpin(cx) {
-     return Poll::Ready(event);
+ if let Poll::Ready(Some(event_result)) = self.inbound_substreams.poll_next_unpin(cx) {
+     match event_result {
+         Ok(event) => return Poll::Ready(event),
+         Err(_stream_set_timeout) => {
+             tracing::trace!(
+                 "Inbound substream timed out waiting for peer, send, or close"
+             );
+             continue;
+         }
+     }
  }
Author

I wasn't sure what to do with an inbound substream timeout.
We could:

  • explicitly close the substream
  • return some synthetic inbound timeout event
  • something else???
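
For reference, the "log and continue" behaviour currently in the diff can be modelled as below. This is a simplified std-only sketch with hypothetical `Event`, `Timeout`, and `next_event` names, and it iterates over a plain sequence of results instead of polling a stream. It does not settle which of the options above is the right one.

```rust
/// Stand-in for a handler event produced by an inbound substream.
#[derive(Debug, PartialEq)]
struct Event(&'static str);

/// Stand-in for the timeout error yielded for a timed-out substream.
#[derive(Debug, PartialEq)]
struct Timeout;

/// Returns the first real event, skipping (and counting) timed-out
/// substreams along the way.
fn next_event(
    items: impl IntoIterator<Item = Result<Event, Timeout>>,
) -> (Option<Event>, usize) {
    let mut timeouts = 0;
    for item in items {
        match item {
            Ok(event) => return (Some(event), timeouts),
            Err(Timeout) => {
                // The timed-out substream is already gone; just keep going.
                timeouts += 1;
                continue;
            }
        }
    }
    (None, timeouts)
}
```

A synthetic timeout event would instead turn the `Err` arm into an early return, which would make the timeout visible to the behaviour layer.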

@teor2345 teor2345 marked this pull request as ready for review July 30, 2025 23:30
Successfully merging this pull request may close these issues.

kad exceeds substream limit due to outbound timeout, but no inbound timeout