Skip to content
Open
84 changes: 74 additions & 10 deletions misc/metrics/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ use prometheus_client::{

use crate::protocol_stack;

const ALLOWED_PROTOCOLS: &[StreamProtocol] = &[
const BASE_ALLOWED_PROTOCOLS: &[StreamProtocol] = &[
#[cfg(feature = "dcutr")]
libp2p_dcutr::PROTOCOL_NAME,
// #[cfg(feature = "gossipsub")]
// TODO: Add Gossipsub protocol name
// NOTE: Not including gossipsub here as users may configure custom protocol IDs
// via ConfigBuilder::protocol_id. Hard-coding defaults would misclassify such setups.
libp2p_identify::PROTOCOL_NAME,
libp2p_identify::PUSH_PROTOCOL_NAME,
#[cfg(feature = "kad")]
Expand All @@ -62,8 +62,57 @@ pub(crate) struct Metrics {
impl Metrics {
pub(crate) fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("identify");
let peers = Peers::new(Vec::new());
sub_registry.register_collector(Box::new(peers.clone()));

let error = Counter::default();
sub_registry.register(
"errors",
"Number of errors while attempting to identify the remote",
error.clone(),
);

let pushed = Counter::default();
sub_registry.register(
"pushed",
"Number of times identification information of the local node has \
been actively pushed to a peer.",
pushed.clone(),
);

let received = Counter::default();
sub_registry.register(
"received",
"Number of times identification information has been received from \
a peer",
received.clone(),
);

let peers = Peers::default();
let sent = Counter::default();
sub_registry.register(
"sent",
"Number of times identification information of the local node has \
been sent to a peer in response to an identification request",
sent.clone(),
);

Self {
peers,
error,
pushed,
received,
sent,
}
}

/// Create Identify metrics with additional allowed protocols used for classification.
pub(crate) fn new_with_allowed_protocols(
registry: &mut Registry,
extra_allowed_protocols: impl IntoIterator<Item = StreamProtocol>,
) -> Self {
let sub_registry = registry.sub_registry_with_prefix("identify");

let peers = Peers::new(extra_allowed_protocols.into_iter().collect());
sub_registry.register_collector(Box::new(peers.clone()));

let error = Counter::default();
Expand Down Expand Up @@ -142,16 +191,29 @@ impl<TBvEv> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv>> for Metrics {
}
}

#[derive(Default, Debug, Clone)]
struct Peers(Arc<Mutex<HashMap<PeerId, libp2p_identify::Info>>>);
#[derive(Debug, Clone)]
struct Peers(Arc<PeersInner>);

#[derive(Debug)]
struct PeersInner {
infos: Mutex<HashMap<PeerId, libp2p_identify::Info>>,
extra_allowed_protocols: Vec<StreamProtocol>,
}

impl Peers {
fn new(extra_allowed_protocols: Vec<StreamProtocol>) -> Self {
Self(Arc::new(PeersInner {
infos: Mutex::new(HashMap::new()),
extra_allowed_protocols,
}))
}

fn record(&self, peer_id: PeerId, info: libp2p_identify::Info) {
self.0.lock().unwrap().insert(peer_id, info);
self.0.infos.lock().unwrap().insert(peer_id, info);
}

fn remove(&self, peer_id: PeerId) {
self.0.lock().unwrap().remove(&peer_id);
self.0.infos.lock().unwrap().remove(&peer_id);
}
}

Expand All @@ -161,13 +223,15 @@ impl Collector for Peers {
let mut count_by_listen_addresses: HashMap<String, i64> = Default::default();
let mut count_by_observed_addresses: HashMap<String, i64> = Default::default();

for (_, peer_info) in self.0.lock().unwrap().iter() {
for (_, peer_info) in self.0.infos.lock().unwrap().iter() {
{
let mut protocols: Vec<_> = peer_info
.protocols
.iter()
.map(|p| {
if ALLOWED_PROTOCOLS.contains(p) {
if BASE_ALLOWED_PROTOCOLS.contains(p)
|| self.0.extra_allowed_protocols.contains(p)
{
p.to_string()
} else {
"unrecognized".to_string()
Expand Down
27 changes: 27 additions & 0 deletions misc/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,33 @@
swarm: swarm::Metrics::new(sub_registry),
}
}

/// Create a new set of [`Metrics`] allowing to pass additional protocols
/// for Identify classification (e.g. custom Gossipsub protocol IDs).
pub fn new_with_identify_allowed_protocols(
registry: &mut Registry,
extra_allowed_protocols: impl IntoIterator<Item = libp2p_swarm::StreamProtocol>,

Check failure on line 98 in misc/metrics/src/lib.rs

View workflow job for this annotation

GitHub Actions / Test libp2p-metrics

unused variable: `extra_allowed_protocols`
) -> Self {
let sub_registry = registry.sub_registry_with_prefix("libp2p");
Self {
#[cfg(feature = "dcutr")]
dcutr: dcutr::Metrics::new(sub_registry),
#[cfg(feature = "gossipsub")]
gossipsub: gossipsub::Metrics::new(sub_registry),
#[cfg(feature = "identify")]
identify: identify::Metrics::new_with_allowed_protocols(
sub_registry,
extra_allowed_protocols,
),
#[cfg(feature = "kad")]
kad: kad::Metrics::new(sub_registry),
#[cfg(feature = "ping")]
ping: ping::Metrics::new(sub_registry),
#[cfg(feature = "relay")]
relay: relay::Metrics::new(sub_registry),
swarm: swarm::Metrics::new(sub_registry),
}
}
}

/// Recorder that can record Swarm and protocol events.
Expand Down
Loading