From 74197a5f08e6e07a975541d38efa123bc114928b Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 4 Jun 2025 16:45:11 +0200 Subject: [PATCH 1/3] introduce connection tracker --- Cargo.lock | 9 ++ Cargo.toml | 2 +- misc/connection-tracker/Cargo.toml | 13 ++ misc/connection-tracker/src/behavior.rs | 194 ++++++++++++++++++++++++ misc/connection-tracker/src/lib.rs | 44 ++++++ misc/connection-tracker/src/store.rs | 70 +++++++++ 6 files changed, 331 insertions(+), 1 deletion(-) create mode 100644 misc/connection-tracker/Cargo.toml create mode 100644 misc/connection-tracker/src/behavior.rs create mode 100644 misc/connection-tracker/src/lib.rs create mode 100644 misc/connection-tracker/src/store.rs diff --git a/Cargo.lock b/Cargo.lock index fc074f97534..5d224a4adf5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2755,6 +2755,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "libp2p-connection-tracker" +version = "0.1.0" +dependencies = [ + "libp2p-core", + "libp2p-swarm", + "tracing", +] + [[package]] name = "libp2p-core" version = "0.43.1" diff --git a/Cargo.toml b/Cargo.toml index 631a8087599..65ebe5d1360 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ members = [ "transports/websocket-websys", "transports/websocket", "transports/webtransport-websys", - "wasm-tests/webtransport-tests", + "wasm-tests/webtransport-tests", "misc/connection-tracker", ] resolver = "2" diff --git a/misc/connection-tracker/Cargo.toml b/misc/connection-tracker/Cargo.toml new file mode 100644 index 00000000000..83976fd53a2 --- /dev/null +++ b/misc/connection-tracker/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "libp2p-connection-tracker" +version = "0.1.0" +edition.workspace = true +description = "A libp2p NetworkBehaviour for tracking peer connection state, disconnections, and bans" +license = "MIT" +publish = false +rust-version.workspace = true + +[dependencies] +libp2p-core = { workspace = true } +libp2p-swarm = { workspace = true } +tracing = { workspace = true } \ No newline at end of file diff --git a/misc/connection-tracker/src/behavior.rs b/misc/connection-tracker/src/behavior.rs new file mode 100644 index 00000000000..b9f92641371 --- /dev/null +++ b/misc/connection-tracker/src/behavior.rs @@ -0,0 +1,194 @@ +use std::{ + collections::VecDeque, + task::{Context, Poll}, +}; + +use libp2p_core::transport::PortUse; +use libp2p_core::PeerId; +use libp2p_swarm::{ + behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}, + ConnectionDenied, ConnectionId, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, + ToSwarm, +}; +use tracing::{debug, trace}; + +use crate::{store::ConnectionStore, Event}; + +/// A [`NetworkBehaviour`] that tracks connected peers. +/// +/// This behaviour provides simple connection tracking without any advanced +/// features like banning or scoring (for now). It's designed to be lightweight and +/// composable with other behaviours. +/// +/// # Example +/// +/// ```rust +/// use libp2p_connection_tracker::Behaviour; +/// use libp2p_swarm_derive::NetworkBehaviour; +/// +/// #[derive(NetworkBehaviour)] +/// #[behaviour(prelude = "libp2p_swarm::derive_prelude")] +/// struct MyBehaviour { +/// connection_tracker: Behaviour, +/// // ... other behaviours +/// } +/// +/// let connection_tracker = Behaviour::new(); +/// ``` +pub struct Behaviour { + /// Storage for connection state. + store: ConnectionStore, + + /// Queue of events to emit. + pending_events: VecDeque, +} + +impl Behaviour { + /// Create a new connection tracker behaviour. + pub fn new() -> Self { + Self { + store: ConnectionStore::new(), + pending_events: VecDeque::new(), + } + } + + /// Check if a peer is currently connected. + pub fn is_connected(&self, peer_id: &PeerId) -> bool { + self.store.is_connected(peer_id) + } + + /// Get all currently connected peer IDs. + pub fn connected_peers(&self) -> impl Iterator { + self.store.connected_peers() + } + + /// Get the number of currently connected peers. + pub fn connected_count(&self) -> usize { + self.store.connected_count() + } + + /// Get the number of connections to a specific peer. + pub fn connection_count(&self, peer_id: &PeerId) -> usize { + self.store.connection_count(peer_id) + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = libp2p_swarm::dummy::ConnectionHandler; + type ToSwarm = Event; + + fn handle_pending_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _local_addr: &libp2p_core::Multiaddr, + _remote_addr: &libp2p_core::Multiaddr, + ) -> Result<(), ConnectionDenied> { + // We don't interfere with connection establishment + Ok(()) + } + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _local_addr: &libp2p_core::Multiaddr, + _remote_addr: &libp2p_core::Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(libp2p_swarm::dummy::ConnectionHandler) + } + + fn handle_pending_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _maybe_peer: Option, + _addresses: &[libp2p_core::Multiaddr], + _effective_role: libp2p_core::Endpoint, + ) -> Result, ConnectionDenied> { + // Don't modify addresses + Ok(vec![]) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _addr: &libp2p_core::Multiaddr, + _role_override: libp2p_core::Endpoint, + _port_use: PortUse, + ) -> Result, ConnectionDenied> { + Ok(libp2p_swarm::dummy::ConnectionHandler) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionEstablished(ConnectionEstablished { + peer_id, + connection_id, + endpoint, + .. + }) => { + trace!(%peer_id, ?connection_id, "Connection established"); + + let is_first_connection = self.store.connection_established(peer_id, connection_id); + + if is_first_connection { + debug!(?peer_id, "Peer connected"); + self.pending_events.push_back(Event::PeerConnected { + peer_id, + connection_id, + endpoint: endpoint.clone(), + }); + } + } + + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id, + remaining_established, + .. + }) => { + trace!(%peer_id, ?connection_id, remaining_established, "Connection closed"); + + let is_last_connection = + self.store + .connection_closed(peer_id, connection_id, remaining_established); + + if is_last_connection { + debug!(%peer_id, "Peer disconnected"); + self.pending_events.push_back(Event::PeerDisconnected { + peer_id, + connection_id, + }); + } + } + + _ => {} + } + } + + fn on_connection_handler_event( + &mut self, + _peer_id: PeerId, + _connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + match event {} + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll>> { + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl Default for Behaviour { + fn default() -> Self { + Self::new() + } +} diff --git a/misc/connection-tracker/src/lib.rs b/misc/connection-tracker/src/lib.rs new file mode 100644 index 00000000000..81ffcbedc6e --- /dev/null +++ b/misc/connection-tracker/src/lib.rs @@ -0,0 +1,44 @@ +//! A [`NetworkBehaviour`] for tracking connected peers. +//! +//! This crate provides simple connected peer tracking that can be composed +//! with other behaviours to avoid manual connection state management. +//! +//! # Usage +//! +//! ```rust +//! use libp2p_swarm::{dummy, NetworkBehaviour};//! +//! +//! use libp2p_swarm::dummy::Behaviour; +//! +//! #[derive(NetworkBehaviour)] +//! #[behaviour(prelude = "libp2p_swarm::derive_prelude")] +//! struct MyBehaviour { +//! connection_tracker: Behaviour, +//! // ... other behaviours +//! } +//! ``` + +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +mod behavior; +mod store; + +use libp2p_core::{ConnectedPoint, PeerId}; +use libp2p_swarm::ConnectionId; + +/// Events emitted by the connection tracker behaviour. +#[derive(Debug, Clone)] +pub enum Event { + /// A peer connected (first connection established). + PeerConnected { + peer_id: PeerId, + connection_id: ConnectionId, + endpoint: ConnectedPoint, + }, + + /// A peer disconnected (last connection closed). + PeerDisconnected { + peer_id: PeerId, + connection_id: ConnectionId, + }, +} diff --git a/misc/connection-tracker/src/store.rs b/misc/connection-tracker/src/store.rs new file mode 100644 index 00000000000..2cbe6fb8909 --- /dev/null +++ b/misc/connection-tracker/src/store.rs @@ -0,0 +1,70 @@ +use libp2p_core::PeerId; +use libp2p_swarm::ConnectionId; +use std::collections::{HashMap, HashSet}; + +/// Simple storage for connected peers. +#[derive(Debug, Default)] +pub struct ConnectionStore { + /// Currently connected peers with their connection IDs + connected: HashMap>, +} + +impl ConnectionStore { + /// Create a new connection store. + pub fn new() -> Self { + Self { + connected: HashMap::new(), + } + } + + /// Add a new connection for a peer. + /// Returns `true` if this is the first connection to the peer. + pub fn connection_established(&mut self, peer_id: PeerId, connection_id: ConnectionId) -> bool { + let connections = self.connected.entry(peer_id).or_insert_with(HashSet::new); + let is_first_connection = connections.is_empty(); + connections.insert(connection_id); + is_first_connection + } + + /// Remove a connection for a peer. + /// Returns `true` if this was the last connection to the peer. + pub fn connection_closed( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + remaining_established: usize, + ) -> bool { + if let Some(connections) = self.connected.get_mut(&peer_id) { + connections.remove(&connection_id); + + if remaining_established == 0 { + self.connected.remove(&peer_id); + return true; + } + } + false + } + + /// Check if a peer is currently connected. + pub fn is_connected(&self, peer_id: &PeerId) -> bool { + self.connected.contains_key(peer_id) + } + + /// Get all connected peer IDs. + pub fn connected_peers(&self) -> impl Iterator { + self.connected.keys() + } + + /// Get the number of connected peers. + pub fn connected_count(&self) -> usize { + self.connected.len() + } + + /// Get the number of connections to a specific peer. + pub fn connection_count(&self, peer_id: &PeerId) -> usize { + self.connected + .get(peer_id) + .map(|connections| connections.len()) + .unwrap_or(0) + } +} From e3e74c0bb195fcc915ea4fe4273be33bb1bb77df Mon Sep 17 00:00:00 2001 From: diego Date: Thu, 5 Jun 2025 15:17:35 +0200 Subject: [PATCH 2/3] make connection_store a PeerStore component --- Cargo.lock | 10 +- Cargo.toml | 2 +- misc/connection-tracker/Cargo.toml | 13 -- misc/connection-tracker/src/behavior.rs | 194 ------------------ misc/connection-tracker/src/lib.rs | 44 ---- misc/peer-store/Cargo.toml | 1 + .../src/connection_store.rs} | 41 +++- misc/peer-store/src/lib.rs | 1 + misc/peer-store/src/memory_store.rs | 80 +++++++- 9 files changed, 105 insertions(+), 281 deletions(-) delete mode 100644 misc/connection-tracker/Cargo.toml delete mode 100644 misc/connection-tracker/src/behavior.rs delete mode 100644 misc/connection-tracker/src/lib.rs rename misc/{connection-tracker/src/store.rs => peer-store/src/connection_store.rs} (68%) diff --git a/Cargo.lock b/Cargo.lock index 537f1c7f080..a16cc80a469 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2755,15 +2755,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "libp2p-connection-tracker" -version = "0.1.0" -dependencies = [ - "libp2p-core", - "libp2p-swarm", - "tracing", -] - [[package]] name = "libp2p-core" version = "0.43.1" @@ -3097,6 +3088,7 @@ dependencies = [ "lru", "serde_json", "tokio", + "tracing", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index e53555c887d..5f6059fcbb8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ members = [ "transports/websocket-websys", "transports/websocket", "transports/webtransport-websys", - "wasm-tests/webtransport-tests", "misc/connection-tracker", + "wasm-tests/webtransport-tests", ] resolver = "2" diff --git a/misc/connection-tracker/Cargo.toml b/misc/connection-tracker/Cargo.toml deleted file mode 100644 index 83976fd53a2..00000000000 --- a/misc/connection-tracker/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "libp2p-connection-tracker" -version = "0.1.0" -edition.workspace = true -description = "A libp2p NetworkBehaviour for tracking peer connection state, disconnections, and bans" -license = "MIT" -publish = false -rust-version.workspace = true - -[dependencies] -libp2p-core = { workspace = true } -libp2p-swarm = { workspace = true } -tracing = { workspace = true } \ No newline at end of file diff --git a/misc/connection-tracker/src/behavior.rs b/misc/connection-tracker/src/behavior.rs deleted file mode 100644 index b9f92641371..00000000000 --- a/misc/connection-tracker/src/behavior.rs +++ /dev/null @@ -1,194 +0,0 @@ -use std::{ - collections::VecDeque, - task::{Context, Poll}, -}; - -use libp2p_core::transport::PortUse; -use libp2p_core::PeerId; -use libp2p_swarm::{ - behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}, - ConnectionDenied, ConnectionId, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, - ToSwarm, -}; -use tracing::{debug, trace}; - -use crate::{store::ConnectionStore, Event}; - -/// A [`NetworkBehaviour`] that tracks connected peers. -/// -/// This behaviour provides simple connection tracking without any advanced -/// features like banning or scoring (for now). It's designed to be lightweight and -/// composable with other behaviours. -/// -/// # Example -/// -/// ```rust -/// use libp2p_connection_tracker::Behaviour; -/// use libp2p_swarm_derive::NetworkBehaviour; -/// -/// #[derive(NetworkBehaviour)] -/// #[behaviour(prelude = "libp2p_swarm::derive_prelude")] -/// struct MyBehaviour { -/// connection_tracker: Behaviour, -/// // ... other behaviours -/// } -/// -/// let connection_tracker = Behaviour::new(); -/// ``` -pub struct Behaviour { - /// Storage for connection state. - store: ConnectionStore, - - /// Queue of events to emit. - pending_events: VecDeque, -} - -impl Behaviour { - /// Create a new connection tracker behaviour. - pub fn new() -> Self { - Self { - store: ConnectionStore::new(), - pending_events: VecDeque::new(), - } - } - - /// Check if a peer is currently connected. - pub fn is_connected(&self, peer_id: &PeerId) -> bool { - self.store.is_connected(peer_id) - } - - /// Get all currently connected peer IDs. - pub fn connected_peers(&self) -> impl Iterator { - self.store.connected_peers() - } - - /// Get the number of currently connected peers. - pub fn connected_count(&self) -> usize { - self.store.connected_count() - } - - /// Get the number of connections to a specific peer. - pub fn connection_count(&self, peer_id: &PeerId) -> usize { - self.store.connection_count(peer_id) - } -} - -impl NetworkBehaviour for Behaviour { - type ConnectionHandler = libp2p_swarm::dummy::ConnectionHandler; - type ToSwarm = Event; - - fn handle_pending_inbound_connection( - &mut self, - _connection_id: ConnectionId, - _local_addr: &libp2p_core::Multiaddr, - _remote_addr: &libp2p_core::Multiaddr, - ) -> Result<(), ConnectionDenied> { - // We don't interfere with connection establishment - Ok(()) - } - - fn handle_established_inbound_connection( - &mut self, - _connection_id: ConnectionId, - _peer: PeerId, - _local_addr: &libp2p_core::Multiaddr, - _remote_addr: &libp2p_core::Multiaddr, - ) -> Result, ConnectionDenied> { - Ok(libp2p_swarm::dummy::ConnectionHandler) - } - - fn handle_pending_outbound_connection( - &mut self, - _connection_id: ConnectionId, - _maybe_peer: Option, - _addresses: &[libp2p_core::Multiaddr], - _effective_role: libp2p_core::Endpoint, - ) -> Result, ConnectionDenied> { - // Don't modify addresses - Ok(vec![]) - } - - fn handle_established_outbound_connection( - &mut self, - _connection_id: ConnectionId, - _peer: PeerId, - _addr: &libp2p_core::Multiaddr, - _role_override: libp2p_core::Endpoint, - _port_use: PortUse, - ) -> Result, ConnectionDenied> { - Ok(libp2p_swarm::dummy::ConnectionHandler) - } - - fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionEstablished(ConnectionEstablished { - peer_id, - connection_id, - endpoint, - .. - }) => { - trace!(%peer_id, ?connection_id, "Connection established"); - - let is_first_connection = self.store.connection_established(peer_id, connection_id); - - if is_first_connection { - debug!(?peer_id, "Peer connected"); - self.pending_events.push_back(Event::PeerConnected { - peer_id, - connection_id, - endpoint: endpoint.clone(), - }); - } - } - - FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - connection_id, - remaining_established, - .. - }) => { - trace!(%peer_id, ?connection_id, remaining_established, "Connection closed"); - - let is_last_connection = - self.store - .connection_closed(peer_id, connection_id, remaining_established); - - if is_last_connection { - debug!(%peer_id, "Peer disconnected"); - self.pending_events.push_back(Event::PeerDisconnected { - peer_id, - connection_id, - }); - } - } - - _ => {} - } - } - - fn on_connection_handler_event( - &mut self, - _peer_id: PeerId, - _connection_id: ConnectionId, - event: THandlerOutEvent, - ) { - match event {} - } - - fn poll( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll>> { - if let Some(event) = self.pending_events.pop_front() { - return Poll::Ready(ToSwarm::GenerateEvent(event)); - } - - Poll::Pending - } -} - -impl Default for Behaviour { - fn default() -> Self { - Self::new() - } -} diff --git a/misc/connection-tracker/src/lib.rs b/misc/connection-tracker/src/lib.rs deleted file mode 100644 index 81ffcbedc6e..00000000000 --- a/misc/connection-tracker/src/lib.rs +++ /dev/null @@ -1,44 +0,0 @@ -//! A [`NetworkBehaviour`] for tracking connected peers. -//! -//! This crate provides simple connected peer tracking that can be composed -//! with other behaviours to avoid manual connection state management. -//! -//! # Usage -//! -//! ```rust -//! use libp2p_swarm::{dummy, NetworkBehaviour};//! -//! -//! use libp2p_swarm::dummy::Behaviour; -//! -//! #[derive(NetworkBehaviour)] -//! #[behaviour(prelude = "libp2p_swarm::derive_prelude")] -//! struct MyBehaviour { -//! connection_tracker: Behaviour, -//! // ... other behaviours -//! } -//! ``` - -#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] - -mod behavior; -mod store; - -use libp2p_core::{ConnectedPoint, PeerId}; -use libp2p_swarm::ConnectionId; - -/// Events emitted by the connection tracker behaviour. -#[derive(Debug, Clone)] -pub enum Event { - /// A peer connected (first connection established). - PeerConnected { - peer_id: PeerId, - connection_id: ConnectionId, - endpoint: ConnectedPoint, - }, - - /// A peer disconnected (last connection closed). - PeerDisconnected { - peer_id: PeerId, - connection_id: ConnectionId, - }, -} diff --git a/misc/peer-store/Cargo.toml b/misc/peer-store/Cargo.toml index 5f6bf598cbf..a3936d1a9ae 100644 --- a/misc/peer-store/Cargo.toml +++ b/misc/peer-store/Cargo.toml @@ -13,6 +13,7 @@ libp2p-core = { workspace = true } libp2p-swarm = { workspace = true } lru = "0.12.3" libp2p-identity = { workspace = true, optional = true } +tracing = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/misc/connection-tracker/src/store.rs b/misc/peer-store/src/connection_store.rs similarity index 68% rename from misc/connection-tracker/src/store.rs rename to misc/peer-store/src/connection_store.rs index 2cbe6fb8909..0ac3a38a893 100644 --- a/misc/connection-tracker/src/store.rs +++ b/misc/peer-store/src/connection_store.rs @@ -1,6 +1,23 @@ -use libp2p_core::PeerId; -use libp2p_swarm::ConnectionId; use std::collections::{HashMap, HashSet}; +use libp2p_core::{ConnectedPoint, PeerId}; +use libp2p_swarm::ConnectionId; + +/// Events emitted by the connection tracker behaviour. +#[derive(Debug, Clone)] +pub enum Event { + /// A peer connected (first connection established). + PeerConnected { + peer_id: PeerId, + connection_id: ConnectionId, + endpoint: ConnectedPoint, + }, + + /// A peer disconnected (last connection closed). + PeerDisconnected { + peer_id: PeerId, + connection_id: ConnectionId, + }, +} /// Simple storage for connected peers. #[derive(Debug, Default)] @@ -19,10 +36,14 @@ impl ConnectionStore { /// Add a new connection for a peer. /// Returns `true` if this is the first connection to the peer. - pub fn connection_established(&mut self, peer_id: PeerId, connection_id: ConnectionId) -> bool { - let connections = self.connected.entry(peer_id).or_insert_with(HashSet::new); + pub fn connection_established( + &mut self, + peer_id: &PeerId, + connection_id: &ConnectionId, + ) -> bool { + let connections = self.connected.entry(*peer_id).or_insert_with(HashSet::new); let is_first_connection = connections.is_empty(); - connections.insert(connection_id); + connections.insert(*connection_id); is_first_connection } @@ -30,14 +51,14 @@ impl ConnectionStore { /// Returns `true` if this was the last connection to the peer. pub fn connection_closed( &mut self, - peer_id: PeerId, - connection_id: ConnectionId, - remaining_established: usize, + peer_id: &PeerId, + connection_id: &ConnectionId, + remaining_established: &usize, ) -> bool { if let Some(connections) = self.connected.get_mut(&peer_id) { connections.remove(&connection_id); - if remaining_established == 0 { + if *remaining_established == 0 { self.connected.remove(&peer_id); return true; } @@ -67,4 +88,4 @@ impl ConnectionStore { .map(|connections| connections.len()) .unwrap_or(0) } -} +} \ No newline at end of file diff --git a/misc/peer-store/src/lib.rs b/misc/peer-store/src/lib.rs index 31f7eb6497c..b5a12f69821 100644 --- a/misc/peer-store/src/lib.rs +++ b/misc/peer-store/src/lib.rs @@ -20,6 +20,7 @@ mod behaviour; pub mod memory_store; mod store; +pub mod connection_store; pub use behaviour::Behaviour; pub use store::Store; diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 8fbe31c6292..ea9e794f853 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -13,12 +13,13 @@ use std::{ num::NonZeroUsize, task::{Poll, Waker}, }; - +use std::collections::HashSet; use libp2p_core::{Multiaddr, PeerId}; -use libp2p_swarm::{behaviour::ConnectionEstablished, DialError, FromSwarm}; +use libp2p_swarm::{behaviour::ConnectionEstablished, ConnectionClosed, ConnectionId, DialError, FromSwarm}; use lru::LruCache; - -use super::Store; +use tracing::{debug, trace}; +use crate::connection_store::ConnectionStore; +use super::{connection_store, Store}; /// Event emitted from the [`MemoryStore`] to the [`Swarm`](libp2p_swarm::Swarm). #[derive(Debug, Clone)] @@ -59,6 +60,8 @@ pub struct MemoryStore { config: Config, /// Waker for store events. waker: Option, + /// Connection store to track connected peers. + connection_store: ConnectionStore, } impl MemoryStore { @@ -69,6 +72,7 @@ impl MemoryStore { records: HashMap::new(), pending_events: VecDeque::default(), waker: None, + connection_store: ConnectionStore::new(), } } @@ -187,6 +191,26 @@ impl MemoryStore { waker.wake(); // wake up because of update } } + + /// Check if a peer is currently connected. + pub fn is_connected(&self, peer_id: &PeerId) -> bool { + self.connection_store.is_connected(peer_id) + } + + /// Get all currently connected peer IDs. + pub fn connected_peers(&self) -> impl Iterator { + self.connection_store.connected_peers() + } + + /// Get the number of currently connected peers. + pub fn connected_count(&self) -> usize { + self.connection_store.connected_count() + } + + /// Get the number of connections to a specific peer. + pub fn connection_count(&self, peer_id: &PeerId) -> usize { + self.connection_store.connection_count(peer_id) + } } impl Store for MemoryStore { @@ -198,17 +222,53 @@ impl Store for MemoryStore { self.add_address_inner(&info.peer_id, info.addr, false); } FromSwarm::ConnectionEstablished(ConnectionEstablished { - peer_id, + peer_id, + connection_id, failed_addresses, endpoint, .. - }) if endpoint.is_dialer() => { - if self.config.remove_addr_on_dial_error { - for failed_addr in *failed_addresses { - self.remove_address_inner(peer_id, failed_addr, false); + }) => { + if endpoint.is_dialer() { + if self.config.remove_addr_on_dial_error { + for failed_addr in *failed_addresses { + self.remove_address_inner(peer_id, failed_addr, false); + } } + self.add_address_inner(peer_id, endpoint.get_remote_address(), false); + } + + trace!(%peer_id, ?connection_id, "Connection established"); + + let is_first_connection = self.connection_store.connection_established(peer_id, connection_id); + + if is_first_connection { + debug!(?peer_id, "Peer connected"); + // self.pending_events.push_back(connection_store::Event::PeerConnected { + // peer_id: *peer_id, + // connection_id: *connection_id, + // endpoint: endpoint.clone(), + // }); + } + } + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id, + remaining_established, + .. + }) => { + trace!(%peer_id, ?connection_id, remaining_established, "Connection closed"); + + let is_last_connection = + self.connection_store + .connection_closed(peer_id, connection_id, remaining_established); + + if is_last_connection { + debug!(%peer_id, "Peer disconnected"); + // self.pending_events.push_back(connection_store::Event::PeerDisconnected { + // peer_id: *peer_id, + // connection_id: *connection_id, + // }); } - self.add_address_inner(peer_id, endpoint.get_remote_address(), false); } FromSwarm::DialFailure(info) => { if !self.config.remove_addr_on_dial_error { From e6048c8be44c0c5a3d7c791afce4d0bd01837ce5 Mon Sep 17 00:00:00 2001 From: diego Date: Thu, 5 Jun 2025 15:23:45 +0200 Subject: [PATCH 3/3] fmt and clippy --- misc/peer-store/src/connection_store.rs | 12 ++++---- misc/peer-store/src/lib.rs | 2 +- misc/peer-store/src/memory_store.rs | 37 +++++++++++++------------ 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/misc/peer-store/src/connection_store.rs b/misc/peer-store/src/connection_store.rs index 0ac3a38a893..d2699f7f8f4 100644 --- a/misc/peer-store/src/connection_store.rs +++ b/misc/peer-store/src/connection_store.rs @@ -1,6 +1,6 @@ -use std::collections::{HashMap, HashSet}; use libp2p_core::{ConnectedPoint, PeerId}; use libp2p_swarm::ConnectionId; +use std::collections::{HashMap, HashSet}; /// Events emitted by the connection tracker behaviour. #[derive(Debug, Clone)] @@ -41,7 +41,7 @@ impl ConnectionStore { peer_id: &PeerId, connection_id: &ConnectionId, ) -> bool { - let connections = self.connected.entry(*peer_id).or_insert_with(HashSet::new); + let connections = self.connected.entry(*peer_id).or_default(); let is_first_connection = connections.is_empty(); connections.insert(*connection_id); is_first_connection @@ -55,11 +55,11 @@ impl ConnectionStore { connection_id: &ConnectionId, remaining_established: &usize, ) -> bool { - if let Some(connections) = self.connected.get_mut(&peer_id) { - connections.remove(&connection_id); + if let Some(connections) = self.connected.get_mut(peer_id) { + connections.remove(connection_id); if *remaining_established == 0 { - self.connected.remove(&peer_id); + self.connected.remove(peer_id); return true; } } @@ -88,4 +88,4 @@ impl ConnectionStore { .map(|connections| connections.len()) .unwrap_or(0) } -} \ No newline at end of file +} diff --git a/misc/peer-store/src/lib.rs b/misc/peer-store/src/lib.rs index b5a12f69821..2b3ae3c2837 100644 --- a/misc/peer-store/src/lib.rs +++ b/misc/peer-store/src/lib.rs @@ -18,9 +18,9 @@ //! or provide information to PeerStore. mod behaviour; +pub mod connection_store; pub mod memory_store; mod store; -pub mod connection_store; pub use behaviour::Behaviour; pub use store::Store; diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index ea9e794f853..d2f260fbc50 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -8,18 +8,17 @@ //! let behaviour = Behaviour::new(store); //! ``` +use super::Store; +use crate::connection_store::ConnectionStore; +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::{behaviour::ConnectionEstablished, ConnectionClosed, DialError, FromSwarm}; +use lru::LruCache; use std::{ collections::{HashMap, VecDeque}, num::NonZeroUsize, task::{Poll, Waker}, }; -use std::collections::HashSet; -use libp2p_core::{Multiaddr, PeerId}; -use libp2p_swarm::{behaviour::ConnectionEstablished, ConnectionClosed, ConnectionId, DialError, FromSwarm}; -use lru::LruCache; use tracing::{debug, trace}; -use crate::connection_store::ConnectionStore; -use super::{connection_store, Store}; /// Event emitted from the [`MemoryStore`] to the [`Swarm`](libp2p_swarm::Swarm). #[derive(Debug, Clone)] @@ -222,12 +221,12 @@ impl Store for MemoryStore { self.add_address_inner(&info.peer_id, info.addr, false); } FromSwarm::ConnectionEstablished(ConnectionEstablished { - peer_id, + peer_id, connection_id, failed_addresses, endpoint, .. - }) => { + }) => { if endpoint.is_dialer() { if self.config.remove_addr_on_dial_error { for failed_addr in *failed_addresses { @@ -239,7 +238,9 @@ impl Store for MemoryStore { trace!(%peer_id, ?connection_id, "Connection established"); - let is_first_connection = self.connection_store.connection_established(peer_id, connection_id); + let is_first_connection = self + .connection_store + .connection_established(peer_id, connection_id); if is_first_connection { debug!(?peer_id, "Peer connected"); @@ -251,16 +252,18 @@ impl Store for MemoryStore { } } FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - connection_id, - remaining_established, - .. - }) => { + peer_id, + connection_id, + remaining_established, + .. + }) => { trace!(%peer_id, ?connection_id, remaining_established, "Connection closed"); - let is_last_connection = - self.connection_store - .connection_closed(peer_id, connection_id, remaining_established); + let is_last_connection = self.connection_store.connection_closed( + peer_id, + connection_id, + remaining_established, + ); if is_last_connection { debug!(%peer_id, "Peer disconnected");