Skip to content

Commit 2725dca

Browse files
committed
introduce connection tracker
1 parent bee820e commit 2725dca

File tree

5 files changed

+318
-1
lines changed

5 files changed

+318
-1
lines changed

Cargo.lock

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ members = [
6565
"transports/websocket-websys",
6666
"transports/websocket",
6767
"transports/webtransport-websys",
68-
"wasm-tests/webtransport-tests",
68+
"wasm-tests/webtransport-tests", "misc/connection-tracker",
6969
]
7070
resolver = "2"
7171

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
use std::{
2+
collections::VecDeque,
3+
task::{Context, Poll},
4+
};
5+
6+
use libp2p_core::transport::PortUse;
7+
use libp2p_core::PeerId;
8+
use libp2p_swarm::{
9+
behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm},
10+
ConnectionDenied, ConnectionId, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent,
11+
ToSwarm,
12+
};
13+
use tracing::{debug, trace};
14+
15+
use crate::{store::ConnectionStore, Event};
16+
17+
/// A [`NetworkBehaviour`] that tracks connected peers.
18+
///
19+
/// This behaviour provides simple connection tracking without any advanced
20+
/// features like banning or scoring. It's designed to be lightweight and
21+
/// composable with other behaviours.
22+
///
23+
/// # Example
24+
///
25+
/// ```rust
26+
/// use libp2p_connection_tracker::Behaviour;
27+
/// use libp2p_swarm_derive::NetworkBehaviour;
28+
///
29+
/// #[derive(NetworkBehaviour)]
30+
/// #[behaviour(prelude = "libp2p_swarm::derive_prelude")]
31+
/// struct MyBehaviour {
32+
/// connection_tracker: Behaviour,
33+
/// // ... other behaviours
34+
/// }
35+
///
36+
/// let connection_tracker = Behaviour::new();
37+
/// ```
38+
pub struct Behaviour {
39+
/// Storage for connection state.
40+
store: ConnectionStore,
41+
42+
/// Queue of events to emit.
43+
pending_events: VecDeque<Event>,
44+
}
45+
46+
impl Behaviour {
47+
/// Create a new connection tracker behaviour.
48+
pub fn new() -> Self {
49+
Self {
50+
store: ConnectionStore::new(),
51+
pending_events: VecDeque::new(),
52+
}
53+
}
54+
55+
/// Check if a peer is currently connected.
56+
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
57+
self.store.is_connected(peer_id)
58+
}
59+
60+
/// Get all currently connected peer IDs.
61+
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
62+
self.store.connected_peers()
63+
}
64+
65+
/// Get the number of currently connected peers.
66+
pub fn connected_count(&self) -> usize {
67+
self.store.connected_count()
68+
}
69+
70+
/// Get the number of connections to a specific peer.
71+
pub fn connection_count(&self, peer_id: &PeerId) -> usize {
72+
self.store.connection_count(peer_id)
73+
}
74+
}
75+
76+
impl NetworkBehaviour for Behaviour {
77+
type ConnectionHandler = libp2p_swarm::dummy::ConnectionHandler;
78+
type ToSwarm = Event;
79+
80+
fn handle_pending_inbound_connection(
81+
&mut self,
82+
_connection_id: ConnectionId,
83+
_local_addr: &libp2p_core::Multiaddr,
84+
_remote_addr: &libp2p_core::Multiaddr,
85+
) -> Result<(), ConnectionDenied> {
86+
// We don't interfere with connection establishment
87+
Ok(())
88+
}
89+
90+
fn handle_established_inbound_connection(
91+
&mut self,
92+
_connection_id: ConnectionId,
93+
_peer: PeerId,
94+
_local_addr: &libp2p_core::Multiaddr,
95+
_remote_addr: &libp2p_core::Multiaddr,
96+
) -> Result<THandler<Self>, ConnectionDenied> {
97+
Ok(libp2p_swarm::dummy::ConnectionHandler)
98+
}
99+
100+
fn handle_pending_outbound_connection(
101+
&mut self,
102+
_connection_id: ConnectionId,
103+
_maybe_peer: Option<PeerId>,
104+
_addresses: &[libp2p_core::Multiaddr],
105+
_effective_role: libp2p_core::Endpoint,
106+
) -> Result<Vec<libp2p_core::Multiaddr>, ConnectionDenied> {
107+
// Don't modify addresses
108+
Ok(vec![])
109+
}
110+
111+
fn handle_established_outbound_connection(
112+
&mut self,
113+
_connection_id: ConnectionId,
114+
_peer: PeerId,
115+
_addr: &libp2p_core::Multiaddr,
116+
_role_override: libp2p_core::Endpoint,
117+
_port_use: PortUse,
118+
) -> Result<THandler<Self>, ConnectionDenied> {
119+
Ok(libp2p_swarm::dummy::ConnectionHandler)
120+
}
121+
122+
fn on_swarm_event(&mut self, event: FromSwarm) {
123+
match event {
124+
FromSwarm::ConnectionEstablished(ConnectionEstablished {
125+
peer_id,
126+
connection_id,
127+
endpoint,
128+
..
129+
}) => {
130+
trace!(%peer_id, ?connection_id, "Connection established");
131+
132+
let is_first_connection = self.store.connection_established(peer_id, connection_id);
133+
134+
if is_first_connection {
135+
debug!(?peer_id, "Peer connected");
136+
self.pending_events.push_back(Event::PeerConnected {
137+
peer_id,
138+
connection_id,
139+
endpoint: endpoint.clone(),
140+
});
141+
}
142+
}
143+
144+
FromSwarm::ConnectionClosed(ConnectionClosed {
145+
peer_id,
146+
connection_id,
147+
remaining_established,
148+
..
149+
}) => {
150+
trace!(%peer_id, ?connection_id, remaining_established, "Connection closed");
151+
152+
let is_last_connection =
153+
self.store
154+
.connection_closed(peer_id, connection_id, remaining_established);
155+
156+
if is_last_connection {
157+
debug!(%peer_id, "Peer disconnected");
158+
self.pending_events.push_back(Event::PeerDisconnected {
159+
peer_id,
160+
connection_id,
161+
});
162+
}
163+
}
164+
165+
_ => {}
166+
}
167+
}
168+
169+
fn on_connection_handler_event(
170+
&mut self,
171+
_peer_id: PeerId,
172+
_connection_id: ConnectionId,
173+
event: THandlerOutEvent<Self>,
174+
) {
175+
match event {}
176+
}
177+
178+
fn poll(
179+
&mut self,
180+
_cx: &mut Context<'_>,
181+
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
182+
if let Some(event) = self.pending_events.pop_front() {
183+
return Poll::Ready(ToSwarm::GenerateEvent(event));
184+
}
185+
186+
Poll::Pending
187+
}
188+
}
189+
190+
impl Default for Behaviour {
191+
fn default() -> Self {
192+
Self::new()
193+
}
194+
}

