Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions misc/peer-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
lru = "0.12.3"
libp2p-identity = { workspace = true, optional = true }
tracing = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
Expand Down
91 changes: 91 additions & 0 deletions misc/peer-store/src/connection_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use libp2p_core::{ConnectedPoint, PeerId};
use libp2p_swarm::ConnectionId;
use std::collections::{HashMap, HashSet};

/// Events emitted by the connection tracker behaviour.
#[derive(Debug, Clone)]
pub enum Event {
/// A peer connected (first connection established).
PeerConnected {
peer_id: PeerId,
connection_id: ConnectionId,
endpoint: ConnectedPoint,
},

/// A peer disconnected (last connection closed).
PeerDisconnected {
peer_id: PeerId,
connection_id: ConnectionId,
},
Comment on lines +9 to +19
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need these events. We already have the swarm-events for established and closed connections, that include the info how many other connections to that peer exist.

}

/// Simple storage for connected peers.
#[derive(Debug, Default)]
pub struct ConnectionStore {
/// Currently connected peers with their connection IDs
connected: HashMap<PeerId, HashSet<ConnectionId>>,
}
Comment on lines +22 to +27
Copy link
Member

@elenaf9 elenaf9 Jun 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be a separate module, or even own structure, given that all it does is just adding and removing peers from a hashmap.
Why not just embed the logic directly in Behavior?


impl ConnectionStore {
/// Create a new connection store.
pub fn new() -> Self {
Self {
connected: HashMap::new(),
}
}

/// Add a new connection for a peer.
/// Returns `true` if this is the first connection to the peer.
pub fn connection_established(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
) -> bool {
let connections = self.connected.entry(*peer_id).or_default();
let is_first_connection = connections.is_empty();
connections.insert(*connection_id);
is_first_connection
}

/// Remove a connection for a peer.
/// Returns `true` if this was the last connection to the peer.
pub fn connection_closed(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
remaining_established: &usize,
) -> bool {
if let Some(connections) = self.connected.get_mut(peer_id) {
connections.remove(connection_id);

if *remaining_established == 0 {
self.connected.remove(peer_id);
return true;
}
}
false
}

/// Check if a peer is currently connected.
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
self.connected.contains_key(peer_id)
}

/// Get all connected peer IDs.
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.connected.keys()
}

/// Get the number of connected peers.
pub fn connected_count(&self) -> usize {
self.connected.len()
}

/// Get the number of connections to a specific peer.
pub fn connection_count(&self, peer_id: &PeerId) -> usize {
self.connected
.get(peer_id)
.map(|connections| connections.len())
.unwrap_or(0)
}
}
1 change: 1 addition & 0 deletions misc/peer-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! or provide information to PeerStore.

mod behaviour;
pub mod connection_store;
pub mod memory_store;
mod store;

Expand Down
85 changes: 74 additions & 11 deletions misc/peer-store/src/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@
//! let behaviour = Behaviour::new(store);
//! ```

use super::Store;
use crate::connection_store::ConnectionStore;
use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::{behaviour::ConnectionEstablished, ConnectionClosed, DialError, FromSwarm};
use lru::LruCache;
use std::{
collections::{HashMap, VecDeque},
num::NonZeroUsize,
task::{Poll, Waker},
};

use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::{behaviour::ConnectionEstablished, DialError, FromSwarm};
use lru::LruCache;

use super::Store;
use tracing::{debug, trace};

/// Event emitted from the [`MemoryStore`] to the [`Swarm`](libp2p_swarm::Swarm).
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -59,6 +59,8 @@ pub struct MemoryStore<T = ()> {
config: Config,
/// Waker for store events.
waker: Option<Waker>,
/// Connection store to track connected peers.
connection_store: ConnectionStore,
}

impl<T> MemoryStore<T> {
Expand All @@ -69,6 +71,7 @@ impl<T> MemoryStore<T> {
records: HashMap::new(),
pending_events: VecDeque::default(),
waker: None,
connection_store: ConnectionStore::new(),
}
}

Expand Down Expand Up @@ -187,6 +190,26 @@ impl<T> MemoryStore<T> {
waker.wake(); // wake up because of update
}
}

/// Check if a peer is currently connected.
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
self.connection_store.is_connected(peer_id)
}

/// Get all currently connected peer IDs.
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.connection_store.connected_peers()
}

/// Get the number of currently connected peers.
pub fn connected_count(&self) -> usize {
self.connection_store.connected_count()
}

/// Get the number of connections to a specific peer.
pub fn connection_count(&self, peer_id: &PeerId) -> usize {
self.connection_store.connection_count(peer_id)
}
}

impl<T> Store for MemoryStore<T> {
Expand All @@ -199,16 +222,56 @@ impl<T> Store for MemoryStore<T> {
}
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
failed_addresses,
endpoint,
..
}) if endpoint.is_dialer() => {
if self.config.remove_addr_on_dial_error {
for failed_addr in *failed_addresses {
self.remove_address_inner(peer_id, failed_addr, false);
}) => {
if endpoint.is_dialer() {
if self.config.remove_addr_on_dial_error {
for failed_addr in *failed_addresses {
self.remove_address_inner(peer_id, failed_addr, false);
}
}
self.add_address_inner(peer_id, endpoint.get_remote_address(), false);
}

trace!(%peer_id, ?connection_id, "Connection established");

let is_first_connection = self
.connection_store
.connection_established(peer_id, connection_id);
Comment on lines +241 to +243
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can just check here if ConnectionEstablished { other_established, .. } is 0.


if is_first_connection {
debug!(?peer_id, "Peer connected");
// self.pending_events.push_back(connection_store::Event::PeerConnected {
// peer_id: *peer_id,
// connection_id: *connection_id,
// endpoint: endpoint.clone(),
// });
}
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
remaining_established,
..
}) => {
trace!(%peer_id, ?connection_id, remaining_established, "Connection closed");

let is_last_connection = self.connection_store.connection_closed(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, just check if remaining_established == 0.

peer_id,
connection_id,
remaining_established,
);

if is_last_connection {
debug!(%peer_id, "Peer disconnected");
// self.pending_events.push_back(connection_store::Event::PeerDisconnected {
// peer_id: *peer_id,
// connection_id: *connection_id,
// });
}
self.add_address_inner(peer_id, endpoint.get_remote_address(), false);
}
FromSwarm::DialFailure(info) => {
if !self.config.remove_addr_on_dial_error {
Expand Down
Loading