diff --git a/Cargo.toml b/Cargo.toml index 75867ddbf4f..85645998f90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,8 @@ categories = ["network-programming", "asynchronous"] [features] default = [ "autonat", + # TODO: Should this really be a default? + "connection-limit", "deflate", "dns-async-std", "floodsub", @@ -37,6 +39,7 @@ default = [ ] autonat = ["dep:libp2p-autonat"] +connection-limit = ["dep:libp2p-connection-limit"] dcutr = ["dep:libp2p-dcutr", "libp2p-metrics?/dcutr"] deflate = ["dep:libp2p-deflate"] dns-async-std = ["dep:libp2p-dns", "libp2p-dns?/async-std"] @@ -79,6 +82,7 @@ lazy_static = "1.2" libp2p-autonat = { version = "0.6.0", path = "protocols/autonat", optional = true } libp2p-core = { version = "0.35.0", path = "core", default-features = false } +libp2p-connection-limit = { version = "0.1.0", path = "protocols/connection-limit", optional = true } libp2p-dcutr = { version = "0.5.0", path = "protocols/dcutr", optional = true } libp2p-floodsub = { version = "0.38.0", path = "protocols/floodsub", optional = true } libp2p-identify = { version = "0.38.0", path = "protocols/identify", optional = true } @@ -130,6 +134,7 @@ members = [ "misc/prost-codec", "muxers/mplex", "muxers/yamux", + "protocols/connection-limit", "protocols/dcutr", "protocols/autonat", "protocols/floodsub", diff --git a/protocols/connection-limit/Cargo.toml b/protocols/connection-limit/Cargo.toml new file mode 100644 index 00000000000..d2a4a59bef1 --- /dev/null +++ b/protocols/connection-limit/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "libp2p-connection-limit" +edition = "2021" +rust-version = "1.56.1" +description = "Basic connection limiting functionality" +version = "0.1.0" +authors = ["Max Inden "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } +libp2p-swarm = { version = "0.38.0", path = "../../swarm" } +# TODO: Still needed? +fnv = "1.0" +futures = "0.3.0" + +[dev-dependencies] +quickcheck = "0.9.0" +libp2p-plaintext = { path = "../../transports/plaintext" } +libp2p-yamux = { path = "../../muxers/yamux" } +libp2p = { version = "0.47.0", path = "../..", default-features = false } \ No newline at end of file diff --git a/protocols/connection-limit/src/lib.rs b/protocols/connection-limit/src/lib.rs new file mode 100644 index 00000000000..061551811a2 --- /dev/null +++ b/protocols/connection-limit/src/lib.rs @@ -0,0 +1,356 @@ +// Copyright 2021 Protocol Labs. +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use libp2p_core::connection::{ConnectedPoint, ConnectionId, Endpoint, PendingPoint}; +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::handler::{ConnectionHandler, DummyConnectionHandler, IntoConnectionHandler}; +use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ReviewDenied}; +use std::error::Error; +use std::fmt; +use std::task::{Context, Poll}; + +pub struct Behaviour { + counters: ConnectionCounters, +} + +impl Behaviour { + pub fn new(limits: ConnectionLimits) -> Self { + Behaviour { + counters: ConnectionCounters::new(limits), + } + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = DummyConnectionHandler; + type OutEvent = (); + + fn new_handler(&mut self) -> Self::ConnectionHandler { + Default::default() + } + + fn review_pending_connection( + &mut self, + _peer_id: Option, + // TODO: Maybe an iterator is better? + _addresses: &[Multiaddr], + endpoint: Endpoint, + ) -> Result<(), ReviewDenied> { + match endpoint { + Endpoint::Dialer => self.counters.check_max_pending_outgoing(), + Endpoint::Listener => self.counters.check_max_pending_incoming(), + } + .map_err(|e| ReviewDenied::Error(e.into())) + } + + fn review_established_connection( + &mut self, + _peer_id: PeerId, + // TODO: Maybe an iterator is better? + endpoint: &ConnectedPoint, + ) -> Result<(), ReviewDenied> { + self.counters + .check_max_established(endpoint) + .map_err(|e| ReviewDenied::Error(e.into())) + } + + fn inject_connection_pending( + &mut self, + _peer_id: Option, + _connection_id: ConnectionId, + endpoint: Endpoint, + ) { + self.counters.inc_pending(endpoint) + } + + fn inject_connection_established( + &mut self, + _peer_id: &PeerId, + _connection_id: &ConnectionId, + endpoint: &ConnectedPoint, + _failed_addresses: Option<&Vec>, + _other_established: usize, + ) { + self.counters.dec_pending(endpoint); + self.counters.inc_established(endpoint) + } + + fn inject_connection_closed( + &mut self, + _: &PeerId, + _: &ConnectionId, + endpoint: &ConnectedPoint, + _: ::Handler, + _remaining_established: usize, + ) { + self.counters.dec_established(endpoint) + } + + fn inject_event( + &mut self, + _peer_id: PeerId, + _connection: ConnectionId, + _event: <::Handler as ConnectionHandler>::OutEvent, + ) { + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + _params: &mut impl PollParameters, + ) -> Poll> { + Poll::Pending + } +} + +/// Network connection information. +#[derive(Debug, Clone)] +pub struct ConnectionCounters { + /// The effective connection limits. + limits: ConnectionLimits, + /// The current number of incoming connections. + pending_incoming: u32, + /// The current number of outgoing connections. + pending_outgoing: u32, + /// The current number of established inbound connections. + established_incoming: u32, + /// The current number of established outbound connections. + established_outgoing: u32, +} + +impl ConnectionCounters { + fn new(limits: ConnectionLimits) -> Self { + Self { + limits, + pending_incoming: 0, + pending_outgoing: 0, + established_incoming: 0, + established_outgoing: 0, + } + } + + /// The effective connection limits. + pub fn limits(&self) -> &ConnectionLimits { + &self.limits + } + + /// The total number of connections, both pending and established. + pub fn num_connections(&self) -> u32 { + self.num_pending() + self.num_established() + } + + /// The total number of pending connections, both incoming and outgoing. + pub fn num_pending(&self) -> u32 { + self.pending_incoming + self.pending_outgoing + } + + /// The number of incoming connections being established. + pub fn num_pending_incoming(&self) -> u32 { + self.pending_incoming + } + + /// The number of outgoing connections being established. + pub fn num_pending_outgoing(&self) -> u32 { + self.pending_outgoing + } + + /// The number of established incoming connections. + pub fn num_established_incoming(&self) -> u32 { + self.established_incoming + } + + /// The number of established outgoing connections. + pub fn num_established_outgoing(&self) -> u32 { + self.established_outgoing + } + + /// The total number of established connections. + pub fn num_established(&self) -> u32 { + self.established_outgoing + self.established_incoming + } + + fn inc_pending(&mut self, endpoint: Endpoint) { + match endpoint { + Endpoint::Dialer => { + self.pending_outgoing += 1; + } + Endpoint::Listener => { + self.pending_incoming += 1; + } + } + } + + fn dec_pending(&mut self, endpoint: impl Into) { + match endpoint.into() { + Endpoint::Dialer => { + self.pending_outgoing -= 1; + } + Endpoint::Listener => { + self.pending_incoming -= 1; + } + } + } + + fn inc_established(&mut self, endpoint: &ConnectedPoint) { + match endpoint { + ConnectedPoint::Dialer { .. } => { + self.established_outgoing += 1; + } + ConnectedPoint::Listener { .. } => { + self.established_incoming += 1; + } + } + } + + fn dec_established(&mut self, endpoint: &ConnectedPoint) { + match endpoint { + ConnectedPoint::Dialer { .. } => { + self.established_outgoing -= 1; + } + ConnectedPoint::Listener { .. } => { + self.established_incoming -= 1; + } + } + } + + fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> { + Self::check(self.pending_outgoing, self.limits.max_pending_outgoing) + } + + fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> { + Self::check(self.pending_incoming, self.limits.max_pending_incoming) + } + + fn check_max_established(&self, endpoint: &ConnectedPoint) -> Result<(), ConnectionLimit> { + // Check total connection limit. + Self::check(self.num_established(), self.limits.max_established_total)?; + // Check incoming/outgoing connection limits + match endpoint { + ConnectedPoint::Dialer { .. } => Self::check( + self.established_outgoing, + self.limits.max_established_outgoing, + ), + ConnectedPoint::Listener { .. } => Self::check( + self.established_incoming, + self.limits.max_established_incoming, + ), + } + } + + fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> { + Self::check(current, self.limits.max_established_per_peer) + } + + fn check(current: u32, limit: Option) -> Result<(), ConnectionLimit> { + if let Some(limit) = limit { + if current >= limit { + return Err(ConnectionLimit { limit, current }); + } + } + Ok(()) + } +} + +// /// Counts the number of established connections to the given peer. +// fn num_peer_established( +// // TODO: Use normal HashMap. +// established: &FnvHashMap>>, +// peer: PeerId, +// ) -> u32 { +// established.get(&peer).map_or(0, |conns| { +// u32::try_from(conns.len()).expect("Unexpectedly large number of connections for a peer.") +// }) +// } + +/// The configurable connection limits. +/// +/// By default no connection limits apply. +#[derive(Debug, Clone, Default)] +pub struct ConnectionLimits { + max_pending_incoming: Option, + max_pending_outgoing: Option, + max_established_incoming: Option, + max_established_outgoing: Option, + max_established_per_peer: Option, + max_established_total: Option, +} + +impl ConnectionLimits { + /// Configures the maximum number of concurrently incoming connections being established. + pub fn with_max_pending_incoming(mut self, limit: Option) -> Self { + self.max_pending_incoming = limit; + self + } + + /// Configures the maximum number of concurrently outgoing connections being established. + pub fn with_max_pending_outgoing(mut self, limit: Option) -> Self { + self.max_pending_outgoing = limit; + self + } + + /// Configures the maximum number of concurrent established inbound connections. + pub fn with_max_established_incoming(mut self, limit: Option) -> Self { + self.max_established_incoming = limit; + self + } + + /// Configures the maximum number of concurrent established outbound connections. + pub fn with_max_established_outgoing(mut self, limit: Option) -> Self { + self.max_established_outgoing = limit; + self + } + + /// Configures the maximum number of concurrent established connections (both + /// inbound and outbound). + /// + /// Note: This should be used in conjunction with + /// [`ConnectionLimits::with_max_established_incoming`] to prevent possible + /// eclipse attacks (all connections being inbound). + pub fn with_max_established(mut self, limit: Option) -> Self { + self.max_established_total = limit; + self + } + + /// Configures the maximum number of concurrent established connections per peer, + /// regardless of direction (incoming or outgoing). + pub fn with_max_established_per_peer(mut self, limit: Option) -> Self { + self.max_established_per_peer = limit; + self + } +} + +/// Information about a connection limit. +#[derive(Debug, Clone)] +pub struct ConnectionLimit { + /// The maximum number of connections. + pub limit: u32, + /// The current number of connections. + pub current: u32, +} + +impl fmt::Display for ConnectionLimit { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}/{}", self.current, self.limit) + } +} + +/// A `ConnectionLimit` can represent an error if it has been exceeded. +impl Error for ConnectionLimit {} diff --git a/protocols/connection-limit/tests/lib.rs b/protocols/connection-limit/tests/lib.rs new file mode 100644 index 00000000000..65dd17324a3 --- /dev/null +++ b/protocols/connection-limit/tests/lib.rs @@ -0,0 +1,198 @@ +// TODO: Should imports go through libp2p crate? +use futures::executor::LocalPool; +use futures::future; +use futures::task::LocalSpawn; +use futures::task::LocalSpawnExt; +use futures::task::Spawn; +use futures::FutureExt; +use futures::StreamExt; +use futures::TryStreamExt; +use libp2p::NetworkBehaviour; +use libp2p_connection_limit::ConnectionLimits; +use libp2p_core::connection::{ConnectionId, Endpoint}; +use libp2p_core::identity; +use libp2p_core::identity::PublicKey; +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::transport::Boxed; +use libp2p_core::transport::TransportEvent; +use libp2p_core::transport::{MemoryTransport, Transport}; +use libp2p_core::upgrade; +use libp2p_core::Multiaddr; +use libp2p_core::PeerId; +use libp2p_plaintext::PlainText2Config; +use libp2p_swarm::dial_opts::DialOpts; +use libp2p_swarm::DialError; +use libp2p_swarm::DummyBehaviour; +use libp2p_swarm::KeepAlive; +use libp2p_swarm::NetworkBehaviour; +use libp2p_swarm::Swarm; +use libp2p_swarm::SwarmBuilder; +use libp2p_swarm::SwarmEvent; +use libp2p_yamux::YamuxConfig; +use quickcheck::QuickCheck; + +// TODO: Test that pending limit is decreased. + +#[test] +fn enforces_pending_outbound_connection_limit() { + fn prop(outbound: u8) { + let limits = ConnectionLimits::default().with_max_pending_outgoing(Some(outbound.into())); + + let mut swarm = make_swarm(limits); + + let addr: Multiaddr = "/memory/1234".parse().unwrap(); + + for _ in 0..outbound { + swarm + .dial( + DialOpts::peer_id(PeerId::random()) + .addresses(vec![addr.clone()]) + .build(), + ) + .ok() + .expect("Unexpected connection limit."); + } + + assert!(swarm + .dial( + DialOpts::peer_id(PeerId::random()) + .addresses(vec![addr.clone()]) + .build(), + ) + .is_err()); + } + + QuickCheck::new().quickcheck(prop as fn(_)); +} + +#[test] +fn enforces_pending_inbound_connection_limit() { + fn prop(inbound: u8) { + let mut pool = LocalPool::default(); + + let limits = ConnectionLimits::default().with_max_pending_incoming(Some(inbound.into())); + let mut swarm = make_swarm(limits); + + swarm.listen_on("/memory/0".parse().unwrap()).unwrap(); + let listen_addr = match pool.run_until(swarm.next()).unwrap() { + SwarmEvent::NewListenAddr { address, .. } => address, + e => panic!("Unexpected event {:?}", e), + }; + + pool.spawner().spawn_local(async move { + let mut remote_transport = MemoryTransport::default(); + + let dials = (0..inbound + 1) + .map(|_| remote_transport.dial(listen_addr.clone()).unwrap()) + .collect::>(); + + future::join( + future::try_join_all(dials), + Transport::boxed(remote_transport).collect::>>(), + ) + .await + .0 + .unwrap(); + }); + + for i in 0..inbound { + match pool.run_until(swarm.next()).unwrap() { + SwarmEvent::IncomingConnection { .. } => {} + e => panic!("Unexpected event {:?}", e), + } + } + + match pool.run_until(swarm.next()).unwrap() { + SwarmEvent::IncomingConnectionDenied => {} + e => panic!("Unexpected event {:?}", e), + } + } + + QuickCheck::new().quickcheck(prop as fn(_)); +} + +#[test] +fn enforces_established_outbound_connection_limit() { + fn prop(outbound: u8) { + let mut pool = LocalPool::default(); + let limits = + ConnectionLimits::default().with_max_established_outgoing(Some(outbound.into())); + + let mut local_swarm = make_swarm(limits); + let mut remote_transport = make_transport(identity::Keypair::generate_ed25519().public()); + + remote_transport + .listen_on("/memory/0".parse().unwrap()) + .unwrap(); + let remote_addr = match pool.run_until(remote_transport.next()).unwrap() { + TransportEvent::NewAddress { listen_addr, .. } => listen_addr, + e => panic!("Unexpected event {:?}", e), + }; + + pool.spawner() + .spawn_local({ + remote_transport + .flat_map(|e| match e { + TransportEvent::Incoming { upgrade, .. } => upgrade.into_stream(), + e => panic!("Unpexted event {:?}", e), + }) + .try_collect::>() + .map(|r| r.map(|_| ()).unwrap()) + }) + .unwrap(); + + for i in 0..outbound { + println!("{:?}", i); + local_swarm + .dial(remote_addr.clone()) + .ok() + .expect("Unexpected connection limit."); + + match pool.run_until(local_swarm.next()).unwrap() { + SwarmEvent::ConnectionEstablished { .. } => {} + e => panic!("Unexpected event {:?}", e), + } + } + + local_swarm + .dial(remote_addr.clone()) + .ok() + .expect("Unexpected connection limit."); + + match pool.run_until(local_swarm.next()).unwrap() { + SwarmEvent::OutgoingConnectionError { + error: DialError::ConnectionReviewDenied(e), + .. + } => {} + e => panic!("Unexpected event {:?}", e), + } + } + + QuickCheck::new().quickcheck(prop as fn(_)); +} + +#[derive(NetworkBehaviour)] +struct Behaviour { + limit: libp2p_connection_limit::Behaviour, + keep_alive: DummyBehaviour, +} + +fn make_swarm(limits: ConnectionLimits) -> Swarm { + let behaviour = Behaviour { + limit: libp2p_connection_limit::Behaviour::new(limits), + keep_alive: DummyBehaviour::with_keep_alive(KeepAlive::Yes), + }; + let local_public_key = identity::Keypair::generate_ed25519().public(); + let transport = make_transport(local_public_key.clone()); + SwarmBuilder::new(transport, behaviour, local_public_key.into()).build() +} + +fn make_transport(local_public_key: PublicKey) -> Boxed<(PeerId, StreamMuxerBox)> { + MemoryTransport::default() + .upgrade(upgrade::Version::V1) + .authenticate(PlainText2Config { + local_public_key: local_public_key.clone(), + }) + .multiplex(YamuxConfig::default()) + .boxed() +} diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index 51305e38190..ae77e5d98a2 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -60,6 +60,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let connected_point = quote! {::libp2p::core::ConnectedPoint}; let listener_id = quote! {::libp2p::core::transport::ListenerId}; let dial_error = quote! {::libp2p::swarm::DialError}; + let review_denied = quote! {::libp2p::swarm::ReviewDenied}; let poll_parameters = quote! {::libp2p::swarm::PollParameters}; @@ -222,6 +223,41 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { }) }; + // TODO + let review_pending_connection_stmts = { + data_struct_fields + .iter() + .enumerate() + .map(move |(field_n, field)| match field.ident { + Some(ref i) => quote! { self.#i.review_pending_connection(peer_id, addresses, endpoint)?; }, + None => quote! { self.#field_n.review_pending_connection(peer_id, addresses, endpoint)?; }, + }) + }; + + // TODO + let review_established_connection_stmts = { + data_struct_fields + .iter() + .enumerate() + .map(move |(field_n, field)| match field.ident { + Some(ref i) => { + quote! { self.#i.review_established_connection(peer_id, endpoint)?; } + } + None => quote! { self.#field_n.review_established_connection(peer_id, endpoint)?; }, + }) + }; + + // TODO + let inject_connection_pending_stmts = { + data_struct_fields + .iter() + .enumerate() + .map(move |(field_n, field)| match field.ident { + Some(ref i) => quote! { self.#i.inject_connection_pending(peer_id, connection_id, endpoint); }, + None => quote! { self.#field_n.inject_connection_pending(peer_id, connection_id, endpoint); }, + }) + }; + // Build the list of statements to put in the body of `inject_connection_established()`. let inject_connection_established_stmts = { data_struct_fields.iter().enumerate().map(move |(field_n, field)| { @@ -621,6 +657,20 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { out } + fn review_pending_connection(&mut self, peer_id: Option, addresses: &[Multiaddr], endpoint: Endpoint) -> Result<(), #review_denied> { + #(#review_pending_connection_stmts);* + Ok(()) + } + + fn review_established_connection(&mut self, peer_id: PeerId, endpoint: &#connected_point) -> Result<(), #review_denied> { + #(#review_established_connection_stmts);* + Ok(()) + } + + fn inject_connection_pending(&mut self, peer_id: Option, connection_id: ConnectionId, endpoint: Endpoint) { + #(#inject_connection_pending_stmts);* + } + fn inject_connection_established(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point, errors: #dial_errors, other_established: usize) { #(#inject_connection_established_stmts);* } diff --git a/swarm-derive/tests/test.rs b/swarm-derive/tests/test.rs index f4a313198a9..7dde12a1583 100644 --- a/swarm-derive/tests/test.rs +++ b/swarm-derive/tests/test.rs @@ -299,6 +299,19 @@ fn where_clause() { } } +#[test] +fn where_clause() { + #[allow(dead_code)] + #[derive(NetworkBehaviour)] + struct Foo + where + T: Copy + NetworkBehaviour, + { + ping: libp2p::ping::Ping, + bar: T, + } +} + #[test] fn nested_derives_with_import() { #[allow(dead_code)] diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index c0ac597680c..f025bdbce3f 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -24,10 +24,11 @@ pub mod toggle; use crate::dial_opts::DialOpts; use crate::handler::{ConnectionHandler, IntoConnectionHandler}; use crate::{AddressRecord, AddressScore, DialError}; -use libp2p_core::{ - connection::ConnectionId, transport::ListenerId, ConnectedPoint, Multiaddr, PeerId, -}; -use std::{task::Context, task::Poll}; +use libp2p_core::connection::{ConnectionId, Endpoint}; +use libp2p_core::{transport::ListenerId, ConnectedPoint, Multiaddr, PeerId}; +use std::error::Error; +use std::task::Context; +use std::task::Poll; /// Custom event that can be received by the [`ConnectionHandler`]. pub(crate) type THandlerInEvent = @@ -191,6 +192,35 @@ pub trait NetworkBehaviour: 'static { vec![] } + // TODO: Make sure this and all the methods below are really implemented across all behaviours. + fn review_pending_connection( + &mut self, + _peer_id: Option, + // TODO: Maybe an iterator is better? + _addresses: &[Multiaddr], + _endpoint: Endpoint, + ) -> Result<(), ReviewDenied> { + Ok(()) + } + + fn review_established_connection( + &mut self, + _peer_id: PeerId, + // TODO: Maybe an iterator is better? + _endpoint: &ConnectedPoint, + ) -> Result<(), ReviewDenied> { + Ok(()) + } + + fn inject_connection_pending( + &mut self, + _peer_id: Option, + _connection_id: ConnectionId, + _endpoint: Endpoint, + ) { + } + + // TODO: We need a `inject_connection_pending_closed` /// Informs the behaviour about a newly established connection to a peer. fn inject_connection_established( &mut self, @@ -778,3 +808,9 @@ impl Default for CloseConnection { CloseConnection::All } } + +// TODO: Needed in the first place? Are there some common errors that we want? +#[derive(Debug)] +pub enum ReviewDenied { + Error(Box), +} diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 24e54aba525..2cdb08f400c 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -27,7 +27,6 @@ pub use error::{ ConnectionError, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }; -pub use pool::{ConnectionCounters, ConnectionLimits}; pub use pool::{EstablishedConnection, PendingConnection}; use crate::handler::ConnectionHandler; @@ -40,7 +39,7 @@ use libp2p_core::upgrade; use libp2p_core::PeerId; use std::collections::VecDeque; use std::future::Future; -use std::{error::Error, fmt, io, pin::Pin, task::Context, task::Poll}; +use std::{fmt, io, pin::Pin, task::Context, task::Poll}; /// Information about a successfully established connection. #[derive(Debug, Clone, PartialEq, Eq)] @@ -208,21 +207,3 @@ impl<'a> IncomingInfo<'a> { } } } - -/// Information about a connection limit. -#[derive(Debug, Clone)] -pub struct ConnectionLimit { - /// The maximum number of connections. - pub limit: u32, - /// The current number of connections. - pub current: u32, -} - -impl fmt::Display for ConnectionLimit { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}/{}", self.current, self.limit) - } -} - -/// A `ConnectionLimit` can represent an error if it has been exceeded. -impl Error for ConnectionLimit {} diff --git a/swarm/src/connection/error.rs b/swarm/src/connection/error.rs index 8a6d6bbbf00..962f213051d 100644 --- a/swarm/src/connection/error.rs +++ b/swarm/src/connection/error.rs @@ -21,7 +21,7 @@ use super::handler_wrapper; use crate::transport::TransportError; use crate::Multiaddr; -use crate::{connection::ConnectionLimit, ConnectedPoint, PeerId}; +use crate::{ConnectedPoint, PeerId}; use std::{fmt, io}; /// Errors that can occur in the context of an established `Connection`. @@ -99,9 +99,10 @@ pub enum PendingConnectionError { /// An error occurred while negotiating the transport protocol(s) on a connection. Transport(TTransErr), + // TODO: Still needed? /// The connection was dropped because the connection limit /// for a peer has been reached. - ConnectionLimit(ConnectionLimit), + // ConnectionLimit(ConnectionLimit), /// Pending connection attempt has been aborted. Aborted, @@ -122,9 +123,6 @@ impl PendingConnectionError { pub fn map(self, f: impl FnOnce(T) -> U) -> PendingConnectionError { match self { PendingConnectionError::Transport(t) => PendingConnectionError::Transport(f(t)), - PendingConnectionError::ConnectionLimit(l) => { - PendingConnectionError::ConnectionLimit(l) - } PendingConnectionError::Aborted => PendingConnectionError::Aborted, PendingConnectionError::WrongPeerId { obtained, endpoint } => { PendingConnectionError::WrongPeerId { obtained, endpoint } @@ -149,9 +147,6 @@ where err ) } - PendingConnectionError::ConnectionLimit(l) => { - write!(f, "Connection error: Connection limit: {}.", l) - } PendingConnectionError::WrongPeerId { obtained, endpoint } => { write!( f, @@ -173,7 +168,6 @@ where PendingConnectionError::Transport(_) => None, PendingConnectionError::WrongPeerId { .. } => None, PendingConnectionError::Aborted => None, - PendingConnectionError::ConnectionLimit(..) => None, } } } diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 62e931e9510..fd651adb879 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -22,7 +22,7 @@ use crate::{ behaviour::{THandlerInEvent, THandlerOutEvent}, connection::{ - Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError, + Connected, ConnectionError, IncomingInfo, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }, transport::{Transport, TransportError}, @@ -41,7 +41,6 @@ use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint}; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; use std::{ collections::{hash_map, HashMap}, - convert::TryFrom as _, fmt, num::{NonZeroU8, NonZeroUsize}, pin::Pin, @@ -60,9 +59,6 @@ where { local_id: PeerId, - /// The connection counter(s). - counters: ConnectionCounters, - /// The managed connections of each peer that are currently considered established. established: FnvHashMap< PeerId, @@ -144,6 +140,7 @@ impl EstablishedConnectionInfo { struct PendingConnectionInfo { /// [`PeerId`] of the remote peer. peer_id: Option, + // TODO: There is no need to have the handler here already. /// Handler to handle connection once no longer pending but established. handler: THandler, endpoint: PendingPoint, @@ -153,14 +150,11 @@ struct PendingConnectionInfo { impl fmt::Debug for Pool { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Pool") - .field("counters", &self.counters) - .finish() + f.debug_struct("Pool").finish() } } /// Event that can happen on the `Pool`. -#[derive(Debug)] pub enum PoolEvent where TTrans: Transport, @@ -178,6 +172,9 @@ where /// Addresses are dialed in parallel. Contains the addresses and errors /// of dial attempts that failed before the one successful dial. concurrent_dial_errors: Option)>>, + // TODO: It doesn't make sense to have the handler here already. + handler: THandler, + muxer: StreamMuxerBox, }, /// An established connection was closed. @@ -248,20 +245,25 @@ where }, } +impl fmt::Debug for PoolEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + todo!("due to streammuxerbox") + } +} + impl Pool where THandler: IntoConnectionHandler, TTrans: Transport, { /// Creates a new empty `Pool`. - pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self { + pub fn new(local_id: PeerId, config: PoolConfig) -> Self { let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(config.task_event_buffer_size); let (established_connection_events_tx, established_connection_events_rx) = mpsc::channel(config.task_event_buffer_size); Pool { local_id, - counters: ConnectionCounters::new(limits), established: Default::default(), pending: Default::default(), next_connection_id: ConnectionId::new(0), @@ -278,11 +280,6 @@ where } } - /// Gets the dedicated connection counters. - pub fn counters(&self) -> &ConnectionCounters { - &self.counters - } - /// Gets an entry representing a connection in the pool. /// /// Returns `None` if the pool has no connection with the given ID. @@ -415,7 +412,7 @@ where /// /// Returns an error if the limit of pending outgoing connections /// has been reached. - pub fn add_outgoing( + pub fn add_pending_outgoing( &mut self, dials: Vec< BoxFuture< @@ -433,14 +430,15 @@ where handler: THandler, role_override: Endpoint, dial_concurrency_factor_override: Option, - ) -> Result + ) -> ConnectionId where TTrans: Send, TTrans::Dial: Send + 'static, { - if let Err(limit) = self.counters.check_max_pending_outgoing() { - return Err((limit, handler)); - }; + // TODO + // if let Err(limit) = self.counters.check_max_pending_outgoing() { + // return Err((limit, handler)); + // }; let dial = ConcurrentDial::new( dials, @@ -463,7 +461,8 @@ where let endpoint = PendingPoint::Dialer { role_override }; - self.counters.inc_pending(&endpoint); + // TODO + // self.counters.inc_pending(&endpoint); self.pending.insert( connection_id, PendingConnectionInfo { @@ -473,7 +472,7 @@ where abort_notifier: Some(abort_notifier), }, ); - Ok(connection_id) + connection_id } /// Adds a pending incoming connection to the pool in the form of a @@ -481,20 +480,21 @@ where /// /// Returns an error if the limit of pending incoming connections /// has been reached. - pub fn add_incoming( + pub fn add_pending_incoming( &mut self, future: TFut, handler: THandler, info: IncomingInfo<'_>, - ) -> Result + ) -> ConnectionId where TFut: Future> + Send + 'static, { let endpoint = info.create_connected_point(); - if let Err(limit) = self.counters.check_max_pending_incoming() { - return Err((limit, handler)); - } + // TODO + // if let Err(limit) = self.counters.check_max_pending_incoming() { + // return Err((limit, handler)); + // } let connection_id = self.next_connection_id(); @@ -510,7 +510,8 @@ where .boxed(), ); - self.counters.inc_pending_incoming(); + // TODO + // self.counters.inc_pending_incoming(); self.pending.insert( connection_id, PendingConnectionInfo { @@ -520,7 +521,48 @@ where abort_notifier: Some(abort_notifier), }, ); - Ok(connection_id) + connection_id + } + + pub fn add_established( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + endpoint: ConnectedPoint, + muxer: StreamMuxerBox, + handler: THandler, + ) { + // Add the connection to the pool. + let conns = self.established.entry(peer_id).or_default(); + + let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size); + conns.insert( + connection_id, + EstablishedConnectionInfo { + peer_id: peer_id, + endpoint: endpoint.clone(), + sender: command_sender, + }, + ); + + let connection = super::Connection::new( + peer_id, + endpoint, + muxer, + handler, + self.substream_upgrade_protocol_override, + self.max_negotiating_inbound_streams, + ); + self.spawn( + task::new_for_established_connection( + connection_id, + peer_id, + connection, + command_receiver, + self.established_connection_events_tx.clone(), + ) + .boxed(), + ); } /// Polls the connection pool for events. @@ -577,7 +619,8 @@ where .expect("`Closed` event for established connection"); let EstablishedConnectionInfo { endpoint, .. } = connections.remove(&id).expect("Connection to be present"); - self.counters.dec_established(&endpoint); + // TODO + // self.counters.dec_established(&endpoint); let remaining_established_connection_ids: Vec = connections.keys().cloned().collect(); if remaining_established_connection_ids.is_empty() { @@ -617,7 +660,8 @@ where .remove(&id) .expect("Entry in `self.pending` for previously pending connection."); - self.counters.dec_pending(&endpoint); + // TODO + // self.counters.dec_pending(&endpoint); let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) { (PendingPoint::Dialer { role_override }, Some((address, errors))) => ( @@ -648,20 +692,23 @@ where ), }; - let error: Result<(), PendingInboundConnectionError<_>> = self - .counters - // Check general established connection limit. - .check_max_established(&endpoint) - .map_err(PendingConnectionError::ConnectionLimit) + let error: Result<(), PendingInboundConnectionError<_>> = Ok(()) + // TODO + // self + // .counters + // // Check general established connection limit. + // .check_max_established(&endpoint) + // .map_err(PendingConnectionError::ConnectionLimit) // Check per-peer established connection limit. - .and_then(|()| { - self.counters - .check_max_established_per_peer(num_peer_established( - &self.established, - obtained_peer_id, - )) - .map_err(PendingConnectionError::ConnectionLimit) - }) + // TODO + // .and_then(|()| { + // self.counters + // .check_max_established_per_peer(num_peer_established( + // &self.established, + // obtained_peer_id, + // )) + // .map_err(PendingConnectionError::ConnectionLimit) + // }) // Check expected peer id matches. .and_then(|()| { if let Some(peer) = expected_peer_id { @@ -730,53 +777,20 @@ where }; } - // Add the connection to the pool. let conns = self.established.entry(obtained_peer_id).or_default(); let other_established_connection_ids = conns.keys().cloned().collect(); - self.counters.inc_established(&endpoint); + // TODO + // self.counters.inc_established(&endpoint); - let (command_sender, command_receiver) = - mpsc::channel(self.task_command_buffer_size); - conns.insert( - id, - EstablishedConnectionInfo { - peer_id: obtained_peer_id, - endpoint: endpoint.clone(), - sender: command_sender, - }, - ); - - let connection = super::Connection::new( - obtained_peer_id, + return Poll::Ready(PoolEvent::ConnectionEstablished { + peer_id: obtained_peer_id, endpoint, - muxer, + id, + other_established_connection_ids, + concurrent_dial_errors, handler, - self.substream_upgrade_protocol_override, - self.max_negotiating_inbound_streams, - ); - self.spawn( - task::new_for_established_connection( - id, - obtained_peer_id, - connection, - command_receiver, - self.established_connection_events_tx.clone(), - ) - .boxed(), - ); - - match self.get(id) { - Some(PoolConnection::Established(connection)) => { - return Poll::Ready(PoolEvent::ConnectionEstablished { - peer_id: connection.peer_id(), - endpoint: connection.endpoint().clone(), - id: connection.id(), - other_established_connection_ids, - concurrent_dial_errors, - }) - } - _ => unreachable!("since `entry` is an `EstablishedEntry`."), - } + muxer, + }); } task::PendingConnectionEvent::PendingFailed { id, error } => { if let Some(PendingConnectionInfo { @@ -786,7 +800,8 @@ where abort_notifier: _, }) = self.pending.remove(&id) { - self.counters.dec_pending(&endpoint); + // TODO + // self.counters.dec_pending(&endpoint); match (endpoint, error) { (PendingPoint::Dialer { .. }, Either::Left(error)) => { @@ -923,225 +938,6 @@ impl EstablishedConnection<'_, TInEvent> { } } -/// Network connection information. -#[derive(Debug, Clone)] -pub struct ConnectionCounters { - /// The effective connection limits. - limits: ConnectionLimits, - /// The current number of incoming connections. - pending_incoming: u32, - /// The current number of outgoing connections. - pending_outgoing: u32, - /// The current number of established inbound connections. - established_incoming: u32, - /// The current number of established outbound connections. - established_outgoing: u32, -} - -impl ConnectionCounters { - fn new(limits: ConnectionLimits) -> Self { - Self { - limits, - pending_incoming: 0, - pending_outgoing: 0, - established_incoming: 0, - established_outgoing: 0, - } - } - - /// The effective connection limits. - pub fn limits(&self) -> &ConnectionLimits { - &self.limits - } - - /// The total number of connections, both pending and established. - pub fn num_connections(&self) -> u32 { - self.num_pending() + self.num_established() - } - - /// The total number of pending connections, both incoming and outgoing. - pub fn num_pending(&self) -> u32 { - self.pending_incoming + self.pending_outgoing - } - - /// The number of incoming connections being established. - pub fn num_pending_incoming(&self) -> u32 { - self.pending_incoming - } - - /// The number of outgoing connections being established. - pub fn num_pending_outgoing(&self) -> u32 { - self.pending_outgoing - } - - /// The number of established incoming connections. - pub fn num_established_incoming(&self) -> u32 { - self.established_incoming - } - - /// The number of established outgoing connections. - pub fn num_established_outgoing(&self) -> u32 { - self.established_outgoing - } - - /// The total number of established connections. - pub fn num_established(&self) -> u32 { - self.established_outgoing + self.established_incoming - } - - fn inc_pending(&mut self, endpoint: &PendingPoint) { - match endpoint { - PendingPoint::Dialer { .. } => { - self.pending_outgoing += 1; - } - PendingPoint::Listener { .. } => { - self.pending_incoming += 1; - } - } - } - - fn inc_pending_incoming(&mut self) { - self.pending_incoming += 1; - } - - fn dec_pending(&mut self, endpoint: &PendingPoint) { - match endpoint { - PendingPoint::Dialer { .. } => { - self.pending_outgoing -= 1; - } - PendingPoint::Listener { .. } => { - self.pending_incoming -= 1; - } - } - } - - fn inc_established(&mut self, endpoint: &ConnectedPoint) { - match endpoint { - ConnectedPoint::Dialer { .. } => { - self.established_outgoing += 1; - } - ConnectedPoint::Listener { .. } => { - self.established_incoming += 1; - } - } - } - - fn dec_established(&mut self, endpoint: &ConnectedPoint) { - match endpoint { - ConnectedPoint::Dialer { .. } => { - self.established_outgoing -= 1; - } - ConnectedPoint::Listener { .. } => { - self.established_incoming -= 1; - } - } - } - - fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> { - Self::check(self.pending_outgoing, self.limits.max_pending_outgoing) - } - - fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> { - Self::check(self.pending_incoming, self.limits.max_pending_incoming) - } - - fn check_max_established(&self, endpoint: &ConnectedPoint) -> Result<(), ConnectionLimit> { - // Check total connection limit. - Self::check(self.num_established(), self.limits.max_established_total)?; - // Check incoming/outgoing connection limits - match endpoint { - ConnectedPoint::Dialer { .. } => Self::check( - self.established_outgoing, - self.limits.max_established_outgoing, - ), - ConnectedPoint::Listener { .. } => Self::check( - self.established_incoming, - self.limits.max_established_incoming, - ), - } - } - - fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> { - Self::check(current, self.limits.max_established_per_peer) - } - - fn check(current: u32, limit: Option) -> Result<(), ConnectionLimit> { - if let Some(limit) = limit { - if current >= limit { - return Err(ConnectionLimit { limit, current }); - } - } - Ok(()) - } -} - -/// Counts the number of established connections to the given peer. -fn num_peer_established( - established: &FnvHashMap>>, - peer: PeerId, -) -> u32 { - established.get(&peer).map_or(0, |conns| { - u32::try_from(conns.len()).expect("Unexpectedly large number of connections for a peer.") - }) -} - -/// The configurable connection limits. -/// -/// By default no connection limits apply. -#[derive(Debug, Clone, Default)] -pub struct ConnectionLimits { - max_pending_incoming: Option, - max_pending_outgoing: Option, - max_established_incoming: Option, - max_established_outgoing: Option, - max_established_per_peer: Option, - max_established_total: Option, -} - -impl ConnectionLimits { - /// Configures the maximum number of concurrently incoming connections being established. - pub fn with_max_pending_incoming(mut self, limit: Option) -> Self { - self.max_pending_incoming = limit; - self - } - - /// Configures the maximum number of concurrently outgoing connections being established. - pub fn with_max_pending_outgoing(mut self, limit: Option) -> Self { - self.max_pending_outgoing = limit; - self - } - - /// Configures the maximum number of concurrent established inbound connections. - pub fn with_max_established_incoming(mut self, limit: Option) -> Self { - self.max_established_incoming = limit; - self - } - - /// Configures the maximum number of concurrent established outbound connections. - pub fn with_max_established_outgoing(mut self, limit: Option) -> Self { - self.max_established_outgoing = limit; - self - } - - /// Configures the maximum number of concurrent established connections (both - /// inbound and outbound). - /// - /// Note: This should be used in conjunction with - /// [`ConnectionLimits::with_max_established_incoming`] to prevent possible - /// eclipse attacks (all connections being inbound). - pub fn with_max_established(mut self, limit: Option) -> Self { - self.max_established_total = limit; - self - } - - /// Configures the maximum number of concurrent established connections per peer, - /// regardless of direction (incoming or outgoing). - pub fn with_max_established_per_peer(mut self, limit: Option) -> Self { - self.max_established_per_peer = limit; - self - } -} - /// Configuration options when creating a [`Pool`]. /// /// The default configuration specifies no dedicated task executor, a diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 38df855cff5..e692124fc4d 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -67,10 +67,11 @@ pub mod handler; pub use behaviour::NetworkBehaviourEventProcess; pub use behaviour::{ CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + ReviewDenied, }; pub use connection::{ - ConnectionCounters, ConnectionError, ConnectionLimit, ConnectionLimits, PendingConnectionError, - PendingInboundConnectionError, PendingOutboundConnectionError, + ConnectionError, PendingConnectionError, PendingInboundConnectionError, + PendingOutboundConnectionError, }; pub use handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr, @@ -180,6 +181,8 @@ pub enum SwarmEvent { /// Address used to send back data to the remote. send_back_addr: Multiaddr, }, + // TODO: Extend + IncomingConnectionDenied, /// An error happened on a connection during its initial handshake. /// /// This can include, for example, an error during the handshake of the encryption layer, or @@ -282,6 +285,7 @@ where /// similar mechanisms. external_addrs: Addresses, + // TODO: I think this should move into the connection manager behaviour as well. /// List of nodes for which we deny any incoming connection. banned_peers: HashSet, @@ -315,11 +319,7 @@ where /// Returns information about the connections underlying the [`Swarm`]. pub fn network_info(&self) -> NetworkInfo { let num_peers = self.pool.num_peers(); - let connection_counters = self.pool.counters().clone(); - NetworkInfo { - num_peers, - connection_counters, - } + NetworkInfo { num_peers } } /// Starts listening on the given address. @@ -505,6 +505,13 @@ where } }; + self.behaviour.review_pending_connection( + peer_id, + // TODO: Pass the addresses. + &[], + Endpoint::Dialer, + )?; + let dials = addresses .map(|a| match p2p_addr(peer_id, a) { Ok(address) => { @@ -527,20 +534,19 @@ where }) .collect(); - match self.pool.add_outgoing( + let connection_id = self.pool.add_pending_outgoing( dials, peer_id, handler, role_override, dial_concurrency_factor_override, - ) { - Ok(_connection_id) => Ok(()), - Err((connection_limit, handler)) => { - let error = DialError::ConnectionLimit(connection_limit); - self.behaviour.inject_dial_failure(None, handler, &error); - Err(error) - } - } + ); + + self.behaviour + .inject_connection_pending(peer_id, connection_id, Endpoint::Dialer); + // TODO + // self.behaviour.inject_dial_failure(None, handler, &error); + Ok(()) } /// Returns an iterator that produces the list of addresses we're listening on. @@ -676,13 +682,20 @@ where endpoint, other_established_connection_ids, concurrent_dial_errors, + handler, + muxer, } => { + // TODO: Call `NetworkBehaviour::inject_connection_pending_closed` in case we don't + // accept the established connection. if self.banned_peers.contains(&peer_id) { // Mark the connection for the banned peer as banned, thus withholding any // future events from the connection to the behaviour. self.banned_peer_connections.insert(id); self.pool.disconnect(peer_id); + // TODO: Should we close the connection? return Some(SwarmEvent::BannedPeer { peer_id, endpoint }); + + // TODO: The else below is unnecessary given that the above is a return. } else { let num_established = NonZeroU32::new( u32::try_from(other_established_connection_ids.len() + 1).unwrap(), @@ -703,6 +716,26 @@ where let failed_addresses = concurrent_dial_errors .as_ref() .map(|es| es.iter().map(|(a, _)| a).cloned().collect()); + + // TODO: Clean up + if let Err(e) = self + .behaviour + .review_established_connection(peer_id, &endpoint) + { + match endpoint { + ConnectedPoint::Dialer { .. } => { + return Some(SwarmEvent::OutgoingConnectionError { + peer_id: Some(peer_id), + error: DialError::ConnectionReviewDenied(e), + }) + } + _ => todo!(), + } + } + + self.pool + .add_established(peer_id, id, endpoint.clone(), muxer, handler); + self.behaviour.inject_connection_established( &peer_id, &id, @@ -843,27 +876,43 @@ where local_addr, send_back_addr, } => { + // TODO: Ideally we would only request the handler afterwards, or maybe even in the + // review_pending_connection call. + let handler = self.behaviour.new_handler(); - match self.pool.add_incoming( + + if let Err(err) = self.behaviour.review_pending_connection( + None, + // TODO: It does not make sense to provide a vector here. + &[send_back_addr.clone()], + Endpoint::Listener, + ) { + self.behaviour + .inject_listen_failure(&local_addr, &send_back_addr, handler); + log::warn!("Incoming connection rejected: {:?}", err); + return Some(SwarmEvent::IncomingConnectionDenied); + } + + let connection_id = self.pool.add_pending_incoming( upgrade, handler, IncomingInfo { local_addr: &local_addr, send_back_addr: &send_back_addr, }, - ) { - Ok(_connection_id) => { - return Some(SwarmEvent::IncomingConnection { - local_addr, - send_back_addr, - }); - } - Err((connection_limit, handler)) => { - self.behaviour - .inject_listen_failure(&local_addr, &send_back_addr, handler); - log::warn!("Incoming connection rejected: {:?}", connection_limit); - } - }; + ); + + self.behaviour + .inject_connection_pending(None, connection_id, Endpoint::Listener); + + return Some(SwarmEvent::IncomingConnection { + local_addr, + send_back_addr, + }); + + // TODO: Still needed? + // self.behaviour + // .inject_listen_failure(&local_addr, &send_back_addr, handler); } TransportEvent::NewAddress { listener_id, @@ -927,7 +976,6 @@ where return Some(SwarmEvent::ListenerError { listener_id, error }); } } - None } fn handle_behaviour_event( @@ -1268,7 +1316,6 @@ pub struct SwarmBuilder { transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, pool_config: PoolConfig, - connection_limits: ConnectionLimits, } impl SwarmBuilder @@ -1288,7 +1335,6 @@ where transport, behaviour, pool_config: Default::default(), - connection_limits: Default::default(), } } @@ -1349,12 +1395,6 @@ where self } - /// Configures the connection limits. - pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self { - self.connection_limits = limits; - self - } - /// Configures an override for the substream upgrade protocol to use. /// /// The subtream upgrade protocol is the multistream-select protocol @@ -1407,7 +1447,7 @@ where Swarm { local_peer_id: self.local_peer_id, transport: self.transport, - pool: Pool::new(self.local_peer_id, pool_config, self.connection_limits), + pool: Pool::new(self.local_peer_id, pool_config), behaviour: self.behaviour, supported_protocols, listened_addrs: HashMap::new(), @@ -1424,9 +1464,7 @@ where pub enum DialError { /// The peer is currently banned. Banned, - /// The configured limit for simultaneous outgoing connections - /// has been reached. - ConnectionLimit(ConnectionLimit), + ConnectionReviewDenied(ReviewDenied), /// The peer being dialed is the local peer and thus the dial was aborted. LocalPeerId, /// [`NetworkBehaviour::addresses_of_peer`] returned no addresses @@ -1450,10 +1488,15 @@ pub enum DialError { Transport(Vec<(Multiaddr, TransportError)>), } +impl From for DialError { + fn from(denied: ReviewDenied) -> Self { + DialError::ConnectionReviewDenied(denied) + } +} + impl From> for DialError { fn from(error: PendingOutboundConnectionError) -> Self { match error { - PendingConnectionError::ConnectionLimit(limit) => DialError::ConnectionLimit(limit), PendingConnectionError::Aborted => DialError::Aborted, PendingConnectionError::WrongPeerId { obtained, endpoint } => { DialError::WrongPeerId { obtained, endpoint } @@ -1467,8 +1510,8 @@ impl From> for DialError { impl fmt::Display for DialError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err), DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."), + DialError::ConnectionReviewDenied(_)=> todo!(), DialError::LocalPeerId => write!(f, "Dial error: tried to dial local peer id."), DialError::Banned => write!(f, "Dial error: peer is banned."), DialError::DialPeerConditionFalse(c) => { @@ -1496,8 +1539,8 @@ impl fmt::Display for DialError { impl error::Error for DialError { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { - DialError::ConnectionLimit(err) => Some(err), DialError::LocalPeerId => None, + DialError::ConnectionReviewDenied(_) => None, DialError::NoAddresses => None, DialError::Banned => None, DialError::DialPeerConditionFalse(_) => None, @@ -1562,13 +1605,12 @@ impl NetworkBehaviour for DummyBehaviour { } } +// TODO: Struct still needed now that connection limit is gone? /// Information about the connections obtained by [`Swarm::network_info()`]. #[derive(Clone, Debug)] pub struct NetworkInfo { /// The total number of connected peers. num_peers: usize, - /// Counters of ongoing network connections. - connection_counters: ConnectionCounters, } impl NetworkInfo { @@ -1577,11 +1619,6 @@ impl NetworkInfo { pub fn num_peers(&self) -> usize { self.num_peers } - - /// Gets counters for ongoing network connections. - pub fn connection_counters(&self) -> &ConnectionCounters { - &self.connection_counters - } } /// Ensures a given `Multiaddr` is a `/p2p/...` address for the given peer. @@ -2120,185 +2157,6 @@ mod tests { QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _); } - #[test] - fn max_outgoing() { - use rand::Rng; - - let outgoing_limit = rand::thread_rng().gen_range(1, 10); - - let limits = ConnectionLimits::default().with_max_pending_outgoing(Some(outgoing_limit)); - let mut network = new_test_swarm::<_, ()>(DummyConnectionHandler { - keep_alive: KeepAlive::Yes, - }) - .connection_limits(limits) - .build(); - - let addr: Multiaddr = "/memory/1234".parse().unwrap(); - - let target = PeerId::random(); - for _ in 0..outgoing_limit { - network - .dial( - DialOpts::peer_id(target) - .addresses(vec![addr.clone()]) - .build(), - ) - .ok() - .expect("Unexpected connection limit."); - } - - match network - .dial( - DialOpts::peer_id(target) - .addresses(vec![addr.clone()]) - .build(), - ) - .expect_err("Unexpected dialing success.") - { - DialError::ConnectionLimit(limit) => { - assert_eq!(limit.current, outgoing_limit); - assert_eq!(limit.limit, outgoing_limit); - } - e => panic!("Unexpected error: {:?}", e), - } - - let info = network.network_info(); - assert_eq!(info.num_peers(), 0); - assert_eq!( - info.connection_counters().num_pending_outgoing(), - outgoing_limit - ); - } - - #[test] - fn max_established_incoming() { - use rand::Rng; - - #[derive(Debug, Clone)] - struct Limit(u32); - - impl Arbitrary for Limit { - fn arbitrary(g: &mut G) -> Self { - Self(g.gen_range(1, 10)) - } - } - - fn limits(limit: u32) -> ConnectionLimits { - ConnectionLimits::default().with_max_established_incoming(Some(limit)) - } - - fn prop(limit: Limit) { - let limit = limit.0; - - let mut network1 = new_test_swarm::<_, ()>(DummyConnectionHandler { - keep_alive: KeepAlive::Yes, - }) - .connection_limits(limits(limit)) - .build(); - let mut network2 = new_test_swarm::<_, ()>(DummyConnectionHandler { - keep_alive: KeepAlive::Yes, - }) - .connection_limits(limits(limit)) - .build(); - - let _ = network1.listen_on(multiaddr![Memory(0u64)]).unwrap(); - let listen_addr = async_std::task::block_on(poll_fn(|cx| { - match ready!(network1.poll_next_unpin(cx)).unwrap() { - SwarmEvent::NewListenAddr { address, .. } => Poll::Ready(address), - e => panic!("Unexpected network event: {:?}", e), - } - })); - - // Spawn and block on the dialer. - async_std::task::block_on({ - let mut n = 0; - let _ = network2.dial(listen_addr.clone()).unwrap(); - - let mut expected_closed = false; - let mut network_1_established = false; - let mut network_2_established = false; - let mut network_1_limit_reached = false; - let mut network_2_limit_reached = false; - poll_fn(move |cx| { - loop { - let mut network_1_pending = false; - let mut network_2_pending = false; - - match network1.poll_next_unpin(cx) { - Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) => {} - Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => { - network_1_established = true; - } - Poll::Ready(Some(SwarmEvent::IncomingConnectionError { - error: PendingConnectionError::ConnectionLimit(err), - .. - })) => { - assert_eq!(err.limit, limit); - assert_eq!(err.limit, err.current); - let info = network1.network_info(); - let counters = info.connection_counters(); - assert_eq!(counters.num_established_incoming(), limit); - assert_eq!(counters.num_established(), limit); - network_1_limit_reached = true; - } - Poll::Pending => { - network_1_pending = true; - } - e => panic!("Unexpected network event: {:?}", e), - } - - match network2.poll_next_unpin(cx) { - Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => { - network_2_established = true; - } - Poll::Ready(Some(SwarmEvent::ConnectionClosed { .. })) => { - assert!(expected_closed); - let info = network2.network_info(); - let counters = info.connection_counters(); - assert_eq!(counters.num_established_outgoing(), limit); - assert_eq!(counters.num_established(), limit); - network_2_limit_reached = true; - } - Poll::Pending => { - network_2_pending = true; - } - e => panic!("Unexpected network event: {:?}", e), - } - - if network_1_pending && network_2_pending { - return Poll::Pending; - } - - if network_1_established && network_2_established { - network_1_established = false; - network_2_established = false; - - if n <= limit { - // Dial again until the limit is exceeded. - n += 1; - network2.dial(listen_addr.clone()).unwrap(); - - if n == limit { - // The the next dialing attempt exceeds the limit, this - // is the connection we expected to get closed. - expected_closed = true; - } - } else { - panic!("Expect networks not to establish connections beyond the limit.") - } - } - - if network_1_limit_reached && network_2_limit_reached { - return Poll::Ready(()); - } - } - }) - }); - } - - quickcheck(prop as fn(_)); - } - #[test] fn invalid_peer_id() { // Checks whether dialing an address containing the wrong peer id raises an error