From 8162658c9585d7e535864378d6c2b2d9fad9b8e1 Mon Sep 17 00:00:00 2001 From: torrpriius <172490774+torrpriius@users.noreply.github.com> Date: Thu, 21 Aug 2025 15:40:58 +0300 Subject: [PATCH 1/9] swarm: Implement Debug for OneShotHandler --- swarm/src/handler/one_shot.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index c623008dd90..3e9d9f882b1 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -112,6 +112,19 @@ where } } +impl std::fmt::Debug for OneShotHandler +where + TOutbound: std::fmt::Debug + OutboundUpgradeSend, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OneShotHandler") + .field("pending_requests", &self.pending_requests()) + .field("dial_negotiated", &self.dial_negotiated) + .field("dial_queue_len", &self.dial_queue.len()) + .finish() + } +} + impl ConnectionHandler for OneShotHandler where TInbound: InboundUpgradeSend + Send + 'static, From 5feee48edffb46aaa590d837a6a99a23f1bf51b0 Mon Sep 17 00:00:00 2001 From: torrpriius <172490774+torrpriius@users.noreply.github.com> Date: Thu, 21 Aug 2025 15:41:08 +0300 Subject: [PATCH 2/9] metrics: Add gossipsub protocols to identify allowlist --- misc/metrics/src/identify.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/misc/metrics/src/identify.rs b/misc/metrics/src/identify.rs index b16c6a56ccf..9e9d6c2eba6 100644 --- a/misc/metrics/src/identify.rs +++ b/misc/metrics/src/identify.rs @@ -37,8 +37,14 @@ use crate::protocol_stack; const ALLOWED_PROTOCOLS: &[StreamProtocol] = &[ #[cfg(feature = "dcutr")] libp2p_dcutr::PROTOCOL_NAME, - // #[cfg(feature = "gossipsub")] - // TODO: Add Gossipsub protocol name + #[cfg(feature = "gossipsub")] + libp2p_swarm::StreamProtocol::new("/meshsub/1.2.0"), + #[cfg(feature = "gossipsub")] + libp2p_swarm::StreamProtocol::new("/meshsub/1.1.0"), + #[cfg(feature = "gossipsub")] + libp2p_swarm::StreamProtocol::new("/meshsub/1.0.0"), + #[cfg(feature = "gossipsub")] + libp2p_swarm::StreamProtocol::new("/floodsub/1.0.0"), libp2p_identify::PROTOCOL_NAME, libp2p_identify::PUSH_PROTOCOL_NAME, #[cfg(feature = "kad")] From d747e6703294c76a5ce49697c30ecd7027e0a126 Mon Sep 17 00:00:00 2001 From: torrpriius <172490774+torrpriius@users.noreply.github.com> Date: Thu, 21 Aug 2025 15:41:12 +0300 Subject: [PATCH 3/9] core: Add doc example to TransportTimeout --- core/src/transport/timeout.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/transport/timeout.rs b/core/src/transport/timeout.rs index ce494216279..3bdce0092e4 100644 --- a/core/src/transport/timeout.rs +++ b/core/src/transport/timeout.rs @@ -22,7 +22,20 @@ //! //! The connection setup includes all protocol upgrades applied on the //! underlying `Transport`. -// TODO: add example +//! +//! # Example +//! +//! ```no_run +//! use libp2p_core::{transport::MemoryTransport, Transport as _}; +//! use libp2p_core::transport::{timeout::TransportTimeout, ListenerId}; +//! use std::time::Duration; +//! +//! let base = MemoryTransport::default(); +//! let mut timeout = TransportTimeout::new(base, Duration::from_secs(1)); +//! +//! // Listen and dial as usual; the setup will be subject to timeouts. +//! let _ = timeout.listen_on(ListenerId::next(), "/memory/0".parse().unwrap()); +//! ``` use std::{ error, fmt, io, From 3eadcc2fb846a5fd51fdeef5de96817cbeb2bac9 Mon Sep 17 00:00:00 2001 From: torrpriius <172490774+torrpriius@users.noreply.github.com> Date: Thu, 21 Aug 2025 16:30:31 +0300 Subject: [PATCH 4/9] metrics: Remove hard-coded gossipsub protocols from identify allowlist --- misc/metrics/src/identify.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/misc/metrics/src/identify.rs b/misc/metrics/src/identify.rs index 9e9d6c2eba6..ff45a6e2964 100644 --- a/misc/metrics/src/identify.rs +++ b/misc/metrics/src/identify.rs @@ -37,14 +37,8 @@ use crate::protocol_stack; const ALLOWED_PROTOCOLS: &[StreamProtocol] = &[ #[cfg(feature = "dcutr")] libp2p_dcutr::PROTOCOL_NAME, - #[cfg(feature = "gossipsub")] - libp2p_swarm::StreamProtocol::new("/meshsub/1.2.0"), - #[cfg(feature = "gossipsub")] - libp2p_swarm::StreamProtocol::new("/meshsub/1.1.0"), - #[cfg(feature = "gossipsub")] - libp2p_swarm::StreamProtocol::new("/meshsub/1.0.0"), - #[cfg(feature = "gossipsub")] - libp2p_swarm::StreamProtocol::new("/floodsub/1.0.0"), + // NOTE: Not including gossipsub here as users may configure custom protocol IDs + // via ConfigBuilder::protocol_id. Hard-coding defaults would misclassify such setups. libp2p_identify::PROTOCOL_NAME, libp2p_identify::PUSH_PROTOCOL_NAME, #[cfg(feature = "kad")] From 6b5bea5fd820819f6e19361337eac2b4f5676b2a Mon Sep 17 00:00:00 2001 From: torrpriius <172490774+torrpriius@users.noreply.github.com> Date: Thu, 21 Aug 2025 16:37:33 +0300 Subject: [PATCH 5/9] swarm: Simplify Debug impl bounds for OneShotHandler --- swarm/src/handler/one_shot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index 3e9d9f882b1..3d9c2df6d99 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -114,7 +114,7 @@ where impl std::fmt::Debug for OneShotHandler where - TOutbound: std::fmt::Debug + OutboundUpgradeSend, + TOutbound: OutboundUpgradeSend, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("OneShotHandler") From 7470c9b5e0e98f76b889f1924d8324319aed57e2 Mon Sep 17 00:00:00 2001 From: torrpriius <172490774+torrpriius@users.noreply.github.com> Date: Fri, 22 Aug 2025 13:02:43 +0300 Subject: [PATCH 6/9] swarm: Drop Debug impl for OneShotHandler to keep scope minimal --- swarm/src/handler/one_shot.rs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index 3d9c2df6d99..0db78230744 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -112,18 +112,7 @@ where } } -impl std::fmt::Debug for OneShotHandler -where - TOutbound: OutboundUpgradeSend, -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("OneShotHandler") - .field("pending_requests", &self.pending_requests()) - .field("dial_negotiated", &self.dial_negotiated) - .field("dial_queue_len", &self.dial_queue.len()) - .finish() - } -} +// Debug intentionally not implemented to keep handler lean at runtime. impl ConnectionHandler for OneShotHandler where From 288cf73d5eec9e6a48182bb4d1940f38ce542409 Mon Sep 17 00:00:00 2001 From: torrpriius <172490774+torrpriius@users.noreply.github.com> Date: Mon, 25 Aug 2025 12:33:38 +0300 Subject: [PATCH 7/9] metrics: parameterize Identify allowlist --- core/src/transport/timeout.rs | 7 ++- misc/metrics/src/identify.rs | 80 +++++++++++++++++++++++++++++++---- misc/metrics/src/lib.rs | 27 ++++++++++++ 3 files changed, 104 insertions(+), 10 deletions(-) diff --git a/core/src/transport/timeout.rs b/core/src/transport/timeout.rs index 3bdce0092e4..3f62cec8569 100644 --- a/core/src/transport/timeout.rs +++ b/core/src/transport/timeout.rs @@ -26,10 +26,13 @@ //! # Example //! //! ```no_run -//! use libp2p_core::{transport::MemoryTransport, Transport as _}; -//! use libp2p_core::transport::{timeout::TransportTimeout, ListenerId}; //! use std::time::Duration; //! +//! use libp2p_core::{ +//! transport::{timeout::TransportTimeout, ListenerId, MemoryTransport}, +//! Transport as _, +//! }; +//! //! let base = MemoryTransport::default(); //! let mut timeout = TransportTimeout::new(base, Duration::from_secs(1)); //! diff --git a/misc/metrics/src/identify.rs b/misc/metrics/src/identify.rs index ff45a6e2964..06b144ca5b5 100644 --- a/misc/metrics/src/identify.rs +++ b/misc/metrics/src/identify.rs @@ -34,7 +34,7 @@ use prometheus_client::{ use crate::protocol_stack; -const ALLOWED_PROTOCOLS: &[StreamProtocol] = &[ +const BASE_ALLOWED_PROTOCOLS: &[StreamProtocol] = &[ #[cfg(feature = "dcutr")] libp2p_dcutr::PROTOCOL_NAME, // NOTE: Not including gossipsub here as users may configure custom protocol IDs @@ -62,8 +62,57 @@ pub(crate) struct Metrics { impl Metrics { pub(crate) fn new(registry: &mut Registry) -> Self { let sub_registry = registry.sub_registry_with_prefix("identify"); + let peers = Peers::new(Vec::new()); + sub_registry.register_collector(Box::new(peers.clone())); + + let error = Counter::default(); + sub_registry.register( + "errors", + "Number of errors while attempting to identify the remote", + error.clone(), + ); + + let pushed = Counter::default(); + sub_registry.register( + "pushed", + "Number of times identification information of the local node has \ + been actively pushed to a peer.", + pushed.clone(), + ); + + let received = Counter::default(); + sub_registry.register( + "received", + "Number of times identification information has been received from \ + a peer", + received.clone(), + ); - let peers = Peers::default(); + let sent = Counter::default(); + sub_registry.register( + "sent", + "Number of times identification information of the local node has \ + been sent to a peer in response to an identification request", + sent.clone(), + ); + + Self { + peers, + error, + pushed, + received, + sent, + } + } + + /// Create Identify metrics with additional allowed protocols used for classification. + pub(crate) fn new_with_allowed_protocols( + registry: &mut Registry, + extra_allowed_protocols: impl IntoIterator, + ) -> Self { + let sub_registry = registry.sub_registry_with_prefix("identify"); + + let peers = Peers::new(extra_allowed_protocols.into_iter().collect()); sub_registry.register_collector(Box::new(peers.clone())); let error = Counter::default(); @@ -142,16 +191,29 @@ impl super::Recorder> for Metrics { } } -#[derive(Default, Debug, Clone)] -struct Peers(Arc>>); +#[derive(Debug, Clone)] +struct Peers(Arc); + +#[derive(Debug)] +struct PeersInner { + infos: Mutex>, + extra_allowed_protocols: Vec, +} impl Peers { + fn new(extra_allowed_protocols: Vec) -> Self { + Self(Arc::new(PeersInner { + infos: Mutex::new(HashMap::new()), + extra_allowed_protocols, + })) + } + fn record(&self, peer_id: PeerId, info: libp2p_identify::Info) { - self.0.lock().unwrap().insert(peer_id, info); + self.0.infos.lock().unwrap().insert(peer_id, info); } fn remove(&self, peer_id: PeerId) { - self.0.lock().unwrap().remove(&peer_id); + self.0.infos.lock().unwrap().remove(&peer_id); } } @@ -161,13 +223,15 @@ impl Collector for Peers { let mut count_by_listen_addresses: HashMap = Default::default(); let mut count_by_observed_addresses: HashMap = Default::default(); - for (_, peer_info) in self.0.lock().unwrap().iter() { + for (_, peer_info) in self.0.infos.lock().unwrap().iter() { { let mut protocols: Vec<_> = peer_info .protocols .iter() .map(|p| { - if ALLOWED_PROTOCOLS.contains(p) { + if BASE_ALLOWED_PROTOCOLS.contains(p) + || self.0.extra_allowed_protocols.contains(p) + { p.to_string() } else { "unrecognized".to_string() diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index 1fd79e7846f..3595316158e 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -90,6 +90,33 @@ impl Metrics { swarm: swarm::Metrics::new(sub_registry), } } + + /// Create a new set of [`Metrics`] allowing to pass additional protocols + /// for Identify classification (e.g. custom Gossipsub protocol IDs). + pub fn new_with_identify_allowed_protocols( + registry: &mut Registry, + extra_allowed_protocols: impl IntoIterator, + ) -> Self { + let sub_registry = registry.sub_registry_with_prefix("libp2p"); + Self { + #[cfg(feature = "dcutr")] + dcutr: dcutr::Metrics::new(sub_registry), + #[cfg(feature = "gossipsub")] + gossipsub: gossipsub::Metrics::new(sub_registry), + #[cfg(feature = "identify")] + identify: identify::Metrics::new_with_allowed_protocols( + sub_registry, + extra_allowed_protocols, + ), + #[cfg(feature = "kad")] + kad: kad::Metrics::new(sub_registry), + #[cfg(feature = "ping")] + ping: ping::Metrics::new(sub_registry), + #[cfg(feature = "relay")] + relay: relay::Metrics::new(sub_registry), + swarm: swarm::Metrics::new(sub_registry), + } + } } /// Recorder that can record Swarm and protocol events. From df1fdaa81582485842df60689bf601189351201d Mon Sep 17 00:00:00 2001 From: Torprius Date: Mon, 25 Aug 2025 17:34:25 +0200 Subject: [PATCH 8/9] Update swarm/src/handler/one_shot.rs Co-authored-by: Darius Clark --- swarm/src/handler/one_shot.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/swarm/src/handler/one_shot.rs b/swarm/src/handler/one_shot.rs index 0db78230744..c623008dd90 100644 --- a/swarm/src/handler/one_shot.rs +++ b/swarm/src/handler/one_shot.rs @@ -112,8 +112,6 @@ where } } -// Debug intentionally not implemented to keep handler lean at runtime. - impl ConnectionHandler for OneShotHandler where TInbound: InboundUpgradeSend + Send + 'static, From 5b3896727999c254273c62f17253a1bcfb22bc57 Mon Sep 17 00:00:00 2001 From: Torprius Date: Mon, 25 Aug 2025 17:38:21 +0200 Subject: [PATCH 9/9] Update timeout.rs --- core/src/transport/timeout.rs | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/core/src/transport/timeout.rs b/core/src/transport/timeout.rs index 3f62cec8569..ce494216279 100644 --- a/core/src/transport/timeout.rs +++ b/core/src/transport/timeout.rs @@ -22,23 +22,7 @@ //! //! The connection setup includes all protocol upgrades applied on the //! underlying `Transport`. -//! -//! # Example -//! -//! ```no_run -//! use std::time::Duration; -//! -//! use libp2p_core::{ -//! transport::{timeout::TransportTimeout, ListenerId, MemoryTransport}, -//! Transport as _, -//! }; -//! -//! let base = MemoryTransport::default(); -//! let mut timeout = TransportTimeout::new(base, Duration::from_secs(1)); -//! -//! // Listen and dial as usual; the setup will be subject to timeouts. -//! let _ = timeout.listen_on(ListenerId::next(), "/memory/0".parse().unwrap()); -//! ``` +// TODO: add example use std::{ error, fmt, io,