Skip to content

Commit 5fd5603

Browse files
committed
Enforce a timeout on inbound kad substreams using StreamSet
1 parent 3820009 commit 5fd5603

File tree

3 files changed

+57
-41
lines changed

3 files changed

+57
-41
lines changed

Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ libp2p-yamux = { version = "0.47.0", path = "muxers/yamux" }
121121
asynchronous-codec = { version = "0.7.0" }
122122
env_logger = "0.11"
123123
futures = "0.3.30"
124-
futures-bounded = { version = "0.3.0", features = ["futures-timer"] }
124+
# TODO: replace with version = "0.3.1" once released upstream
125+
futures-bounded = { git = "https://github.com/thomaseizinger/rust-futures-bounded", rev = "012803d343b5c604e65d3c238a8cd7a145616447", features = ["futures-timer"] }
125126
futures-rustls = { version = "0.26.0", default-features = false }
126127
getrandom = "0.2"
127128
if-watch = "3.2.1"

protocols/kad/src/handler.rs

Lines changed: 54 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ use std::{
2727
};
2828

2929
use either::Either;
30-
use futures::{channel::oneshot, prelude::*, stream::SelectAll};
31-
use futures_bounded::Delay;
30+
use futures::{channel::oneshot, prelude::*};
31+
use futures_bounded::{Delay, StreamSet};
3232
use libp2p_core::{upgrade, ConnectedPoint};
3333
use libp2p_identity::PeerId;
3434
use libp2p_swarm::{
@@ -78,7 +78,8 @@ pub struct Handler {
7878
pending_messages: VecDeque<(KadRequestMsg, QueryId)>,
7979

8080
/// List of active inbound substreams with the state they are in.
81-
inbound_substreams: SelectAll<InboundSubstreamState>,
81+
/// The streams are typed `InboundSubstreamState`, but the set uses the item type.
82+
inbound_substreams: StreamSet<ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent>>,
8283

8384
/// The connected endpoint of the connection that the handler
8485
/// is associated with.
@@ -120,8 +121,6 @@ enum InboundSubstreamState {
120121
PendingFlush(UniqueConnecId, KadInStreamSink<Stream>),
121122
/// The substream is being closed.
122123
Closing(KadInStreamSink<Stream>),
123-
/// The substream was cancelled in favor of a new one.
124-
Cancelled,
125124

126125
Poisoned {
127126
phantom: PhantomData<QueryId>,
@@ -174,9 +173,6 @@ impl InboundSubstreamState {
174173
| InboundSubstreamState::Closing(substream) => {
175174
*self = InboundSubstreamState::Closing(substream);
176175
}
177-
InboundSubstreamState::Cancelled => {
178-
*self = InboundSubstreamState::Cancelled;
179-
}
180176
InboundSubstreamState::Poisoned { .. } => unreachable!(),
181177
}
182178
}
@@ -462,7 +458,10 @@ impl Handler {
462458
endpoint,
463459
remote_peer_id,
464460
next_connec_unique_id: UniqueConnecId(0),
465-
inbound_substreams: Default::default(),
461+
inbound_substreams: StreamSet::new(
462+
move || Delay::futures_timer(substreams_timeout),
463+
MAX_NUM_STREAMS,
464+
),
466465
outbound_substreams: futures_bounded::FuturesTupleSet::new(
467466
move || Delay::futures_timer(substreams_timeout),
468467
MAX_NUM_STREAMS,
@@ -519,38 +518,45 @@ impl Handler {
519518
});
520519
}
521520

522-
if self.inbound_substreams.len() == MAX_NUM_STREAMS {
523-
if let Some(s) = self.inbound_substreams.iter_mut().find(|s| {
524-
matches!(
525-
s,
526-
// An inbound substream waiting to be reused.
527-
InboundSubstreamState::WaitingMessage { first: false, .. }
528-
)
529-
}) {
530-
*s = InboundSubstreamState::Cancelled;
521+
let connec_unique_id = self.next_connec_unique_id;
522+
self.next_connec_unique_id.0 += 1;
523+
let new_substream = InboundSubstreamState::WaitingMessage {
524+
first: true,
525+
connection_id: connec_unique_id,
526+
substream: protocol,
527+
};
528+
529+
if self.inbound_substreams.len() >= MAX_NUM_STREAMS {
530+
if let Some(s) = self
531+
.inbound_substreams
532+
.iter_mut_of_type::<InboundSubstreamState>()
533+
.find(|s| {
534+
matches!(
535+
**s,
536+
// An inbound substream waiting to be reused.
537+
InboundSubstreamState::WaitingMessage { first: false, .. }
538+
)
539+
})
540+
{
541+
*s.get_mut() = new_substream;
531542
tracing::debug!(
532543
peer=?self.remote_peer_id,
533544
"New inbound substream to peer exceeds inbound substream limit. \
534-
Removed older substream waiting to be reused."
545+
Replacing older substream that was waiting to be reused."
535546
)
536547
} else {
537548
tracing::warn!(
538549
peer=?self.remote_peer_id,
539550
"New inbound substream to peer exceeds inbound substream limit. \
540551
No older substream waiting to be reused. Dropping new substream."
541552
);
542-
return;
543553
}
554+
} else {
555+
self.inbound_substreams
556+
.try_push(new_substream)
557+
.map_err(|_| ())
558+
.expect("Just checked that stream set is not full; qed");
544559
}
545-
546-
let connec_unique_id = self.next_connec_unique_id;
547-
self.next_connec_unique_id.0 += 1;
548-
self.inbound_substreams
549-
.push(InboundSubstreamState::WaitingMessage {
550-
first: true,
551-
connection_id: connec_unique_id,
552-
substream: protocol,
553-
});
554560
}
555561

556562
/// Takes the given [`KadRequestMsg`] and composes it into an outbound request-response protocol
@@ -617,15 +623,15 @@ impl ConnectionHandler for Handler {
617623
HandlerIn::Reset(request_id) => {
618624
if let Some(state) = self
619625
.inbound_substreams
620-
.iter_mut()
621-
.find(|state| match state {
626+
.iter_mut_of_type::<InboundSubstreamState>()
627+
.find(|state| match **state {
622628
InboundSubstreamState::WaitingBehaviour(conn_id, _, _) => {
623-
conn_id == &request_id.connec_unique_id
629+
conn_id == request_id.connec_unique_id
624630
}
625631
_ => false,
626632
})
627633
{
628-
state.close();
634+
state.get_mut().close();
629635
}
630636
}
631637
HandlerIn::FindNodeReq { key, query_id } => {
@@ -764,8 +770,16 @@ impl ConnectionHandler for Handler {
764770
Poll::Pending => {}
765771
}
766772

767-
if let Poll::Ready(Some(event)) = self.inbound_substreams.poll_next_unpin(cx) {
768-
return Poll::Ready(event);
773+
if let Poll::Ready(Some(event_result)) = self.inbound_substreams.poll_next_unpin(cx) {
774+
match event_result {
775+
Ok(event) => return Poll::Ready(event),
776+
Err(_stream_set_timeout) => {
777+
tracing::trace!(
778+
"Inbound substream timed out waiting for peer, send, or close"
779+
);
780+
continue;
781+
}
782+
}
769783
}
770784

771785
if self.outbound_substreams.len() < MAX_NUM_STREAMS {
@@ -849,8 +863,11 @@ fn compute_new_protocol_status(
849863

850864
impl Handler {
851865
fn answer_pending_request(&mut self, request_id: RequestId, mut msg: KadResponseMsg) {
852-
for state in self.inbound_substreams.iter_mut() {
853-
match state.try_answer_with(request_id, msg) {
866+
for state in self
867+
.inbound_substreams
868+
.iter_mut_of_type::<InboundSubstreamState>()
869+
{
870+
match state.get_mut().try_answer_with(request_id, msg) {
854871
Ok(()) => return,
855872
Err(m) => {
856873
msg = m;
@@ -1007,7 +1024,6 @@ impl futures::Stream for InboundSubstreamState {
10071024
}
10081025
},
10091026
InboundSubstreamState::Poisoned { .. } => unreachable!(),
1010-
InboundSubstreamState::Cancelled => return Poll::Ready(None),
10111027
}
10121028
}
10131029
}

0 commit comments

Comments
 (0)