From 3927ca22c7250e454a0583376d7ad57543e74dca Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sun, 14 Aug 2022 13:37:56 +0900 Subject: [PATCH 01/11] *: Generic connection management --- Cargo.toml | 5 + swarm/src/behaviour.rs | 33 +++- swarm/src/connection.rs | 21 +-- swarm/src/connection/error.rs | 12 +- swarm/src/connection/pool.rs | 306 +++++----------------------------- swarm/src/lib.rs | 274 +++++------------------------- 6 files changed, 122 insertions(+), 529 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 75867ddbf4f..85645998f90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,8 @@ categories = ["network-programming", "asynchronous"] [features] default = [ "autonat", + # TODO: Should this really be a default? + "connection-limit", "deflate", "dns-async-std", "floodsub", @@ -37,6 +39,7 @@ default = [ ] autonat = ["dep:libp2p-autonat"] +connection-limit = ["dep:libp2p-connection-limit"] dcutr = ["dep:libp2p-dcutr", "libp2p-metrics?/dcutr"] deflate = ["dep:libp2p-deflate"] dns-async-std = ["dep:libp2p-dns", "libp2p-dns?/async-std"] @@ -79,6 +82,7 @@ lazy_static = "1.2" libp2p-autonat = { version = "0.6.0", path = "protocols/autonat", optional = true } libp2p-core = { version = "0.35.0", path = "core", default-features = false } +libp2p-connection-limit = { version = "0.1.0", path = "protocols/connection-limit", optional = true } libp2p-dcutr = { version = "0.5.0", path = "protocols/dcutr", optional = true } libp2p-floodsub = { version = "0.38.0", path = "protocols/floodsub", optional = true } libp2p-identify = { version = "0.38.0", path = "protocols/identify", optional = true } @@ -130,6 +134,7 @@ members = [ "misc/prost-codec", "muxers/mplex", "muxers/yamux", + "protocols/connection-limit", "protocols/dcutr", "protocols/autonat", "protocols/floodsub", diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index d6802c086a8..ff2f5ee51d1 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -24,10 +24,11 @@ pub mod toggle; use crate::dial_opts::DialOpts; use crate::handler::{ConnectionHandler, IntoConnectionHandler}; use crate::{AddressRecord, AddressScore, DialError}; -use libp2p_core::{ - connection::ConnectionId, transport::ListenerId, ConnectedPoint, Multiaddr, PeerId, -}; -use std::{task::Context, task::Poll}; +use libp2p_core::connection::{ConnectionId, Endpoint}; +use libp2p_core::{transport::ListenerId, ConnectedPoint, Multiaddr, PeerId}; +use std::error::Error; +use std::task::Context; +use std::task::Poll; /// Custom event that can be received by the [`ConnectionHandler`]. pub(crate) type THandlerInEvent = @@ -191,6 +192,24 @@ pub trait NetworkBehaviour: 'static { vec![] } + fn review_pending_connection( + &mut self, + _peer_id: Option, + // TODO: Maybe an iterator is better? + _addresses: &[Multiaddr], + _endpoint: Endpoint, + ) -> Result<(), ReviewDenied> { + Ok(()) + } + + fn inject_connection_pending( + &mut self, + _peer_id: Option, + _connection_id: ConnectionId, + _endpoint: Endpoint, + ) { + } + /// Informs the behaviour about a newly established connection to a peer. fn inject_connection_established( &mut self, @@ -771,3 +790,9 @@ impl Default for CloseConnection { CloseConnection::All } } + +// TODO: Needed in the first place? Are there some common errors that we want? +#[derive(Debug)] +pub enum ReviewDenied { + Error(Box), +} diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index f92186618ae..cc357ab26b3 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -27,7 +27,6 @@ pub use error::{ ConnectionError, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }; -pub use pool::{ConnectionCounters, ConnectionLimits}; pub use pool::{EstablishedConnection, PendingConnection}; use crate::handler::ConnectionHandler; @@ -40,7 +39,7 @@ use libp2p_core::upgrade; use libp2p_core::PeerId; use std::collections::VecDeque; use std::future::Future; -use std::{error::Error, fmt, io, pin::Pin, task::Context, task::Poll}; +use std::{fmt, io, pin::Pin, task::Context, task::Poll}; /// Information about a successfully established connection. #[derive(Debug, Clone, PartialEq, Eq)] @@ -199,21 +198,3 @@ impl<'a> IncomingInfo<'a> { } } } - -/// Information about a connection limit. -#[derive(Debug, Clone)] -pub struct ConnectionLimit { - /// The maximum number of connections. - pub limit: u32, - /// The current number of connections. - pub current: u32, -} - -impl fmt::Display for ConnectionLimit { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}/{}", self.current, self.limit) - } -} - -/// A `ConnectionLimit` can represent an error if it has been exceeded. -impl Error for ConnectionLimit {} diff --git a/swarm/src/connection/error.rs b/swarm/src/connection/error.rs index 8a6d6bbbf00..962f213051d 100644 --- a/swarm/src/connection/error.rs +++ b/swarm/src/connection/error.rs @@ -21,7 +21,7 @@ use super::handler_wrapper; use crate::transport::TransportError; use crate::Multiaddr; -use crate::{connection::ConnectionLimit, ConnectedPoint, PeerId}; +use crate::{ConnectedPoint, PeerId}; use std::{fmt, io}; /// Errors that can occur in the context of an established `Connection`. @@ -99,9 +99,10 @@ pub enum PendingConnectionError { /// An error occurred while negotiating the transport protocol(s) on a connection. Transport(TTransErr), + // TODO: Still needed? /// The connection was dropped because the connection limit /// for a peer has been reached. - ConnectionLimit(ConnectionLimit), + // ConnectionLimit(ConnectionLimit), /// Pending connection attempt has been aborted. Aborted, @@ -122,9 +123,6 @@ impl PendingConnectionError { pub fn map(self, f: impl FnOnce(T) -> U) -> PendingConnectionError { match self { PendingConnectionError::Transport(t) => PendingConnectionError::Transport(f(t)), - PendingConnectionError::ConnectionLimit(l) => { - PendingConnectionError::ConnectionLimit(l) - } PendingConnectionError::Aborted => PendingConnectionError::Aborted, PendingConnectionError::WrongPeerId { obtained, endpoint } => { PendingConnectionError::WrongPeerId { obtained, endpoint } @@ -149,9 +147,6 @@ where err ) } - PendingConnectionError::ConnectionLimit(l) => { - write!(f, "Connection error: Connection limit: {}.", l) - } PendingConnectionError::WrongPeerId { obtained, endpoint } => { write!( f, @@ -173,7 +168,6 @@ where PendingConnectionError::Transport(_) => None, PendingConnectionError::WrongPeerId { .. } => None, PendingConnectionError::Aborted => None, - PendingConnectionError::ConnectionLimit(..) => None, } } } diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 62e931e9510..2b524e34ec3 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -22,7 +22,7 @@ use crate::{ behaviour::{THandlerInEvent, THandlerOutEvent}, connection::{ - Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError, + Connected, ConnectionError, IncomingInfo, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }, transport::{Transport, TransportError}, @@ -41,7 +41,6 @@ use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint}; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; use std::{ collections::{hash_map, HashMap}, - convert::TryFrom as _, fmt, num::{NonZeroU8, NonZeroUsize}, pin::Pin, @@ -60,9 +59,6 @@ where { local_id: PeerId, - /// The connection counter(s). - counters: ConnectionCounters, - /// The managed connections of each peer that are currently considered established. established: FnvHashMap< PeerId, @@ -153,9 +149,7 @@ struct PendingConnectionInfo { impl fmt::Debug for Pool { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Pool") - .field("counters", &self.counters) - .finish() + f.debug_struct("Pool").finish() } } @@ -254,14 +248,13 @@ where TTrans: Transport, { /// Creates a new empty `Pool`. - pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self { + pub fn new(local_id: PeerId, config: PoolConfig) -> Self { let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(config.task_event_buffer_size); let (established_connection_events_tx, established_connection_events_rx) = mpsc::channel(config.task_event_buffer_size); Pool { local_id, - counters: ConnectionCounters::new(limits), established: Default::default(), pending: Default::default(), next_connection_id: ConnectionId::new(0), @@ -278,11 +271,6 @@ where } } - /// Gets the dedicated connection counters. - pub fn counters(&self) -> &ConnectionCounters { - &self.counters - } - /// Gets an entry representing a connection in the pool. /// /// Returns `None` if the pool has no connection with the given ID. @@ -433,14 +421,15 @@ where handler: THandler, role_override: Endpoint, dial_concurrency_factor_override: Option, - ) -> Result + ) -> ConnectionId where TTrans: Send, TTrans::Dial: Send + 'static, { - if let Err(limit) = self.counters.check_max_pending_outgoing() { - return Err((limit, handler)); - }; + // TODO + // if let Err(limit) = self.counters.check_max_pending_outgoing() { + // return Err((limit, handler)); + // }; let dial = ConcurrentDial::new( dials, @@ -463,7 +452,8 @@ where let endpoint = PendingPoint::Dialer { role_override }; - self.counters.inc_pending(&endpoint); + // TODO + // self.counters.inc_pending(&endpoint); self.pending.insert( connection_id, PendingConnectionInfo { @@ -473,7 +463,7 @@ where abort_notifier: Some(abort_notifier), }, ); - Ok(connection_id) + connection_id } /// Adds a pending incoming connection to the pool in the form of a @@ -486,15 +476,16 @@ where future: TFut, handler: THandler, info: IncomingInfo<'_>, - ) -> Result + ) -> ConnectionId where TFut: Future> + Send + 'static, { let endpoint = info.create_connected_point(); - if let Err(limit) = self.counters.check_max_pending_incoming() { - return Err((limit, handler)); - } + // TODO + // if let Err(limit) = self.counters.check_max_pending_incoming() { + // return Err((limit, handler)); + // } let connection_id = self.next_connection_id(); @@ -510,7 +501,8 @@ where .boxed(), ); - self.counters.inc_pending_incoming(); + // TODO + // self.counters.inc_pending_incoming(); self.pending.insert( connection_id, PendingConnectionInfo { @@ -520,7 +512,7 @@ where abort_notifier: Some(abort_notifier), }, ); - Ok(connection_id) + connection_id } /// Polls the connection pool for events. @@ -577,7 +569,8 @@ where .expect("`Closed` event for established connection"); let EstablishedConnectionInfo { endpoint, .. } = connections.remove(&id).expect("Connection to be present"); - self.counters.dec_established(&endpoint); + // TODO + // self.counters.dec_established(&endpoint); let remaining_established_connection_ids: Vec = connections.keys().cloned().collect(); if remaining_established_connection_ids.is_empty() { @@ -617,7 +610,8 @@ where .remove(&id) .expect("Entry in `self.pending` for previously pending connection."); - self.counters.dec_pending(&endpoint); + // TODO + // self.counters.dec_pending(&endpoint); let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) { (PendingPoint::Dialer { role_override }, Some((address, errors))) => ( @@ -648,20 +642,23 @@ where ), }; - let error: Result<(), PendingInboundConnectionError<_>> = self - .counters - // Check general established connection limit. - .check_max_established(&endpoint) - .map_err(PendingConnectionError::ConnectionLimit) + let error: Result<(), PendingInboundConnectionError<_>> = Ok(()) + // TODO + // self + // .counters + // // Check general established connection limit. + // .check_max_established(&endpoint) + // .map_err(PendingConnectionError::ConnectionLimit) // Check per-peer established connection limit. - .and_then(|()| { - self.counters - .check_max_established_per_peer(num_peer_established( - &self.established, - obtained_peer_id, - )) - .map_err(PendingConnectionError::ConnectionLimit) - }) + // TODO + // .and_then(|()| { + // self.counters + // .check_max_established_per_peer(num_peer_established( + // &self.established, + // obtained_peer_id, + // )) + // .map_err(PendingConnectionError::ConnectionLimit) + // }) // Check expected peer id matches. .and_then(|()| { if let Some(peer) = expected_peer_id { @@ -733,7 +730,8 @@ where // Add the connection to the pool. let conns = self.established.entry(obtained_peer_id).or_default(); let other_established_connection_ids = conns.keys().cloned().collect(); - self.counters.inc_established(&endpoint); + // TODO + // self.counters.inc_established(&endpoint); let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size); @@ -786,7 +784,8 @@ where abort_notifier: _, }) = self.pending.remove(&id) { - self.counters.dec_pending(&endpoint); + // TODO + // self.counters.dec_pending(&endpoint); match (endpoint, error) { (PendingPoint::Dialer { .. }, Either::Left(error)) => { @@ -923,225 +922,6 @@ impl EstablishedConnection<'_, TInEvent> { } } -/// Network connection information. -#[derive(Debug, Clone)] -pub struct ConnectionCounters { - /// The effective connection limits. - limits: ConnectionLimits, - /// The current number of incoming connections. - pending_incoming: u32, - /// The current number of outgoing connections. - pending_outgoing: u32, - /// The current number of established inbound connections. - established_incoming: u32, - /// The current number of established outbound connections. - established_outgoing: u32, -} - -impl ConnectionCounters { - fn new(limits: ConnectionLimits) -> Self { - Self { - limits, - pending_incoming: 0, - pending_outgoing: 0, - established_incoming: 0, - established_outgoing: 0, - } - } - - /// The effective connection limits. - pub fn limits(&self) -> &ConnectionLimits { - &self.limits - } - - /// The total number of connections, both pending and established. - pub fn num_connections(&self) -> u32 { - self.num_pending() + self.num_established() - } - - /// The total number of pending connections, both incoming and outgoing. - pub fn num_pending(&self) -> u32 { - self.pending_incoming + self.pending_outgoing - } - - /// The number of incoming connections being established. - pub fn num_pending_incoming(&self) -> u32 { - self.pending_incoming - } - - /// The number of outgoing connections being established. - pub fn num_pending_outgoing(&self) -> u32 { - self.pending_outgoing - } - - /// The number of established incoming connections. - pub fn num_established_incoming(&self) -> u32 { - self.established_incoming - } - - /// The number of established outgoing connections. - pub fn num_established_outgoing(&self) -> u32 { - self.established_outgoing - } - - /// The total number of established connections. - pub fn num_established(&self) -> u32 { - self.established_outgoing + self.established_incoming - } - - fn inc_pending(&mut self, endpoint: &PendingPoint) { - match endpoint { - PendingPoint::Dialer { .. } => { - self.pending_outgoing += 1; - } - PendingPoint::Listener { .. } => { - self.pending_incoming += 1; - } - } - } - - fn inc_pending_incoming(&mut self) { - self.pending_incoming += 1; - } - - fn dec_pending(&mut self, endpoint: &PendingPoint) { - match endpoint { - PendingPoint::Dialer { .. } => { - self.pending_outgoing -= 1; - } - PendingPoint::Listener { .. } => { - self.pending_incoming -= 1; - } - } - } - - fn inc_established(&mut self, endpoint: &ConnectedPoint) { - match endpoint { - ConnectedPoint::Dialer { .. } => { - self.established_outgoing += 1; - } - ConnectedPoint::Listener { .. } => { - self.established_incoming += 1; - } - } - } - - fn dec_established(&mut self, endpoint: &ConnectedPoint) { - match endpoint { - ConnectedPoint::Dialer { .. } => { - self.established_outgoing -= 1; - } - ConnectedPoint::Listener { .. } => { - self.established_incoming -= 1; - } - } - } - - fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> { - Self::check(self.pending_outgoing, self.limits.max_pending_outgoing) - } - - fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> { - Self::check(self.pending_incoming, self.limits.max_pending_incoming) - } - - fn check_max_established(&self, endpoint: &ConnectedPoint) -> Result<(), ConnectionLimit> { - // Check total connection limit. - Self::check(self.num_established(), self.limits.max_established_total)?; - // Check incoming/outgoing connection limits - match endpoint { - ConnectedPoint::Dialer { .. } => Self::check( - self.established_outgoing, - self.limits.max_established_outgoing, - ), - ConnectedPoint::Listener { .. } => Self::check( - self.established_incoming, - self.limits.max_established_incoming, - ), - } - } - - fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> { - Self::check(current, self.limits.max_established_per_peer) - } - - fn check(current: u32, limit: Option) -> Result<(), ConnectionLimit> { - if let Some(limit) = limit { - if current >= limit { - return Err(ConnectionLimit { limit, current }); - } - } - Ok(()) - } -} - -/// Counts the number of established connections to the given peer. -fn num_peer_established( - established: &FnvHashMap>>, - peer: PeerId, -) -> u32 { - established.get(&peer).map_or(0, |conns| { - u32::try_from(conns.len()).expect("Unexpectedly large number of connections for a peer.") - }) -} - -/// The configurable connection limits. -/// -/// By default no connection limits apply. -#[derive(Debug, Clone, Default)] -pub struct ConnectionLimits { - max_pending_incoming: Option, - max_pending_outgoing: Option, - max_established_incoming: Option, - max_established_outgoing: Option, - max_established_per_peer: Option, - max_established_total: Option, -} - -impl ConnectionLimits { - /// Configures the maximum number of concurrently incoming connections being established. - pub fn with_max_pending_incoming(mut self, limit: Option) -> Self { - self.max_pending_incoming = limit; - self - } - - /// Configures the maximum number of concurrently outgoing connections being established. - pub fn with_max_pending_outgoing(mut self, limit: Option) -> Self { - self.max_pending_outgoing = limit; - self - } - - /// Configures the maximum number of concurrent established inbound connections. - pub fn with_max_established_incoming(mut self, limit: Option) -> Self { - self.max_established_incoming = limit; - self - } - - /// Configures the maximum number of concurrent established outbound connections. - pub fn with_max_established_outgoing(mut self, limit: Option) -> Self { - self.max_established_outgoing = limit; - self - } - - /// Configures the maximum number of concurrent established connections (both - /// inbound and outbound). - /// - /// Note: This should be used in conjunction with - /// [`ConnectionLimits::with_max_established_incoming`] to prevent possible - /// eclipse attacks (all connections being inbound). - pub fn with_max_established(mut self, limit: Option) -> Self { - self.max_established_total = limit; - self - } - - /// Configures the maximum number of concurrent established connections per peer, - /// regardless of direction (incoming or outgoing). - pub fn with_max_established_per_peer(mut self, limit: Option) -> Self { - self.max_established_per_peer = limit; - self - } -} - /// Configuration options when creating a [`Pool`]. /// /// The default configuration specifies no dedicated task executor, a diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 74de015a858..f80a829d754 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -65,11 +65,11 @@ pub mod handler; pub use behaviour::{ CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess, - NotifyHandler, PollParameters, + NotifyHandler, PollParameters, ReviewDenied, }; pub use connection::{ - ConnectionCounters, ConnectionError, ConnectionLimit, ConnectionLimits, PendingConnectionError, - PendingInboundConnectionError, PendingOutboundConnectionError, + ConnectionError, PendingConnectionError, PendingInboundConnectionError, + PendingOutboundConnectionError, }; pub use handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr, @@ -314,11 +314,7 @@ where /// Returns information about the connections underlying the [`Swarm`]. pub fn network_info(&self) -> NetworkInfo { let num_peers = self.pool.num_peers(); - let connection_counters = self.pool.counters().clone(); - NetworkInfo { - num_peers, - connection_counters, - } + NetworkInfo { num_peers } } /// Starts listening on the given address. @@ -504,6 +500,13 @@ where } }; + self.behaviour.review_pending_connection( + peer_id, + // TODO: Pass the addresses. + &[], + Endpoint::Dialer, + )?; + let dials = addresses .map(|a| match p2p_addr(peer_id, a) { Ok(address) => { @@ -526,20 +529,19 @@ where }) .collect(); - match self.pool.add_outgoing( + let connection_id = self.pool.add_outgoing( dials, peer_id, handler, role_override, dial_concurrency_factor_override, - ) { - Ok(_connection_id) => Ok(()), - Err((connection_limit, handler)) => { - let error = DialError::ConnectionLimit(connection_limit); - self.behaviour.inject_dial_failure(None, handler, &error); - Err(error) - } - } + ); + + self.behaviour + .inject_connection_pending(peer_id, connection_id, Endpoint::Dialer); + // TODO + // self.behaviour.inject_dial_failure(None, handler, &error); + Ok(()) } /// Returns an iterator that produces the list of addresses we're listening on. @@ -843,26 +845,23 @@ where send_back_addr, } => { let handler = self.behaviour.new_handler(); - match self.pool.add_incoming( + self.pool.add_incoming( upgrade, handler, IncomingInfo { local_addr: &local_addr, send_back_addr: &send_back_addr, }, - ) { - Ok(_connection_id) => { - return Some(SwarmEvent::IncomingConnection { - local_addr, - send_back_addr, - }); - } - Err((connection_limit, handler)) => { - self.behaviour - .inject_listen_failure(&local_addr, &send_back_addr, handler); - log::warn!("Incoming connection rejected: {:?}", connection_limit); - } - }; + ); + + return Some(SwarmEvent::IncomingConnection { + local_addr, + send_back_addr, + }); + + // TODO: Still needed? + // self.behaviour + // .inject_listen_failure(&local_addr, &send_back_addr, handler); } TransportEvent::NewAddress { listener_id, @@ -926,7 +925,6 @@ where return Some(SwarmEvent::ListenerError { listener_id, error }); } } - None } fn handle_behaviour_event( @@ -1267,7 +1265,6 @@ pub struct SwarmBuilder { transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, pool_config: PoolConfig, - connection_limits: ConnectionLimits, } impl SwarmBuilder @@ -1287,7 +1284,6 @@ where transport, behaviour, pool_config: Default::default(), - connection_limits: Default::default(), } } @@ -1348,12 +1344,6 @@ where self } - /// Configures the connection limits. - pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self { - self.connection_limits = limits; - self - } - /// Configures an override for the substream upgrade protocol to use. /// /// The subtream upgrade protocol is the multistream-select protocol @@ -1406,7 +1396,7 @@ where Swarm { local_peer_id: self.local_peer_id, transport: self.transport, - pool: Pool::new(self.local_peer_id, pool_config, self.connection_limits), + pool: Pool::new(self.local_peer_id, pool_config), behaviour: self.behaviour, supported_protocols, listened_addrs: HashMap::new(), @@ -1423,9 +1413,7 @@ where pub enum DialError { /// The peer is currently banned. Banned, - /// The configured limit for simultaneous outgoing connections - /// has been reached. - ConnectionLimit(ConnectionLimit), + ConnectionReviewDenied(ReviewDenied), /// The peer being dialed is the local peer and thus the dial was aborted. LocalPeerId, /// [`NetworkBehaviour::addresses_of_peer`] returned no addresses @@ -1449,10 +1437,15 @@ pub enum DialError { Transport(Vec<(Multiaddr, TransportError)>), } +impl From for DialError { + fn from(denied: ReviewDenied) -> Self { + DialError::ConnectionReviewDenied(denied) + } +} + impl From> for DialError { fn from(error: PendingOutboundConnectionError) -> Self { match error { - PendingConnectionError::ConnectionLimit(limit) => DialError::ConnectionLimit(limit), PendingConnectionError::Aborted => DialError::Aborted, PendingConnectionError::WrongPeerId { obtained, endpoint } => { DialError::WrongPeerId { obtained, endpoint } @@ -1466,8 +1459,8 @@ impl From> for DialError { impl fmt::Display for DialError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err), DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."), + DialError::ConnectionReviewDenied(_)=> todo!(), DialError::LocalPeerId => write!(f, "Dial error: tried to dial local peer id."), DialError::Banned => write!(f, "Dial error: peer is banned."), DialError::DialPeerConditionFalse(c) => { @@ -1495,8 +1488,8 @@ impl fmt::Display for DialError { impl error::Error for DialError { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { - DialError::ConnectionLimit(err) => Some(err), DialError::LocalPeerId => None, + DialError::ConnectionReviewDenied(_) => None, DialError::NoAddresses => None, DialError::Banned => None, DialError::DialPeerConditionFalse(_) => None, @@ -1561,13 +1554,12 @@ impl NetworkBehaviour for DummyBehaviour { } } +// TODO: Struct still needed now that connection limit is gone? /// Information about the connections obtained by [`Swarm::network_info()`]. #[derive(Clone, Debug)] pub struct NetworkInfo { /// The total number of connected peers. num_peers: usize, - /// Counters of ongoing network connections. - connection_counters: ConnectionCounters, } impl NetworkInfo { @@ -1576,11 +1568,6 @@ impl NetworkInfo { pub fn num_peers(&self) -> usize { self.num_peers } - - /// Gets counters for ongoing network connections. - pub fn connection_counters(&self) -> &ConnectionCounters { - &self.connection_counters - } } /// Ensures a given `Multiaddr` is a `/p2p/...` address for the given peer. @@ -2119,185 +2106,6 @@ mod tests { QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _); } - #[test] - fn max_outgoing() { - use rand::Rng; - - let outgoing_limit = rand::thread_rng().gen_range(1, 10); - - let limits = ConnectionLimits::default().with_max_pending_outgoing(Some(outgoing_limit)); - let mut network = new_test_swarm::<_, ()>(DummyConnectionHandler { - keep_alive: KeepAlive::Yes, - }) - .connection_limits(limits) - .build(); - - let addr: Multiaddr = "/memory/1234".parse().unwrap(); - - let target = PeerId::random(); - for _ in 0..outgoing_limit { - network - .dial( - DialOpts::peer_id(target) - .addresses(vec![addr.clone()]) - .build(), - ) - .ok() - .expect("Unexpected connection limit."); - } - - match network - .dial( - DialOpts::peer_id(target) - .addresses(vec![addr.clone()]) - .build(), - ) - .expect_err("Unexpected dialing success.") - { - DialError::ConnectionLimit(limit) => { - assert_eq!(limit.current, outgoing_limit); - assert_eq!(limit.limit, outgoing_limit); - } - e => panic!("Unexpected error: {:?}", e), - } - - let info = network.network_info(); - assert_eq!(info.num_peers(), 0); - assert_eq!( - info.connection_counters().num_pending_outgoing(), - outgoing_limit - ); - } - - #[test] - fn max_established_incoming() { - use rand::Rng; - - #[derive(Debug, Clone)] - struct Limit(u32); - - impl Arbitrary for Limit { - fn arbitrary(g: &mut G) -> Self { - Self(g.gen_range(1, 10)) - } - } - - fn limits(limit: u32) -> ConnectionLimits { - ConnectionLimits::default().with_max_established_incoming(Some(limit)) - } - - fn prop(limit: Limit) { - let limit = limit.0; - - let mut network1 = new_test_swarm::<_, ()>(DummyConnectionHandler { - keep_alive: KeepAlive::Yes, - }) - .connection_limits(limits(limit)) - .build(); - let mut network2 = new_test_swarm::<_, ()>(DummyConnectionHandler { - keep_alive: KeepAlive::Yes, - }) - .connection_limits(limits(limit)) - .build(); - - let _ = network1.listen_on(multiaddr![Memory(0u64)]).unwrap(); - let listen_addr = async_std::task::block_on(poll_fn(|cx| { - match ready!(network1.poll_next_unpin(cx)).unwrap() { - SwarmEvent::NewListenAddr { address, .. } => Poll::Ready(address), - e => panic!("Unexpected network event: {:?}", e), - } - })); - - // Spawn and block on the dialer. - async_std::task::block_on({ - let mut n = 0; - let _ = network2.dial(listen_addr.clone()).unwrap(); - - let mut expected_closed = false; - let mut network_1_established = false; - let mut network_2_established = false; - let mut network_1_limit_reached = false; - let mut network_2_limit_reached = false; - poll_fn(move |cx| { - loop { - let mut network_1_pending = false; - let mut network_2_pending = false; - - match network1.poll_next_unpin(cx) { - Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) => {} - Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => { - network_1_established = true; - } - Poll::Ready(Some(SwarmEvent::IncomingConnectionError { - error: PendingConnectionError::ConnectionLimit(err), - .. - })) => { - assert_eq!(err.limit, limit); - assert_eq!(err.limit, err.current); - let info = network1.network_info(); - let counters = info.connection_counters(); - assert_eq!(counters.num_established_incoming(), limit); - assert_eq!(counters.num_established(), limit); - network_1_limit_reached = true; - } - Poll::Pending => { - network_1_pending = true; - } - e => panic!("Unexpected network event: {:?}", e), - } - - match network2.poll_next_unpin(cx) { - Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => { - network_2_established = true; - } - Poll::Ready(Some(SwarmEvent::ConnectionClosed { .. })) => { - assert!(expected_closed); - let info = network2.network_info(); - let counters = info.connection_counters(); - assert_eq!(counters.num_established_outgoing(), limit); - assert_eq!(counters.num_established(), limit); - network_2_limit_reached = true; - } - Poll::Pending => { - network_2_pending = true; - } - e => panic!("Unexpected network event: {:?}", e), - } - - if network_1_pending && network_2_pending { - return Poll::Pending; - } - - if network_1_established && network_2_established { - network_1_established = false; - network_2_established = false; - - if n <= limit { - // Dial again until the limit is exceeded. - n += 1; - network2.dial(listen_addr.clone()).unwrap(); - - if n == limit { - // The the next dialing attempt exceeds the limit, this - // is the connection we expected to get closed. - expected_closed = true; - } - } else { - panic!("Expect networks not to establish connections beyond the limit.") - } - } - - if network_1_limit_reached && network_2_limit_reached { - return Poll::Ready(()); - } - } - }) - }); - } - - quickcheck(prop as fn(_)); - } - #[test] fn invalid_peer_id() { // Checks whether dialing an address containing the wrong peer id raises an error From 7bfdba656a13c9c7c8761ccbe07bc54923bcd4ef Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sun, 14 Aug 2022 13:38:27 +0900 Subject: [PATCH 02/11] protocols/connection-limit: Introduce --- protocols/connection-limit/Cargo.toml | 22 ++ protocols/connection-limit/src/lib.rs | 326 ++++++++++++++++++++++++ protocols/connection-limit/tests/lib.rs | 235 +++++++++++++++++ 3 files changed, 583 insertions(+) create mode 100644 protocols/connection-limit/Cargo.toml create mode 100644 protocols/connection-limit/src/lib.rs create mode 100644 protocols/connection-limit/tests/lib.rs diff --git a/protocols/connection-limit/Cargo.toml b/protocols/connection-limit/Cargo.toml new file mode 100644 index 00000000000..0c90d094c8c --- /dev/null +++ b/protocols/connection-limit/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "libp2p-connection-limit" +edition = "2021" +rust-version = "1.56.1" +description = "Basic connection limiting functionality" +version = "0.1.0" +authors = ["Max Inden "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } +libp2p-swarm = { version = "0.38.0", path = "../../swarm" } +# TODO: Still needed? +fnv = "1.0" + +[dev-dependencies] +quickcheck = "0.9.0" +libp2p-plaintext = { path = "../../transports/plaintext" } +libp2p-yamux = { path = "../../muxers/yamux" } \ No newline at end of file diff --git a/protocols/connection-limit/src/lib.rs b/protocols/connection-limit/src/lib.rs new file mode 100644 index 00000000000..ac073386ab0 --- /dev/null +++ b/protocols/connection-limit/src/lib.rs @@ -0,0 +1,326 @@ +// Copyright 2021 Protocol Labs. +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use libp2p_core::connection::{ConnectedPoint, ConnectionId, Endpoint, PendingPoint}; +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::handler::{ConnectionHandler, DummyConnectionHandler, IntoConnectionHandler}; +use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ReviewDenied}; +use std::error::Error; +use std::fmt; +use std::task::{Context, Poll}; + +pub struct Behaviour { + counters: ConnectionCounters, +} + +impl Behaviour { + pub fn new(limits: ConnectionLimits) -> Self { + Behaviour { + counters: ConnectionCounters::new(limits), + } + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = DummyConnectionHandler; + type OutEvent = (); + + fn new_handler(&mut self) -> Self::ConnectionHandler { + Default::default() + } + + fn review_pending_connection( + &mut self, + _peer_id: Option, + // TODO: Maybe an iterator is better? + _addresses: &[Multiaddr], + endpoint: Endpoint, + ) -> Result<(), ReviewDenied> { + match endpoint { + Endpoint::Dialer => self.counters.check_max_pending_outgoing(), + Endpoint::Listener => self.counters.check_max_pending_incoming(), + } + .map_err(|e| ReviewDenied::Error(e.into())) + } + + fn inject_connection_pending( + &mut self, + _peer_id: Option, + _connection_id: ConnectionId, + endpoint: Endpoint, + ) { + self.counters.inc_pending(endpoint) + } + + fn inject_event( + &mut self, + _peer_id: PeerId, + _connection: ConnectionId, + _event: <::Handler as ConnectionHandler>::OutEvent, + ) { + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + _params: &mut impl PollParameters, + ) -> Poll> { + Poll::Pending + } +} + +/// Network connection information. +#[derive(Debug, Clone)] +pub struct ConnectionCounters { + /// The effective connection limits. + limits: ConnectionLimits, + /// The current number of incoming connections. + pending_incoming: u32, + /// The current number of outgoing connections. + pending_outgoing: u32, + /// The current number of established inbound connections. + established_incoming: u32, + /// The current number of established outbound connections. + established_outgoing: u32, +} + +impl ConnectionCounters { + fn new(limits: ConnectionLimits) -> Self { + Self { + limits, + pending_incoming: 0, + pending_outgoing: 0, + established_incoming: 0, + established_outgoing: 0, + } + } + + /// The effective connection limits. + pub fn limits(&self) -> &ConnectionLimits { + &self.limits + } + + /// The total number of connections, both pending and established. + pub fn num_connections(&self) -> u32 { + self.num_pending() + self.num_established() + } + + /// The total number of pending connections, both incoming and outgoing. + pub fn num_pending(&self) -> u32 { + self.pending_incoming + self.pending_outgoing + } + + /// The number of incoming connections being established. + pub fn num_pending_incoming(&self) -> u32 { + self.pending_incoming + } + + /// The number of outgoing connections being established. + pub fn num_pending_outgoing(&self) -> u32 { + self.pending_outgoing + } + + /// The number of established incoming connections. + pub fn num_established_incoming(&self) -> u32 { + self.established_incoming + } + + /// The number of established outgoing connections. + pub fn num_established_outgoing(&self) -> u32 { + self.established_outgoing + } + + /// The total number of established connections. + pub fn num_established(&self) -> u32 { + self.established_outgoing + self.established_incoming + } + + fn inc_pending(&mut self, endpoint: Endpoint) { + match endpoint { + Endpoint::Dialer => { + self.pending_outgoing += 1; + } + Endpoint::Listener => { + self.pending_incoming += 1; + } + } + } + + fn inc_pending_incoming(&mut self) { + self.pending_incoming += 1; + } + + fn dec_pending(&mut self, endpoint: &PendingPoint) { + match endpoint { + PendingPoint::Dialer { .. } => { + self.pending_outgoing -= 1; + } + PendingPoint::Listener { .. } => { + self.pending_incoming -= 1; + } + } + } + + fn inc_established(&mut self, endpoint: &ConnectedPoint) { + match endpoint { + ConnectedPoint::Dialer { .. } => { + self.established_outgoing += 1; + } + ConnectedPoint::Listener { .. } => { + self.established_incoming += 1; + } + } + } + + fn dec_established(&mut self, endpoint: &ConnectedPoint) { + match endpoint { + ConnectedPoint::Dialer { .. } => { + self.established_outgoing -= 1; + } + ConnectedPoint::Listener { .. } => { + self.established_incoming -= 1; + } + } + } + + fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> { + Self::check(self.pending_outgoing, self.limits.max_pending_outgoing) + } + + fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> { + Self::check(self.pending_incoming, self.limits.max_pending_incoming) + } + + fn check_max_established(&self, endpoint: &ConnectedPoint) -> Result<(), ConnectionLimit> { + // Check total connection limit. + Self::check(self.num_established(), self.limits.max_established_total)?; + // Check incoming/outgoing connection limits + match endpoint { + ConnectedPoint::Dialer { .. } => Self::check( + self.established_outgoing, + self.limits.max_established_outgoing, + ), + ConnectedPoint::Listener { .. } => Self::check( + self.established_incoming, + self.limits.max_established_incoming, + ), + } + } + + fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> { + Self::check(current, self.limits.max_established_per_peer) + } + + fn check(current: u32, limit: Option) -> Result<(), ConnectionLimit> { + if let Some(limit) = limit { + if current >= limit { + return Err(ConnectionLimit { limit, current }); + } + } + Ok(()) + } +} + +// /// Counts the number of established connections to the given peer. +// fn num_peer_established( +// // TODO: Use normal HashMap. +// established: &FnvHashMap>>, +// peer: PeerId, +// ) -> u32 { +// established.get(&peer).map_or(0, |conns| { +// u32::try_from(conns.len()).expect("Unexpectedly large number of connections for a peer.") +// }) +// } + +/// The configurable connection limits. +/// +/// By default no connection limits apply. +#[derive(Debug, Clone, Default)] +pub struct ConnectionLimits { + max_pending_incoming: Option, + max_pending_outgoing: Option, + max_established_incoming: Option, + max_established_outgoing: Option, + max_established_per_peer: Option, + max_established_total: Option, +} + +impl ConnectionLimits { + /// Configures the maximum number of concurrently incoming connections being established. + pub fn with_max_pending_incoming(mut self, limit: Option) -> Self { + self.max_pending_incoming = limit; + self + } + + /// Configures the maximum number of concurrently outgoing connections being established. + pub fn with_max_pending_outgoing(mut self, limit: Option) -> Self { + self.max_pending_outgoing = limit; + self + } + + /// Configures the maximum number of concurrent established inbound connections. + pub fn with_max_established_incoming(mut self, limit: Option) -> Self { + self.max_established_incoming = limit; + self + } + + /// Configures the maximum number of concurrent established outbound connections. + pub fn with_max_established_outgoing(mut self, limit: Option) -> Self { + self.max_established_outgoing = limit; + self + } + + /// Configures the maximum number of concurrent established connections (both + /// inbound and outbound). + /// + /// Note: This should be used in conjunction with + /// [`ConnectionLimits::with_max_established_incoming`] to prevent possible + /// eclipse attacks (all connections being inbound). + pub fn with_max_established(mut self, limit: Option) -> Self { + self.max_established_total = limit; + self + } + + /// Configures the maximum number of concurrent established connections per peer, + /// regardless of direction (incoming or outgoing). + pub fn with_max_established_per_peer(mut self, limit: Option) -> Self { + self.max_established_per_peer = limit; + self + } +} + +/// Information about a connection limit. +#[derive(Debug, Clone)] +pub struct ConnectionLimit { + /// The maximum number of connections. + pub limit: u32, + /// The current number of connections. + pub current: u32, +} + +impl fmt::Display for ConnectionLimit { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}/{}", self.current, self.limit) + } +} + +/// A `ConnectionLimit` can represent an error if it has been exceeded. +impl Error for ConnectionLimit {} diff --git a/protocols/connection-limit/tests/lib.rs b/protocols/connection-limit/tests/lib.rs new file mode 100644 index 00000000000..97c88185f16 --- /dev/null +++ b/protocols/connection-limit/tests/lib.rs @@ -0,0 +1,235 @@ +use libp2p_connection_limit::{Behaviour, ConnectionLimits}; +use libp2p_core::connection::{ConnectionId, Endpoint}; +use libp2p_core::identity; +use libp2p_core::transport::{MemoryTransport, Transport}; +use libp2p_core::upgrade; +use libp2p_core::Multiaddr; +use libp2p_core::PeerId; +use libp2p_plaintext::PlainText2Config; +use libp2p_swarm::behaviour::NetworkBehaviour; +use libp2p_swarm::dial_opts::DialOpts; +use libp2p_swarm::SwarmBuilder; +use libp2p_yamux::YamuxConfig; +use quickcheck::QuickCheck; + +#[test] +fn enforces_pending_connection_limit() { + fn prop(outbound: u8) { + let limit = ConnectionLimits::default().with_max_pending_outgoing(Some(outbound.into())); + + let behaviour = Behaviour::new(limit); + let id_keys = identity::Keypair::generate_ed25519(); + let local_public_key = id_keys.public(); + let transport = MemoryTransport::default() + .upgrade(upgrade::Version::V1) + .authenticate(PlainText2Config { + local_public_key: local_public_key.clone(), + }) + .multiplex(YamuxConfig::default()) + .boxed(); + let mut swarm = SwarmBuilder::new(transport, behaviour, local_public_key.into()).build(); + + let addr: Multiaddr = "/memory/1234".parse().unwrap(); + + for _ in 0..outbound { + swarm + .dial( + DialOpts::peer_id(PeerId::random()) + .addresses(vec![addr.clone()]) + .build(), + ) + .ok() + .expect("Unexpected connection limit."); + } + + assert!(swarm + .dial( + DialOpts::peer_id(PeerId::random()) + .addresses(vec![addr.clone()]) + .build(), + ) + .is_err()); + } + + QuickCheck::new().tests(10).quickcheck(prop as fn(_)); +} + +// TODO: Bring back +// #[test] +// fn max_outgoing() { +// use rand::Rng; +// +// let outgoing_limit = rand::thread_rng().gen_range(1, 10); +// +// let limits = ConnectionLimits::default().with_max_pending_outgoing(Some(outgoing_limit)); +// let mut network = new_test_swarm::<_, ()>(DummyConnectionHandler { +// keep_alive: KeepAlive::Yes, +// }) +// .connection_limits(limits) +// .build(); +// +// let addr: Multiaddr = "/memory/1234".parse().unwrap(); +// +// let target = PeerId::random(); +// for _ in 0..outgoing_limit { +// network +// .dial( +// DialOpts::peer_id(target) +// .addresses(vec![addr.clone()]) +// .build(), +// ) +// .ok() +// .expect("Unexpected connection limit."); +// } +// +// match network +// .dial( +// DialOpts::peer_id(target) +// .addresses(vec![addr.clone()]) +// .build(), +// ) +// .expect_err("Unexpected dialing success.") +// { +// DialError::ConnectionLimit(limit) => { +// assert_eq!(limit.current, outgoing_limit); +// assert_eq!(limit.limit, outgoing_limit); +// } +// e => panic!("Unexpected error: {:?}", e), +// } +// +// let info = network.network_info(); +// assert_eq!(info.num_peers(), 0); +// assert_eq!( +// info.connection_counters().num_pending_outgoing(), +// outgoing_limit +// ); +// } +// +// #[test] +// fn max_established_incoming() { +// use rand::Rng; +// +// #[derive(Debug, Clone)] +// struct Limit(u32); +// +// impl Arbitrary for Limit { +// fn arbitrary(g: &mut G) -> Self { +// Self(g.gen_range(1, 10)) +// } +// } +// +// fn limits(limit: u32) -> ConnectionLimits { +// ConnectionLimits::default().with_max_established_incoming(Some(limit)) +// } +// +// fn prop(limit: Limit) { +// let limit = limit.0; +// +// let mut network1 = new_test_swarm::<_, ()>(DummyConnectionHandler { +// keep_alive: KeepAlive::Yes, +// }) +// .connection_limits(limits(limit)) +// .build(); +// let mut network2 = new_test_swarm::<_, ()>(DummyConnectionHandler { +// keep_alive: KeepAlive::Yes, +// }) +// .connection_limits(limits(limit)) +// .build(); +// +// let _ = network1.listen_on(multiaddr![Memory(0u64)]).unwrap(); +// let listen_addr = async_std::task::block_on(poll_fn(|cx| { +// match ready!(network1.poll_next_unpin(cx)).unwrap() { +// SwarmEvent::NewListenAddr { address, .. } => Poll::Ready(address), +// e => panic!("Unexpected network event: {:?}", e), +// } +// })); +// +// // Spawn and block on the dialer. +// async_std::task::block_on({ +// let mut n = 0; +// let _ = network2.dial(listen_addr.clone()).unwrap(); +// +// let mut expected_closed = false; +// let mut network_1_established = false; +// let mut network_2_established = false; +// let mut network_1_limit_reached = false; +// let mut network_2_limit_reached = false; +// poll_fn(move |cx| { +// loop { +// let mut network_1_pending = false; +// let mut network_2_pending = false; +// +// match network1.poll_next_unpin(cx) { +// Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) => {} +// Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => { +// network_1_established = true; +// } +// Poll::Ready(Some(SwarmEvent::IncomingConnectionError { +// error: PendingConnectionError::ConnectionLimit(err), +// .. +// })) => { +// assert_eq!(err.limit, limit); +// assert_eq!(err.limit, err.current); +// let info = network1.network_info(); +// let counters = info.connection_counters(); +// assert_eq!(counters.num_established_incoming(), limit); +// assert_eq!(counters.num_established(), limit); +// network_1_limit_reached = true; +// } +// Poll::Pending => { +// network_1_pending = true; +// } +// e => panic!("Unexpected network event: {:?}", e), +// } +// +// match network2.poll_next_unpin(cx) { +// Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => { +// network_2_established = true; +// } +// Poll::Ready(Some(SwarmEvent::ConnectionClosed { .. })) => { +// assert!(expected_closed); +// let info = network2.network_info(); +// let counters = info.connection_counters(); +// assert_eq!(counters.num_established_outgoing(), limit); +// assert_eq!(counters.num_established(), limit); +// network_2_limit_reached = true; +// } +// Poll::Pending => { +// network_2_pending = true; +// } +// e => panic!("Unexpected network event: {:?}", e), +// } +// +// if network_1_pending && network_2_pending { +// return Poll::Pending; +// } +// +// if network_1_established && network_2_established { +// network_1_established = false; +// network_2_established = false; +// +// if n <= limit { +// // Dial again until the limit is exceeded. +// n += 1; +// network2.dial(listen_addr.clone()).unwrap(); +// +// if n == limit { +// // The the next dialing attempt exceeds the limit, this +// // is the connection we expected to get closed. +// expected_closed = true; +// } +// } else { +// panic!("Expect networks not to establish connections beyond the limit.") +// } +// } +// +// if network_1_limit_reached && network_2_limit_reached { +// return Poll::Ready(()); +// } +// } +// }) +// }); +// } +// +// quickcheck(prop as fn(_)); +// } From 835394ca779c4f426549bb3a0dccc3d136b46b70 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 15 Aug 2022 11:43:33 +0900 Subject: [PATCH 03/11] protocols/connection-limit: Test pending inbound connection limit --- protocols/connection-limit/Cargo.toml | 1 + protocols/connection-limit/tests/lib.rs | 70 ++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/protocols/connection-limit/Cargo.toml b/protocols/connection-limit/Cargo.toml index 0c90d094c8c..42cdd45f4de 100644 --- a/protocols/connection-limit/Cargo.toml +++ b/protocols/connection-limit/Cargo.toml @@ -15,6 +15,7 @@ libp2p-core = { version = "0.35.0", path = "../../core", default-features = fals libp2p-swarm = { version = "0.38.0", path = "../../swarm" } # TODO: Still needed? fnv = "1.0" +futures = "0.3.0" [dev-dependencies] quickcheck = "0.9.0" diff --git a/protocols/connection-limit/tests/lib.rs b/protocols/connection-limit/tests/lib.rs index 97c88185f16..6d39bb6f51c 100644 --- a/protocols/connection-limit/tests/lib.rs +++ b/protocols/connection-limit/tests/lib.rs @@ -1,6 +1,14 @@ +// TODO: Should imports go through libp2p crate? +use futures::executor::LocalPool; +use futures::task::LocalSpawn; +use futures::task::LocalSpawnExt; +use futures::task::Spawn; +use futures::FutureExt; +use futures::StreamExt; use libp2p_connection_limit::{Behaviour, ConnectionLimits}; use libp2p_core::connection::{ConnectionId, Endpoint}; use libp2p_core::identity; +use libp2p_core::transport::TransportEvent; use libp2p_core::transport::{MemoryTransport, Transport}; use libp2p_core::upgrade; use libp2p_core::Multiaddr; @@ -9,11 +17,12 @@ use libp2p_plaintext::PlainText2Config; use libp2p_swarm::behaviour::NetworkBehaviour; use libp2p_swarm::dial_opts::DialOpts; use libp2p_swarm::SwarmBuilder; +use libp2p_swarm::SwarmEvent; use libp2p_yamux::YamuxConfig; use quickcheck::QuickCheck; #[test] -fn enforces_pending_connection_limit() { +fn enforces_pending_outbound_connection_limit() { fn prop(outbound: u8) { let limit = ConnectionLimits::default().with_max_pending_outgoing(Some(outbound.into())); @@ -51,7 +60,64 @@ fn enforces_pending_connection_limit() { .is_err()); } - QuickCheck::new().tests(10).quickcheck(prop as fn(_)); + QuickCheck::new().quickcheck(prop as fn(_)); +} + +#[test] +fn enforces_pending_inbound_connection_limit() { + fn prop(inbound: u8) { + let mut pool = LocalPool::default(); + + let mut swarm = { + let local_public_key = identity::Keypair::generate_ed25519().public(); + // TODO: Abstract into method + let transport = MemoryTransport::default() + .upgrade(upgrade::Version::V1) + .authenticate(PlainText2Config { + local_public_key: local_public_key.clone(), + }) + .multiplex(YamuxConfig::default()) + .boxed(); + let limit = ConnectionLimits::default().with_max_pending_incoming(Some(inbound.into())); + SwarmBuilder::new(transport, Behaviour::new(limit), local_public_key.into()).build() + }; + + swarm.listen_on("/memory/0".parse().unwrap()).unwrap(); + let listen_addr = match pool.run_until(swarm.next()).unwrap() { + SwarmEvent::NewListenAddr { address, .. } => address, + e => panic!("Unexpected event {:?}", e), + }; + + pool.spawner().spawn_local(async move { + let mut remote_transport = MemoryTransport::default(); + + let dials = (0..inbound + 1) + .map(|_| remote_transport.dial(listen_addr.clone()).unwrap()) + .collect::>(); + + // TODO: What if any of them fails? + futures::future::join( + futures::future::try_join_all(dials), + Transport::boxed(remote_transport).collect::>>(), + ) + .await; + }); + + for i in 0..inbound { + println!("{:?}", i); + match pool.run_until(swarm.next()).unwrap() { + SwarmEvent::IncomingConnection { .. } => {} + e => panic!("Unexpected event {:?}", e), + } + } + + match pool.run_until(swarm.next()).unwrap() { + SwarmEvent::IncomingConnectionError { .. } => {} + e => panic!("Unexpected event {:?}", e), + } + } + + QuickCheck::new().quickcheck(prop as fn(_)); } // TODO: Bring back From 553f28008101686cbd0675edc511b626bc54e658 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 15 Aug 2022 13:11:16 +0900 Subject: [PATCH 04/11] *: Test outbound established limit --- protocols/connection-limit/Cargo.toml | 3 +- protocols/connection-limit/src/lib.rs | 4 - protocols/connection-limit/tests/lib.rs | 308 ++++++++---------------- swarm/src/lib.rs | 23 +- 4 files changed, 120 insertions(+), 218 deletions(-) diff --git a/protocols/connection-limit/Cargo.toml b/protocols/connection-limit/Cargo.toml index 42cdd45f4de..d2a4a59bef1 100644 --- a/protocols/connection-limit/Cargo.toml +++ b/protocols/connection-limit/Cargo.toml @@ -20,4 +20,5 @@ futures = "0.3.0" [dev-dependencies] quickcheck = "0.9.0" libp2p-plaintext = { path = "../../transports/plaintext" } -libp2p-yamux = { path = "../../muxers/yamux" } \ No newline at end of file +libp2p-yamux = { path = "../../muxers/yamux" } +libp2p = { version = "0.47.0", path = "../..", default-features = false } \ No newline at end of file diff --git a/protocols/connection-limit/src/lib.rs b/protocols/connection-limit/src/lib.rs index ac073386ab0..6176a35b9b3 100644 --- a/protocols/connection-limit/src/lib.rs +++ b/protocols/connection-limit/src/lib.rs @@ -164,10 +164,6 @@ impl ConnectionCounters { } } - fn inc_pending_incoming(&mut self) { - self.pending_incoming += 1; - } - fn dec_pending(&mut self, endpoint: &PendingPoint) { match endpoint { PendingPoint::Dialer { .. } => { diff --git a/protocols/connection-limit/tests/lib.rs b/protocols/connection-limit/tests/lib.rs index 6d39bb6f51c..5e31c168028 100644 --- a/protocols/connection-limit/tests/lib.rs +++ b/protocols/connection-limit/tests/lib.rs @@ -1,42 +1,42 @@ // TODO: Should imports go through libp2p crate? use futures::executor::LocalPool; +use futures::future; use futures::task::LocalSpawn; use futures::task::LocalSpawnExt; use futures::task::Spawn; use futures::FutureExt; use futures::StreamExt; -use libp2p_connection_limit::{Behaviour, ConnectionLimits}; +use futures::TryStreamExt; +use libp2p::NetworkBehaviour; +use libp2p_connection_limit::ConnectionLimits; use libp2p_core::connection::{ConnectionId, Endpoint}; use libp2p_core::identity; +use libp2p_core::identity::PublicKey; +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::transport::Boxed; use libp2p_core::transport::TransportEvent; use libp2p_core::transport::{MemoryTransport, Transport}; use libp2p_core::upgrade; use libp2p_core::Multiaddr; use libp2p_core::PeerId; use libp2p_plaintext::PlainText2Config; -use libp2p_swarm::behaviour::NetworkBehaviour; use libp2p_swarm::dial_opts::DialOpts; +use libp2p_swarm::DummyBehaviour; +use libp2p_swarm::KeepAlive; +use libp2p_swarm::Swarm; use libp2p_swarm::SwarmBuilder; use libp2p_swarm::SwarmEvent; use libp2p_yamux::YamuxConfig; use quickcheck::QuickCheck; +// TODO: Test that pending limit is decreased. + #[test] fn enforces_pending_outbound_connection_limit() { fn prop(outbound: u8) { - let limit = ConnectionLimits::default().with_max_pending_outgoing(Some(outbound.into())); + let limits = ConnectionLimits::default().with_max_pending_outgoing(Some(outbound.into())); - let behaviour = Behaviour::new(limit); - let id_keys = identity::Keypair::generate_ed25519(); - let local_public_key = id_keys.public(); - let transport = MemoryTransport::default() - .upgrade(upgrade::Version::V1) - .authenticate(PlainText2Config { - local_public_key: local_public_key.clone(), - }) - .multiplex(YamuxConfig::default()) - .boxed(); - let mut swarm = SwarmBuilder::new(transport, behaviour, local_public_key.into()).build(); + let mut swarm = make_swarm(limits); let addr: Multiaddr = "/memory/1234".parse().unwrap(); @@ -68,19 +68,8 @@ fn enforces_pending_inbound_connection_limit() { fn prop(inbound: u8) { let mut pool = LocalPool::default(); - let mut swarm = { - let local_public_key = identity::Keypair::generate_ed25519().public(); - // TODO: Abstract into method - let transport = MemoryTransport::default() - .upgrade(upgrade::Version::V1) - .authenticate(PlainText2Config { - local_public_key: local_public_key.clone(), - }) - .multiplex(YamuxConfig::default()) - .boxed(); - let limit = ConnectionLimits::default().with_max_pending_incoming(Some(inbound.into())); - SwarmBuilder::new(transport, Behaviour::new(limit), local_public_key.into()).build() - }; + let limits = ConnectionLimits::default().with_max_pending_incoming(Some(inbound.into())); + let mut swarm = make_swarm(limits); swarm.listen_on("/memory/0".parse().unwrap()).unwrap(); let listen_addr = match pool.run_until(swarm.next()).unwrap() { @@ -95,16 +84,16 @@ fn enforces_pending_inbound_connection_limit() { .map(|_| remote_transport.dial(listen_addr.clone()).unwrap()) .collect::>(); - // TODO: What if any of them fails? - futures::future::join( - futures::future::try_join_all(dials), + future::join( + future::try_join_all(dials), Transport::boxed(remote_transport).collect::>>(), ) - .await; + .await + .0 + .unwrap(); }); for i in 0..inbound { - println!("{:?}", i); match pool.run_until(swarm.next()).unwrap() { SwarmEvent::IncomingConnection { .. } => {} e => panic!("Unexpected event {:?}", e), @@ -112,190 +101,85 @@ fn enforces_pending_inbound_connection_limit() { } match pool.run_until(swarm.next()).unwrap() { - SwarmEvent::IncomingConnectionError { .. } => {} + SwarmEvent::IncomingConnectionDenied => {} + e => panic!("Unexpected event {:?}", e), + } + } + + QuickCheck::new().quickcheck(prop as fn(_)); +} + +#[test] +fn enforces_established_outbound_connection_limit() { + fn prop(outbound: u8) { + let mut pool = LocalPool::default(); + let limits = + ConnectionLimits::default().with_max_established_outgoing(Some(outbound.into())); + + let mut local_swarm = make_swarm(limits); + let mut remote_transport = make_transport(identity::Keypair::generate_ed25519().public()); + + remote_transport + .listen_on("/memory/0".parse().unwrap()) + .unwrap(); + let remote_addr = match pool.run_until(remote_transport.next()).unwrap() { + TransportEvent::NewAddress { listen_addr, .. } => listen_addr, e => panic!("Unexpected event {:?}", e), + }; + + pool.spawner() + .spawn_local({ + remote_transport + .flat_map(|e| match e { + TransportEvent::Incoming { upgrade, .. } => upgrade.into_stream(), + e => panic!("Unpexted event {:?}", e), + }) + .try_collect::>() + .map(|r| r.map(|_| ()).unwrap()) + }) + .unwrap(); + + for i in 0..outbound { + println!("{:?}", i); + local_swarm + .dial(remote_addr.clone()) + .ok() + .expect("Unexpected connection limit."); + + match pool.run_until(local_swarm.next()).unwrap() { + SwarmEvent::ConnectionEstablished { .. } => {} + e => panic!("Unexpected event {:?}", e), + } } + + assert!(local_swarm.dial(remote_addr).is_err()); } QuickCheck::new().quickcheck(prop as fn(_)); } -// TODO: Bring back -// #[test] -// fn max_outgoing() { -// use rand::Rng; -// -// let outgoing_limit = rand::thread_rng().gen_range(1, 10); -// -// let limits = ConnectionLimits::default().with_max_pending_outgoing(Some(outgoing_limit)); -// let mut network = new_test_swarm::<_, ()>(DummyConnectionHandler { -// keep_alive: KeepAlive::Yes, -// }) -// .connection_limits(limits) -// .build(); -// -// let addr: Multiaddr = "/memory/1234".parse().unwrap(); -// -// let target = PeerId::random(); -// for _ in 0..outgoing_limit { -// network -// .dial( -// DialOpts::peer_id(target) -// .addresses(vec![addr.clone()]) -// .build(), -// ) -// .ok() -// .expect("Unexpected connection limit."); -// } -// -// match network -// .dial( -// DialOpts::peer_id(target) -// .addresses(vec![addr.clone()]) -// .build(), -// ) -// .expect_err("Unexpected dialing success.") -// { -// DialError::ConnectionLimit(limit) => { -// assert_eq!(limit.current, outgoing_limit); -// assert_eq!(limit.limit, outgoing_limit); -// } -// e => panic!("Unexpected error: {:?}", e), -// } -// -// let info = network.network_info(); -// assert_eq!(info.num_peers(), 0); -// assert_eq!( -// info.connection_counters().num_pending_outgoing(), -// outgoing_limit -// ); -// } -// -// #[test] -// fn max_established_incoming() { -// use rand::Rng; -// -// #[derive(Debug, Clone)] -// struct Limit(u32); -// -// impl Arbitrary for Limit { -// fn arbitrary(g: &mut G) -> Self { -// Self(g.gen_range(1, 10)) -// } -// } -// -// fn limits(limit: u32) -> ConnectionLimits { -// ConnectionLimits::default().with_max_established_incoming(Some(limit)) -// } -// -// fn prop(limit: Limit) { -// let limit = limit.0; -// -// let mut network1 = new_test_swarm::<_, ()>(DummyConnectionHandler { -// keep_alive: KeepAlive::Yes, -// }) -// .connection_limits(limits(limit)) -// .build(); -// let mut network2 = new_test_swarm::<_, ()>(DummyConnectionHandler { -// keep_alive: KeepAlive::Yes, -// }) -// .connection_limits(limits(limit)) -// .build(); -// -// let _ = network1.listen_on(multiaddr![Memory(0u64)]).unwrap(); -// let listen_addr = async_std::task::block_on(poll_fn(|cx| { -// match ready!(network1.poll_next_unpin(cx)).unwrap() { -// SwarmEvent::NewListenAddr { address, .. } => Poll::Ready(address), -// e => panic!("Unexpected network event: {:?}", e), -// } -// })); -// -// // Spawn and block on the dialer. -// async_std::task::block_on({ -// let mut n = 0; -// let _ = network2.dial(listen_addr.clone()).unwrap(); -// -// let mut expected_closed = false; -// let mut network_1_established = false; -// let mut network_2_established = false; -// let mut network_1_limit_reached = false; -// let mut network_2_limit_reached = false; -// poll_fn(move |cx| { -// loop { -// let mut network_1_pending = false; -// let mut network_2_pending = false; -// -// match network1.poll_next_unpin(cx) { -// Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) => {} -// Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => { -// network_1_established = true; -// } -// Poll::Ready(Some(SwarmEvent::IncomingConnectionError { -// error: PendingConnectionError::ConnectionLimit(err), -// .. -// })) => { -// assert_eq!(err.limit, limit); -// assert_eq!(err.limit, err.current); -// let info = network1.network_info(); -// let counters = info.connection_counters(); -// assert_eq!(counters.num_established_incoming(), limit); -// assert_eq!(counters.num_established(), limit); -// network_1_limit_reached = true; -// } -// Poll::Pending => { -// network_1_pending = true; -// } -// e => panic!("Unexpected network event: {:?}", e), -// } -// -// match network2.poll_next_unpin(cx) { -// Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => { -// network_2_established = true; -// } -// Poll::Ready(Some(SwarmEvent::ConnectionClosed { .. })) => { -// assert!(expected_closed); -// let info = network2.network_info(); -// let counters = info.connection_counters(); -// assert_eq!(counters.num_established_outgoing(), limit); -// assert_eq!(counters.num_established(), limit); -// network_2_limit_reached = true; -// } -// Poll::Pending => { -// network_2_pending = true; -// } -// e => panic!("Unexpected network event: {:?}", e), -// } -// -// if network_1_pending && network_2_pending { -// return Poll::Pending; -// } -// -// if network_1_established && network_2_established { -// network_1_established = false; -// network_2_established = false; -// -// if n <= limit { -// // Dial again until the limit is exceeded. -// n += 1; -// network2.dial(listen_addr.clone()).unwrap(); -// -// if n == limit { -// // The the next dialing attempt exceeds the limit, this -// // is the connection we expected to get closed. -// expected_closed = true; -// } -// } else { -// panic!("Expect networks not to establish connections beyond the limit.") -// } -// } -// -// if network_1_limit_reached && network_2_limit_reached { -// return Poll::Ready(()); -// } -// } -// }) -// }); -// } -// -// quickcheck(prop as fn(_)); -// } +#[derive(NetworkBehaviour)] +struct Behaviour { + limit: libp2p_connection_limit::Behaviour, + keep_alive: DummyBehaviour, +} + +fn make_swarm(limits: ConnectionLimits) -> Swarm { + let behaviour = Behaviour { + limit: libp2p_connection_limit::Behaviour::new(limits), + keep_alive: DummyBehaviour::with_keep_alive(KeepAlive::Yes), + }; + let local_public_key = identity::Keypair::generate_ed25519().public(); + let transport = make_transport(local_public_key.clone()); + SwarmBuilder::new(transport, behaviour, local_public_key.into()).build() +} + +fn make_transport(local_public_key: PublicKey) -> Boxed<(PeerId, StreamMuxerBox)> { + MemoryTransport::default() + .upgrade(upgrade::Version::V1) + .authenticate(PlainText2Config { + local_public_key: local_public_key.clone(), + }) + .multiplex(YamuxConfig::default()) + .boxed() +} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index f80a829d754..e3965752fc5 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -179,6 +179,8 @@ pub enum SwarmEvent { /// Address used to send back data to the remote. send_back_addr: Multiaddr, }, + // TODO: Extend + IncomingConnectionDenied, /// An error happened on a connection during its initial handshake. /// /// This can include, for example, an error during the handshake of the encryption layer, or @@ -844,8 +846,24 @@ where local_addr, send_back_addr, } => { + // TODO: Ideally we would only request the handler afterwards, or maybe even in the + // review_pending_connection call. + let handler = self.behaviour.new_handler(); - self.pool.add_incoming( + + if let Err(err) = self.behaviour.review_pending_connection( + None, + // TODO: It does not make sense to provide a vector here. + &[send_back_addr.clone()], + Endpoint::Listener, + ) { + self.behaviour + .inject_listen_failure(&local_addr, &send_back_addr, handler); + log::warn!("Incoming connection rejected: {:?}", err); + return Some(SwarmEvent::IncomingConnectionDenied); + } + + let connection_id = self.pool.add_incoming( upgrade, handler, IncomingInfo { @@ -854,6 +872,9 @@ where }, ); + self.behaviour + .inject_connection_pending(None, connection_id, Endpoint::Listener); + return Some(SwarmEvent::IncomingConnection { local_addr, send_back_addr, From cb42dd3e494b50acbe255db31d10cffc83ca3023 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 15 Aug 2022 14:48:53 +0900 Subject: [PATCH 05/11] swarm-derive/: Add where clause of behaviour to generated out event When generating the `OutEvent` for a `NetworkBehaviour`, add the `where` clause of the `NetworkBehaviour` `struct` to the generated `enum`. --- swarm-derive/src/lib.rs | 4 +++- swarm-derive/tests/test.rs | 15 ++++++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index 8a42220246a..2ac9f978657 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -159,7 +159,9 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let visibility = &ast.vis; Some(quote! { - #visibility enum #name #impl_generics { + #visibility enum #name #impl_generics + #where_clause + { #(#fields),* } }) diff --git a/swarm-derive/tests/test.rs b/swarm-derive/tests/test.rs index 09048e5e801..d23dd8ed863 100644 --- a/swarm-derive/tests/test.rs +++ b/swarm-derive/tests/test.rs @@ -272,7 +272,7 @@ fn custom_event_mismatching_field_names() { } #[test] -fn where_clause() { +fn bound() { #[allow(dead_code)] #[derive(NetworkBehaviour)] struct Foo { @@ -281,6 +281,19 @@ fn where_clause() { } } +#[test] +fn where_clause() { + #[allow(dead_code)] + #[derive(NetworkBehaviour)] + struct Foo + where + T: Copy + NetworkBehaviour, + { + ping: libp2p::ping::Ping, + bar: T, + } +} + #[test] fn nested_derives_with_import() { #[allow(dead_code)] From b4f14723700788d98421f3c2030a975dce051d17 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 16 Aug 2022 15:10:37 +0900 Subject: [PATCH 06/11] swarm-derive/: Derive Debug for generated OutEvent When generating an `OutEvent` `enum` definition for a user, derive `Debug` for that `enum`. Why not derive `Clone`, `PartialEq` and `Eq` for the generated `enum` definition? While I think it is fine to require all sub-`OutEvent`s to implement `Debug`, I don't think the same applies to traits like `Clone`. I suggest users that need `Clone` to define their own `OutEvent`. --- swarm-derive/src/lib.rs | 1 + swarm-derive/tests/test.rs | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index 2ac9f978657..51305e38190 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -159,6 +159,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let visibility = &ast.vis; Some(quote! { + #[derive(::std::fmt::Debug)] #visibility enum #name #impl_generics #where_clause { diff --git a/swarm-derive/tests/test.rs b/swarm-derive/tests/test.rs index d23dd8ed863..2cc462a0bcf 100644 --- a/swarm-derive/tests/test.rs +++ b/swarm-derive/tests/test.rs @@ -21,6 +21,7 @@ use futures::prelude::*; use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; use libp2p_swarm_derive::*; +use std::fmt::Debug; /// Small utility to check that a type implements `NetworkBehaviour`. #[allow(dead_code)] @@ -489,3 +490,21 @@ fn event_process() { }; } } + +#[test] +fn generated_out_event_derive_debug() { + #[allow(dead_code)] + #[derive(NetworkBehaviour)] + struct Foo { + ping: libp2p::ping::Ping, + } + + fn require_debug() + where + T: NetworkBehaviour, + ::OutEvent: Debug, + { + } + + require_debug::(); +} From 3f58ccd84b6196d84ef914aa8e010267039c5cfb Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 16 Aug 2022 17:02:25 +0900 Subject: [PATCH 07/11] swarm-derive/: Do the changes --- protocols/connection-limit/tests/lib.rs | 1 + swarm-derive/src/lib.rs | 32 +++++++++++++++++++++++++ swarm/src/behaviour.rs | 1 + 3 files changed, 34 insertions(+) diff --git a/protocols/connection-limit/tests/lib.rs b/protocols/connection-limit/tests/lib.rs index 5e31c168028..4c4f4006ad2 100644 --- a/protocols/connection-limit/tests/lib.rs +++ b/protocols/connection-limit/tests/lib.rs @@ -23,6 +23,7 @@ use libp2p_plaintext::PlainText2Config; use libp2p_swarm::dial_opts::DialOpts; use libp2p_swarm::DummyBehaviour; use libp2p_swarm::KeepAlive; +use libp2p_swarm::NetworkBehaviour; use libp2p_swarm::Swarm; use libp2p_swarm::SwarmBuilder; use libp2p_swarm::SwarmEvent; diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index 51305e38190..d700556d5b1 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -60,6 +60,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let connected_point = quote! {::libp2p::core::ConnectedPoint}; let listener_id = quote! {::libp2p::core::transport::ListenerId}; let dial_error = quote! {::libp2p::swarm::DialError}; + let review_denied = quote! {::libp2p::swarm::ReviewDenied}; let poll_parameters = quote! {::libp2p::swarm::PollParameters}; @@ -222,6 +223,28 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { }) }; + // TODO + let review_pending_connection_stmts = { + data_struct_fields + .iter() + .enumerate() + .map(move |(field_n, field)| match field.ident { + Some(ref i) => quote! { self.#i.review_pending_connection(peer_id, addresses, endpoint)?; }, + None => quote! { self.#field_n.review_pending_connection(peer_id, addresses, endpoint)?; }, + }) + }; + + // TODO + let inject_connection_pending_stmts = { + data_struct_fields + .iter() + .enumerate() + .map(move |(field_n, field)| match field.ident { + Some(ref i) => quote! { self.#i.inject_connection_pending(peer_id, connection_id, endpoint); }, + None => quote! { self.#field_n.inject_connection_pending(peer_id, connection_id, endpoint); }, + }) + }; + // Build the list of statements to put in the body of `inject_connection_established()`. let inject_connection_established_stmts = { data_struct_fields.iter().enumerate().map(move |(field_n, field)| { @@ -621,6 +644,15 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { out } + fn review_pending_connection(&mut self, peer_id: Option, addresses: &[Multiaddr], endpoint: Endpoint) -> Result<(), #review_denied> { + #(#review_pending_connection_stmts);* + Ok(()) + } + + fn inject_connection_pending(&mut self, peer_id: Option, connection_id: ConnectionId, endpoint: Endpoint) { + #(#inject_connection_pending_stmts);* + } + fn inject_connection_established(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point, errors: #dial_errors, other_established: usize) { #(#inject_connection_established_stmts);* } diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index ff2f5ee51d1..1dd4ed06181 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -192,6 +192,7 @@ pub trait NetworkBehaviour: 'static { vec![] } + // TODO: Make sure this and all the methods below are really implemented across all behaviours. fn review_pending_connection( &mut self, _peer_id: Option, From 0e682ec9fcf5b65dcdbea2b9ad6faad832b79fa4 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 16 Aug 2022 17:46:01 +0900 Subject: [PATCH 08/11] *: Review established connections --- swarm-derive/src/lib.rs | 16 +++++++ swarm/src/behaviour.rs | 9 ++++ swarm/src/connection/pool.rs | 89 ++++++++++++++++++++++-------------- swarm/src/lib.rs | 22 ++++++++- 4 files changed, 100 insertions(+), 36 deletions(-) diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index d700556d5b1..a9ae1d4c1db 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -234,6 +234,17 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { }) }; + // TODO + let review_established_connection_stmts = { + data_struct_fields + .iter() + .enumerate() + .map(move |(field_n, field)| match field.ident { + Some(ref i) => quote! { self.#i.review_established_connection(peer_id, addresses, endpoint)?; }, + None => quote! { self.#field_n.review_established_connection(peer_id, addresses, endpoint)?; }, + }) + }; + // TODO let inject_connection_pending_stmts = { data_struct_fields @@ -649,6 +660,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { Ok(()) } + fn review_established_connection(&mut self, peer_id: Option, addresses: &[Multiaddr], endpoint: Endpoint) -> Result<(), #review_denied> { + #(#review_established_connection_stmts);* + Ok(()) + } + fn inject_connection_pending(&mut self, peer_id: Option, connection_id: ConnectionId, endpoint: Endpoint) { #(#inject_connection_pending_stmts);* } diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 1dd4ed06181..cbfab4a43fa 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -203,6 +203,15 @@ pub trait NetworkBehaviour: 'static { Ok(()) } + fn review_established_connection( + &mut self, + _peer_id: PeerId, + // TODO: Maybe an iterator is better? + _endpoint: &ConnectedPoint, + ) -> Result<(), ReviewDenied> { + Ok(()) + } + fn inject_connection_pending( &mut self, _peer_id: Option, diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 2b524e34ec3..354aeba8a59 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -140,6 +140,7 @@ impl EstablishedConnectionInfo { struct PendingConnectionInfo { /// [`PeerId`] of the remote peer. peer_id: Option, + // TODO: There is no need to have the handler here already. /// Handler to handle connection once no longer pending but established. handler: THandler, endpoint: PendingPoint, @@ -154,7 +155,6 @@ impl fmt::Debug for Pool where TTrans: Transport, @@ -172,6 +172,9 @@ where /// Addresses are dialed in parallel. Contains the addresses and errors /// of dial attempts that failed before the one successful dial. concurrent_dial_errors: Option)>>, + // TODO: It doesn't make sense to have the handler here already. + handler: THandler, + muxer: StreamMuxerBox, }, /// An established connection was closed. @@ -242,6 +245,12 @@ where }, } +impl fmt::Debug for PoolEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + todo!("due to streammuxerbox") + } +} + impl Pool where THandler: IntoConnectionHandler, @@ -403,7 +412,7 @@ where /// /// Returns an error if the limit of pending outgoing connections /// has been reached. - pub fn add_outgoing( + pub fn add_pending_outgoing( &mut self, dials: Vec< BoxFuture< @@ -471,7 +480,7 @@ where /// /// Returns an error if the limit of pending incoming connections /// has been reached. - pub fn add_incoming( + pub fn add_pending_incoming( &mut self, future: TFut, handler: THandler, @@ -515,6 +524,47 @@ where connection_id } + pub fn add_established( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + endpoint: ConnectedPoint, + muxer: StreamMuxerBox, + handler: THandler, + ) { + // Add the connection to the pool. + let conns = self.established.entry(peer_id).or_default(); + + let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size); + conns.insert( + connection_id, + EstablishedConnectionInfo { + peer_id: peer_id, + endpoint: endpoint.clone(), + sender: command_sender, + }, + ); + + let connection = super::Connection::new( + peer_id, + endpoint, + muxer, + handler, + self.substream_upgrade_protocol_override, + self.max_negotiating_inbound_streams, + ); + self.spawn( + task::new_for_established_connection( + connection_id, + peer_id, + connection, + command_receiver, + self.established_connection_events_tx.clone(), + ) + .boxed(), + ); + } + /// Polls the connection pool for events. pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll> where @@ -727,42 +777,11 @@ where }; } - // Add the connection to the pool. let conns = self.established.entry(obtained_peer_id).or_default(); let other_established_connection_ids = conns.keys().cloned().collect(); // TODO // self.counters.inc_established(&endpoint); - let (command_sender, command_receiver) = - mpsc::channel(self.task_command_buffer_size); - conns.insert( - id, - EstablishedConnectionInfo { - peer_id: obtained_peer_id, - endpoint: endpoint.clone(), - sender: command_sender, - }, - ); - - let connection = super::Connection::new( - obtained_peer_id, - endpoint, - muxer, - handler, - self.substream_upgrade_protocol_override, - self.max_negotiating_inbound_streams, - ); - self.spawn( - task::new_for_established_connection( - id, - obtained_peer_id, - connection, - command_receiver, - self.established_connection_events_tx.clone(), - ) - .boxed(), - ); - match self.get(id) { Some(PoolConnection::Established(connection)) => { return Poll::Ready(PoolEvent::ConnectionEstablished { @@ -771,6 +790,8 @@ where id: connection.id(), other_established_connection_ids, concurrent_dial_errors, + handler, + muxer, }) } _ => unreachable!("since `entry` is an `EstablishedEntry`."), diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index e3965752fc5..dd9bbf326ed 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -283,6 +283,7 @@ where /// similar mechanisms. external_addrs: Addresses, + // TODO: I think this should move into the connection manager behaviour as well. /// List of nodes for which we deny any incoming connection. banned_peers: HashSet, @@ -531,7 +532,7 @@ where }) .collect(); - let connection_id = self.pool.add_outgoing( + let connection_id = self.pool.add_pending_outgoing( dials, peer_id, handler, @@ -679,13 +680,18 @@ where endpoint, other_established_connection_ids, concurrent_dial_errors, + handler, + muxer, } => { if self.banned_peers.contains(&peer_id) { // Mark the connection for the banned peer as banned, thus withholding any // future events from the connection to the behaviour. self.banned_peer_connections.insert(id); self.pool.disconnect(peer_id); + // TODO: Should we close the connection? return Some(SwarmEvent::BannedPeer { peer_id, endpoint }); + + // TODO: The else below is unnecessary given that the above is a return. } else { let num_established = NonZeroU32::new( u32::try_from(other_established_connection_ids.len() + 1).unwrap(), @@ -706,6 +712,18 @@ where let failed_addresses = concurrent_dial_errors .as_ref() .map(|es| es.iter().map(|(a, _)| a).cloned().collect()); + + // TODO: Clean up + if let Err(e) = self + .behaviour + .review_established_connection(peer_id, &endpoint) + { + todo!() + } + + self.pool + .add_established(peer_id, id, endpoint.clone(), muxer, handler); + self.behaviour.inject_connection_established( &peer_id, &id, @@ -863,7 +881,7 @@ where return Some(SwarmEvent::IncomingConnectionDenied); } - let connection_id = self.pool.add_incoming( + let connection_id = self.pool.add_pending_incoming( upgrade, handler, IncomingInfo { From ebe1131a1ca8aebcde450c40e333580b8e1393c5 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 17 Aug 2022 17:45:37 +0900 Subject: [PATCH 09/11] *: Enforce outgoing established connection limit --- swarm-derive/src/lib.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index a9ae1d4c1db..ae77e5d98a2 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -240,8 +240,10 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { .iter() .enumerate() .map(move |(field_n, field)| match field.ident { - Some(ref i) => quote! { self.#i.review_established_connection(peer_id, addresses, endpoint)?; }, - None => quote! { self.#field_n.review_established_connection(peer_id, addresses, endpoint)?; }, + Some(ref i) => { + quote! { self.#i.review_established_connection(peer_id, endpoint)?; } + } + None => quote! { self.#field_n.review_established_connection(peer_id, endpoint)?; }, }) }; @@ -660,7 +662,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { Ok(()) } - fn review_established_connection(&mut self, peer_id: Option, addresses: &[Multiaddr], endpoint: Endpoint) -> Result<(), #review_denied> { + fn review_established_connection(&mut self, peer_id: PeerId, endpoint: &#connected_point) -> Result<(), #review_denied> { #(#review_established_connection_stmts);* Ok(()) } From 7335a947f5b75f6e0d13405dfd8f7fa09b095a37 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 17 Aug 2022 17:46:31 +0900 Subject: [PATCH 10/11] *: Enforce outgoing established connection limit --- protocols/connection-limit/src/lib.rs | 24 ++++++++++++++++++++++++ protocols/connection-limit/tests/lib.rs | 14 +++++++++++++- swarm/src/connection/pool.rs | 23 +++++++++-------------- swarm/src/lib.rs | 10 +++++++++- 4 files changed, 55 insertions(+), 16 deletions(-) diff --git a/protocols/connection-limit/src/lib.rs b/protocols/connection-limit/src/lib.rs index 6176a35b9b3..8eebdd4dcf7 100644 --- a/protocols/connection-limit/src/lib.rs +++ b/protocols/connection-limit/src/lib.rs @@ -61,6 +61,18 @@ impl NetworkBehaviour for Behaviour { .map_err(|e| ReviewDenied::Error(e.into())) } + fn review_established_connection( + &mut self, + _peer_id: PeerId, + // TODO: Maybe an iterator is better? + endpoint: &ConnectedPoint, + ) -> Result<(), ReviewDenied> { + println!("review_established_connection"); + self.counters + .check_max_established(endpoint) + .map_err(|e| ReviewDenied::Error(e.into())) + } + fn inject_connection_pending( &mut self, _peer_id: Option, @@ -70,6 +82,18 @@ impl NetworkBehaviour for Behaviour { self.counters.inc_pending(endpoint) } + fn inject_connection_established( + &mut self, + _peer_id: &PeerId, + _connection_id: &ConnectionId, + endpoint: &ConnectedPoint, + _failed_addresses: Option<&Vec>, + _other_established: usize, + ) { + println!("inject_connection_established"); + self.counters.inc_established(endpoint) + } + fn inject_event( &mut self, _peer_id: PeerId, diff --git a/protocols/connection-limit/tests/lib.rs b/protocols/connection-limit/tests/lib.rs index 4c4f4006ad2..65dd17324a3 100644 --- a/protocols/connection-limit/tests/lib.rs +++ b/protocols/connection-limit/tests/lib.rs @@ -21,6 +21,7 @@ use libp2p_core::Multiaddr; use libp2p_core::PeerId; use libp2p_plaintext::PlainText2Config; use libp2p_swarm::dial_opts::DialOpts; +use libp2p_swarm::DialError; use libp2p_swarm::DummyBehaviour; use libp2p_swarm::KeepAlive; use libp2p_swarm::NetworkBehaviour; @@ -153,7 +154,18 @@ fn enforces_established_outbound_connection_limit() { } } - assert!(local_swarm.dial(remote_addr).is_err()); + local_swarm + .dial(remote_addr.clone()) + .ok() + .expect("Unexpected connection limit."); + + match pool.run_until(local_swarm.next()).unwrap() { + SwarmEvent::OutgoingConnectionError { + error: DialError::ConnectionReviewDenied(e), + .. + } => {} + e => panic!("Unexpected event {:?}", e), + } } QuickCheck::new().quickcheck(prop as fn(_)); diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 354aeba8a59..fd651adb879 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -782,20 +782,15 @@ where // TODO // self.counters.inc_established(&endpoint); - match self.get(id) { - Some(PoolConnection::Established(connection)) => { - return Poll::Ready(PoolEvent::ConnectionEstablished { - peer_id: connection.peer_id(), - endpoint: connection.endpoint().clone(), - id: connection.id(), - other_established_connection_ids, - concurrent_dial_errors, - handler, - muxer, - }) - } - _ => unreachable!("since `entry` is an `EstablishedEntry`."), - } + return Poll::Ready(PoolEvent::ConnectionEstablished { + peer_id: obtained_peer_id, + endpoint, + id, + other_established_connection_ids, + concurrent_dial_errors, + handler, + muxer, + }); } task::PendingConnectionEvent::PendingFailed { id, error } => { if let Some(PendingConnectionInfo { diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index dd9bbf326ed..b2f71184986 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -718,7 +718,15 @@ where .behaviour .review_established_connection(peer_id, &endpoint) { - todo!() + match endpoint { + ConnectedPoint::Dialer { .. } => { + return Some(SwarmEvent::OutgoingConnectionError { + peer_id: Some(peer_id), + error: DialError::ConnectionReviewDenied(e), + }) + } + _ => todo!(), + } } self.pool From a7eb4cbcdc3d8e5ced5493a03a9796f0d21c4512 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 19 Aug 2022 13:26:12 +0900 Subject: [PATCH 11/11] protocols/connection-limit: Decrease count --- protocols/connection-limit/src/lib.rs | 22 ++++++++++++++++------ swarm/src/behaviour.rs | 1 + swarm/src/lib.rs | 2 ++ 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/protocols/connection-limit/src/lib.rs b/protocols/connection-limit/src/lib.rs index 8eebdd4dcf7..061551811a2 100644 --- a/protocols/connection-limit/src/lib.rs +++ b/protocols/connection-limit/src/lib.rs @@ -67,7 +67,6 @@ impl NetworkBehaviour for Behaviour { // TODO: Maybe an iterator is better? endpoint: &ConnectedPoint, ) -> Result<(), ReviewDenied> { - println!("review_established_connection"); self.counters .check_max_established(endpoint) .map_err(|e| ReviewDenied::Error(e.into())) @@ -90,10 +89,21 @@ impl NetworkBehaviour for Behaviour { _failed_addresses: Option<&Vec>, _other_established: usize, ) { - println!("inject_connection_established"); + self.counters.dec_pending(endpoint); self.counters.inc_established(endpoint) } + fn inject_connection_closed( + &mut self, + _: &PeerId, + _: &ConnectionId, + endpoint: &ConnectedPoint, + _: ::Handler, + _remaining_established: usize, + ) { + self.counters.dec_established(endpoint) + } + fn inject_event( &mut self, _peer_id: PeerId, @@ -188,12 +198,12 @@ impl ConnectionCounters { } } - fn dec_pending(&mut self, endpoint: &PendingPoint) { - match endpoint { - PendingPoint::Dialer { .. } => { + fn dec_pending(&mut self, endpoint: impl Into) { + match endpoint.into() { + Endpoint::Dialer => { self.pending_outgoing -= 1; } - PendingPoint::Listener { .. } => { + Endpoint::Listener => { self.pending_incoming -= 1; } } diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index c8e86270e94..f025bdbce3f 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -220,6 +220,7 @@ pub trait NetworkBehaviour: 'static { ) { } + // TODO: We need a `inject_connection_pending_closed` /// Informs the behaviour about a newly established connection to a peer. fn inject_connection_established( &mut self, diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index f6d3fffd940..e692124fc4d 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -685,6 +685,8 @@ where handler, muxer, } => { + // TODO: Call `NetworkBehaviour::inject_connection_pending_closed` in case we don't + // accept the established connection. if self.banned_peers.contains(&peer_id) { // Mark the connection for the banned peer as banned, thus withholding any // future events from the connection to the behaviour.