misc/connection-tracker/src/lib.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
//! A [`NetworkBehaviour`] for tracking connected peers.
2+
//!
3+
//! This crate provides simple connected peer tracking that can be composed
4+
//! with other behaviours to avoid manual connection state management.
5+
//!
6+
//! # Usage
7+
//!
8+
//! ```rust
9+
//! use libp2p_swarm::{dummy, NetworkBehaviour};//!
10+
//!
11+
//! use libp2p_swarm::dummy::Behaviour;
12+
//!
13+
//! #[derive(NetworkBehaviour)]
14+
//! #[behaviour(prelude = "libp2p_swarm::derive_prelude")]
15+
//! struct MyBehaviour {
16+
//! connection_tracker: Behaviour,
17+
//! // ... other behaviours
18+
//! }
19+
//! ```
20+
21+
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
22+
23+
mod behavior;
24+
mod store;
25+
26+
use libp2p_core::{ConnectedPoint, PeerId};
27+
use libp2p_swarm::ConnectionId;
28+
29+
/// Events emitted by the connection tracker behaviour.
30+
#[derive(Debug, Clone)]
31+
pub enum Event {
32+
/// A peer connected (first connection established).
33+
PeerConnected {
34+
peer_id: PeerId,
35+
connection_id: ConnectionId,
36+
endpoint: ConnectedPoint,
37+
},
38+
39+
/// A peer disconnected (last connection closed).
40+
PeerDisconnected {
41+
peer_id: PeerId,
42+
connection_id: ConnectionId,
43+
},
44+
}

misc/connection-tracker/src/store.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
use libp2p_core::PeerId;
2+
use libp2p_swarm::ConnectionId;
3+
use std::collections::{HashMap, HashSet};
4+
5+
/// Simple storage for connected peers.
6+
#[derive(Debug, Default)]
7+
pub struct ConnectionStore {
8+
/// Currently connected peers with their connection IDs
9+
connected: HashMap<PeerId, HashSet<ConnectionId>>,
10+
}
11+
12+
impl ConnectionStore {
13+
/// Create a new connection store.
14+
pub fn new() -> Self {
15+
Self {
16+
connected: HashMap::new(),
17+
}
18+
}
19+
20+
/// Add a new connection for a peer.
21+
/// Returns `true` if this is the first connection to the peer.
22+
pub fn connection_established(&mut self, peer_id: PeerId, connection_id: ConnectionId) -> bool {
23+
let connections = self.connected.entry(peer_id).or_insert_with(HashSet::new);
24+
let is_first_connection = connections.is_empty();
25+
connections.insert(connection_id);
26+
is_first_connection
27+
}
28+
29+
/// Remove a connection for a peer.
30+
/// Returns `true` if this was the last connection to the peer.
31+
pub fn connection_closed(
32+
&mut self,
33+
peer_id: PeerId,
34+
connection_id: ConnectionId,
35+
remaining_established: usize,
36+
) -> bool {
37+
if let Some(connections) = self.connected.get_mut(&peer_id) {
38+
connections.remove(&connection_id);
39+
40+
if remaining_established == 0 {
41+
self.connected.remove(&peer_id);
42+
return true;
43+
}
44+
}
45+
false
46+
}
47+
48+
/// Check if a peer is currently connected.
49+
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
50+
self.connected.contains_key(peer_id)
51+
}
52+
53+
/// Get all connected peer IDs.
54+
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
55+
self.connected.keys()
56+
}
57+
58+
/// Get the number of connected peers.
59+
pub fn connected_count(&self) -> usize {
60+
self.connected.len()
61+
}
62+
63+
/// Get the number of connections to a specific peer.
64+
pub fn connection_count(&self, peer_id: &PeerId) -> usize {
65+
self.connected
66+
.get(peer_id)
67+
.map(|connections| connections.len())
68+
.unwrap_or(0)
69+
}
70+
}

0 commit comments

Comments
 (0)