diff --git a/Cargo.lock b/Cargo.lock index b2244d92594..87e921a8e61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1391,9 +1391,8 @@ dependencies = [ [[package]] name = "futures-bounded" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91f328e7fb845fc832912fb6a34f40cf6d1888c92f974d1893a54e97b5ff542e" +version = "0.3.0" +source = "git+https://github.com/thomaseizinger/rust-futures-bounded?rev=012803d343b5c604e65d3c238a8cd7a145616447#012803d343b5c604e65d3c238a8cd7a145616447" dependencies = [ "futures-timer", "futures-util", @@ -2721,7 +2720,7 @@ dependencies = [ [[package]] name = "libp2p-kad" -version = "0.49.0" +version = "0.49.1" dependencies = [ "asynchronous-codec", "bytes", diff --git a/Cargo.toml b/Cargo.toml index c2e89a0fada..6f4e817157a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ libp2p-floodsub = { version = "0.47.0", path = "protocols/floodsub" } libp2p-gossipsub = { version = "0.50.0", path = "protocols/gossipsub" } libp2p-identify = { version = "0.47.0", path = "protocols/identify" } libp2p-identity = { version = "0.2.12" } -libp2p-kad = { version = "0.49.0", path = "protocols/kad" } +libp2p-kad = { version = "0.49.1", path = "protocols/kad" } libp2p-mdns = { version = "0.48.0", path = "protocols/mdns" } libp2p-memory-connection-limits = { version = "0.5.0", path = "misc/memory-connection-limits" } libp2p-metrics = { version = "0.17.0", path = "misc/metrics" } @@ -121,7 +121,8 @@ libp2p-yamux = { version = "0.47.0", path = "muxers/yamux" } asynchronous-codec = { version = "0.7.0" } env_logger = "0.11" futures = "0.3.30" -futures-bounded = { version = "0.2.4" } +# TODO: replace with version = "0.3.1" once released upstream +futures-bounded = { git = "https://github.com/thomaseizinger/rust-futures-bounded", rev = "012803d343b5c604e65d3c238a8cd7a145616447", features = ["futures-timer"] } futures-rustls = { version = "0.26.0", default-features = false } getrandom = "0.2" if-watch = "3.2.1" diff --git a/protocols/autonat/src/v2/client/handler/dial_back.rs b/protocols/autonat/src/v2/client/handler/dial_back.rs index 3fd3cf0b5ed..3c0c7ca9f7f 100644 --- a/protocols/autonat/src/v2/client/handler/dial_back.rs +++ b/protocols/autonat/src/v2/client/handler/dial_back.rs @@ -6,7 +6,7 @@ use std::{ }; use futures::channel::oneshot; -use futures_bounded::StreamSet; +use futures_bounded::{Delay, StreamSet}; use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; use libp2p_swarm::{ handler::{ConnectionEvent, FullyNegotiatedInbound, ListenUpgradeError}, @@ -22,7 +22,7 @@ pub struct Handler { impl Handler { pub(crate) fn new() -> Self { Self { - inbound: StreamSet::new(Duration::from_secs(5), 2), + inbound: StreamSet::new(|| Delay::futures_timer(Duration::from_secs(5)), 2), } } } diff --git a/protocols/autonat/src/v2/client/handler/dial_request.rs b/protocols/autonat/src/v2/client/handler/dial_request.rs index bafdd9d818c..b3dc017ad1b 100644 --- a/protocols/autonat/src/v2/client/handler/dial_request.rs +++ b/protocols/autonat/src/v2/client/handler/dial_request.rs @@ -8,7 +8,7 @@ use std::{ }; use futures::{channel::oneshot, AsyncWrite}; -use futures_bounded::FuturesMap; +use futures_bounded::{Delay, FuturesMap}; use libp2p_core::{ upgrade::{DeniedUpgrade, ReadyUpgrade}, Multiaddr, @@ -91,7 +91,7 @@ impl Handler { pub(crate) fn new() -> Self { Self { queued_events: VecDeque::new(), - outbound: FuturesMap::new(Duration::from_secs(10), 10), + outbound: FuturesMap::new(|| Delay::futures_timer(Duration::from_secs(10)), 10), queued_streams: VecDeque::default(), } } diff --git a/protocols/autonat/src/v2/server/handler/dial_back.rs b/protocols/autonat/src/v2/server/handler/dial_back.rs index 836cb50b5c3..736a9aa34b2 100644 --- a/protocols/autonat/src/v2/server/handler/dial_back.rs +++ b/protocols/autonat/src/v2/server/handler/dial_back.rs @@ -6,7 +6,7 @@ use std::{ }; use futures::{AsyncRead, AsyncWrite}; -use futures_bounded::FuturesSet; +use futures_bounded::{Delay, FuturesSet}; use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; use libp2p_swarm::{ handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedOutbound}, @@ -33,7 +33,7 @@ impl Handler { Self { pending_nonce: Some(cmd), requested_substream_nonce: None, - outbound: FuturesSet::new(Duration::from_secs(10), 5), + outbound: FuturesSet::new(|| Delay::futures_timer(Duration::from_secs(10)), 5), } } } diff --git a/protocols/autonat/src/v2/server/handler/dial_request.rs b/protocols/autonat/src/v2/server/handler/dial_request.rs index 5b4318bd643..bd0a4c326b7 100644 --- a/protocols/autonat/src/v2/server/handler/dial_request.rs +++ b/protocols/autonat/src/v2/server/handler/dial_request.rs @@ -10,7 +10,7 @@ use futures::{ channel::{mpsc, oneshot}, AsyncRead, AsyncWrite, SinkExt, StreamExt, }; -use futures_bounded::FuturesSet; +use futures_bounded::{Delay, FuturesSet}; use libp2p_core::{ upgrade::{DeniedUpgrade, ReadyUpgrade}, Multiaddr, @@ -64,7 +64,7 @@ where observed_multiaddr, dial_back_cmd_sender, dial_back_cmd_receiver, - inbound: FuturesSet::new(Duration::from_secs(10), 10), + inbound: FuturesSet::new(|| Delay::futures_timer(Duration::from_secs(10)), 10), rng, } } diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index af84d8fe9b1..f4d775fd6ae 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -29,6 +29,7 @@ use std::{ use either::Either; use futures::future; +use futures_bounded::Delay; use libp2p_core::{ multiaddr::Multiaddr, upgrade::{DeniedUpgrade, ReadyUpgrade}, @@ -87,8 +88,14 @@ impl Handler { Self { endpoint, queued_events: Default::default(), - inbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1), - outbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1), + inbound_stream: futures_bounded::FuturesSet::new( + || Delay::futures_timer(Duration::from_secs(10)), + 1, + ), + outbound_stream: futures_bounded::FuturesSet::new( + || Delay::futures_timer(Duration::from_secs(10)), + 1, + ), holepunch_candidates, attempts: 0, } diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index c2e31ae95f6..478c33d413b 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -139,7 +139,7 @@ impl Handler { remote_peer_id, events: SmallVec::new(), active_streams: futures_bounded::FuturesSet::new( - STREAM_TIMEOUT, + || futures_bounded::Delay::futures_timer(STREAM_TIMEOUT), MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), trigger_next_identify: Delay::new(Duration::ZERO), diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index d50f7319af8..2902ca87e03 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.49.1 + +- Enforce an inbound substream timeout in the kad substream handler. + See [PR 6009](https://github.com/libp2p/rust-libp2p/pull/6009). + ## 0.49.0 - Remove no longer constructed GetRecordError::QuorumFailed. diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index fd0c8e0eb75..81b8f7db8f9 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-kad" edition.workspace = true rust-version = { workspace = true } description = "Kademlia protocol for libp2p" -version = "0.49.0" +version = "0.49.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 2c7b6c52257..a95ff589d5d 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -27,7 +27,8 @@ use std::{ }; use either::Either; -use futures::{channel::oneshot, prelude::*, stream::SelectAll}; +use futures::{channel::oneshot, prelude::*}; +use futures_bounded::{Delay, StreamSet}; use libp2p_core::{upgrade, ConnectedPoint}; use libp2p_identity::PeerId; use libp2p_swarm::{ @@ -77,7 +78,8 @@ pub struct Handler { pending_messages: VecDeque<(KadRequestMsg, QueryId)>, /// List of active inbound substreams with the state they are in. - inbound_substreams: SelectAll, + /// The streams are typed `InboundSubstreamState`, but the set uses the item type. + inbound_substreams: StreamSet>, /// The connected endpoint of the connection that the handler /// is associated with. @@ -119,8 +121,6 @@ enum InboundSubstreamState { PendingFlush(UniqueConnecId, KadInStreamSink), /// The substream is being closed. Closing(KadInStreamSink), - /// The substream was cancelled in favor of a new one. - Cancelled, Poisoned { phantom: PhantomData, @@ -173,9 +173,6 @@ impl InboundSubstreamState { | InboundSubstreamState::Closing(substream) => { *self = InboundSubstreamState::Closing(substream); } - InboundSubstreamState::Cancelled => { - *self = InboundSubstreamState::Cancelled; - } InboundSubstreamState::Poisoned { .. } => unreachable!(), } } @@ -461,9 +458,12 @@ impl Handler { endpoint, remote_peer_id, next_connec_unique_id: UniqueConnecId(0), - inbound_substreams: Default::default(), + inbound_substreams: StreamSet::new( + move || Delay::futures_timer(substreams_timeout), + MAX_NUM_STREAMS, + ), outbound_substreams: futures_bounded::FuturesTupleSet::new( - substreams_timeout, + move || Delay::futures_timer(substreams_timeout), MAX_NUM_STREAMS, ), pending_streams: Default::default(), @@ -518,19 +518,31 @@ impl Handler { }); } - if self.inbound_substreams.len() == MAX_NUM_STREAMS { - if let Some(s) = self.inbound_substreams.iter_mut().find(|s| { - matches!( - s, - // An inbound substream waiting to be reused. - InboundSubstreamState::WaitingMessage { first: false, .. } - ) - }) { - *s = InboundSubstreamState::Cancelled; + let connec_unique_id = self.next_connec_unique_id; + self.next_connec_unique_id.0 += 1; + let new_substream = InboundSubstreamState::WaitingMessage { + first: true, + connection_id: connec_unique_id, + substream: protocol, + }; + + if self.inbound_substreams.len() >= MAX_NUM_STREAMS { + if let Some(s) = self + .inbound_substreams + .iter_mut_of_type::() + .find(|s| { + matches!( + **s, + // An inbound substream waiting to be reused. + InboundSubstreamState::WaitingMessage { first: false, .. } + ) + }) + { + *s.get_mut() = new_substream; tracing::debug!( peer=?self.remote_peer_id, "New inbound substream to peer exceeds inbound substream limit. \ - Removed older substream waiting to be reused." + Replacing older substream that was waiting to be reused." ) } else { tracing::warn!( @@ -538,18 +550,13 @@ impl Handler { "New inbound substream to peer exceeds inbound substream limit. \ No older substream waiting to be reused. Dropping new substream." ); - return; } + } else { + self.inbound_substreams + .try_push(new_substream) + .map_err(|_| ()) + .expect("Just checked that stream set is not full; qed"); } - - let connec_unique_id = self.next_connec_unique_id; - self.next_connec_unique_id.0 += 1; - self.inbound_substreams - .push(InboundSubstreamState::WaitingMessage { - first: true, - connection_id: connec_unique_id, - substream: protocol, - }); } /// Takes the given [`KadRequestMsg`] and composes it into an outbound request-response protocol @@ -616,15 +623,15 @@ impl ConnectionHandler for Handler { HandlerIn::Reset(request_id) => { if let Some(state) = self .inbound_substreams - .iter_mut() - .find(|state| match state { + .iter_mut_of_type::() + .find(|state| match **state { InboundSubstreamState::WaitingBehaviour(conn_id, _, _) => { - conn_id == &request_id.connec_unique_id + conn_id == request_id.connec_unique_id } _ => false, }) { - state.close(); + state.get_mut().close(); } } HandlerIn::FindNodeReq { key, query_id } => { @@ -763,8 +770,16 @@ impl ConnectionHandler for Handler { Poll::Pending => {} } - if let Poll::Ready(Some(event)) = self.inbound_substreams.poll_next_unpin(cx) { - return Poll::Ready(event); + if let Poll::Ready(Some(event_result)) = self.inbound_substreams.poll_next_unpin(cx) { + match event_result { + Ok(event) => return Poll::Ready(event), + Err(_stream_set_timeout) => { + tracing::trace!( + "Inbound substream timed out waiting for peer, send, or close" + ); + continue; + } + } } if self.outbound_substreams.len() < MAX_NUM_STREAMS { @@ -848,8 +863,11 @@ fn compute_new_protocol_status( impl Handler { fn answer_pending_request(&mut self, request_id: RequestId, mut msg: KadResponseMsg) { - for state in self.inbound_substreams.iter_mut() { - match state.try_answer_with(request_id, msg) { + for state in self + .inbound_substreams + .iter_mut_of_type::() + { + match state.get_mut().try_answer_with(request_id, msg) { Ok(()) => return, Err(m) => { msg = m; @@ -1006,7 +1024,6 @@ impl futures::Stream for InboundSubstreamState { } }, InboundSubstreamState::Poisoned { .. } => unreachable!(), - InboundSubstreamState::Cancelled => return Poll::Ready(None), } } } diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs index 2748a5b6ad9..0d0b7dc8e07 100644 --- a/protocols/perf/src/server/handler.rs +++ b/protocols/perf/src/server/handler.rs @@ -24,6 +24,7 @@ use std::{ }; use futures::FutureExt; +use futures_bounded::Delay; use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; use libp2p_swarm::{ handler::{ @@ -49,7 +50,7 @@ impl Handler { pub fn new() -> Self { Self { inbound: futures_bounded::FuturesSet::new( - crate::RUN_TIMEOUT, + || Delay::futures_timer(crate::RUN_TIMEOUT), crate::MAX_PARALLEL_RUNS_PER_CONNECTION, ), } diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index af130c35516..3a9982cd776 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -391,11 +391,11 @@ impl Handler { pub fn new(config: Config, endpoint: ConnectedPoint) -> Handler { Handler { inbound_workers: futures_bounded::FuturesSet::new( - STREAM_TIMEOUT, + || futures_bounded::Delay::futures_timer(STREAM_TIMEOUT), MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), outbound_workers: futures_bounded::FuturesMap::new( - STREAM_TIMEOUT, + || futures_bounded::Delay::futures_timer(STREAM_TIMEOUT), MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), endpoint, diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index 5f46dbf4460..fe5c6dd542b 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -142,19 +142,19 @@ impl Handler { queued_events: Default::default(), pending_streams: Default::default(), inflight_reserve_requests: futures_bounded::FuturesTupleSet::new( - STREAM_TIMEOUT, + || futures_bounded::Delay::futures_timer(STREAM_TIMEOUT), MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), inflight_inbound_circuit_requests: futures_bounded::FuturesSet::new( - STREAM_TIMEOUT, + || futures_bounded::Delay::futures_timer(STREAM_TIMEOUT), MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), inflight_outbound_connect_requests: futures_bounded::FuturesTupleSet::new( - STREAM_TIMEOUT, + || futures_bounded::Delay::futures_timer(STREAM_TIMEOUT), MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), inflight_outbound_circuit_deny_requests: futures_bounded::FuturesSet::new( - DENYING_CIRCUIT_TIMEOUT, + || futures_bounded::Delay::futures_timer(DENYING_CIRCUIT_TIMEOUT), MAX_NUMBER_DENYING_CIRCUIT, ), reservation: Reservation::None, diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index abc994f160e..ba2c2271411 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -35,6 +35,7 @@ use futures::{ channel::{mpsc, oneshot}, prelude::*, }; +use futures_bounded::Delay; use libp2p_swarm::{ handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, @@ -111,7 +112,7 @@ where pending_events: VecDeque::new(), inbound_request_id, worker_streams: futures_bounded::FuturesMap::new( - substream_timeout, + move || Delay::futures_timer(substream_timeout), max_concurrent_streams, ), }