Skip to content

Commit 865d0ce

Browse files
apollo_network: added libp2p metrics
1 parent 2c32041 commit 865d0ce

File tree

9 files changed

+133
-3
lines changed

9 files changed

+133
-3
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ phf = "0.11"
261261
pretty_assertions = "1.4.0"
262262
primitive-types = "0.12.1"
263263
proc-macro2 = "1.0"
264+
prometheus-client = "0.23.1"
264265
prometheus-parse = "0.2.4"
265266
prost = "0.12.1"
266267
prost-build = "0.12.1"

crates/apollo_consensus_manager/src/consensus_manager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ impl ConsensusManager {
9898
num_blacklisted_peers: CONSENSUS_NUM_BLACKLISTED_PEERS,
9999
broadcast_metrics_by_topic: Some(broadcast_metrics_by_topic),
100100
sqmr_metrics: None,
101+
libp2p_metrics_prefix: None,
101102
});
102103
let mut network_manager =
103104
NetworkManager::new(self.config.network_config.clone(), None, network_manager_metrics);

crates/apollo_mempool_p2p/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ pub fn create_p2p_propagator_and_runner(
5050
num_blacklisted_peers: MEMPOOL_P2P_NUM_BLACKLISTED_PEERS,
5151
broadcast_metrics_by_topic: Some(broadcast_metrics_by_topic),
5252
sqmr_metrics: None,
53+
libp2p_metrics_prefix: None,
5354
});
5455
let mut network_manager = NetworkManager::new(
5556
mempool_p2p_config.network_config,

crates/apollo_network/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ libp2p = { workspace = true, features = [
2525
"identify",
2626
"kad",
2727
"macros",
28+
"metrics",
2829
"noise",
2930
"quic",
3031
"serde",
@@ -34,6 +35,8 @@ libp2p = { workspace = true, features = [
3435
] }
3536
metrics.workspace = true
3637
metrics-exporter-prometheus.workspace = true
38+
prometheus-client.workspace = true
39+
prometheus-parse.workspace = true
3740
replace_with.workspace = true
3841
serde = { workspace = true, features = ["derive"] }
3942
starknet_api.workspace = true
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use libp2p::metrics::Registry;
2+
use metrics::{counter, describe_counter, describe_gauge, gauge};
3+
use prometheus_client::encoding::text::encode;
4+
use prometheus_parse::Value;
5+
6+
/// libp2p uses `prometheus-client` for metrics, which updates the metrics to a `Registry`.
7+
/// We use `metrics-exporter-prometheus` so we need to update these
8+
/// metrics when the registry is updated.
9+
pub fn connect_libp2p_registry_to_metrics_exporter_prometheus(prefix: String, registry: Registry) {
10+
tokio::spawn(async move {
11+
// This will update the libp2p metrics in the registry.
12+
loop {
13+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
14+
15+
let mut sink = String::new();
16+
encode(&mut sink, &registry).unwrap();
17+
18+
let metrics =
19+
prometheus_parse::Scrape::parse(sink.lines().map(|s| Ok(s.to_owned()))).unwrap();
20+
21+
for sample in metrics.samples.iter() {
22+
let metric_name = format!("{}{}", prefix, sample.metric);
23+
24+
match &sample.value {
25+
Value::Counter(value) => {
26+
// Register and update counter metric
27+
describe_counter!(metric_name.clone(), "LibP2P counter metric");
28+
#[allow(clippy::as_conversions)]
29+
counter!(metric_name).absolute(*value as u64);
30+
}
31+
Value::Gauge(value) => {
32+
// Register and update gauge metric
33+
describe_gauge!(metric_name.clone(), "LibP2P gauge metric");
34+
gauge!(metric_name).set(*value);
35+
}
36+
Value::Histogram(_) => {
37+
// Skip histograms for now as they require more complex handling
38+
39+
continue;
40+
}
41+
Value::Summary(_) => {
42+
// Skip summaries for now as they require more complex handling
43+
continue;
44+
}
45+
Value::Untyped(value) => {
46+
// Treat untyped metrics as gauges
47+
describe_gauge!(metric_name.clone(), "LibP2P untyped metric");
48+
gauge!(metric_name).set(*value);
49+
}
50+
}
51+
}
52+
}
53+
});
54+
}

crates/apollo_network/src/network_manager/metrics.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ pub struct NetworkMetrics {
3636
pub num_blacklisted_peers: MetricGauge,
3737
pub broadcast_metrics_by_topic: Option<HashMap<TopicHash, BroadcastNetworkMetrics>>,
3838
pub sqmr_metrics: Option<SqmrNetworkMetrics>,
39+
/// Set the desired prefix for the libp2p metrics.
40+
/// If `None`, the metrics will not be registered.
41+
/// If `Some(prefix)`, the metrics will be registered with the prefix.
42+
pub libp2p_metrics_prefix: Option<String>,
3943
}
4044

4145
impl NetworkMetrics {
@@ -52,5 +56,30 @@ impl NetworkMetrics {
5256
if let Some(sqmr_metrics) = self.sqmr_metrics.as_ref() {
5357
sqmr_metrics.register();
5458
}
59+
// if let Some(libp2p_metrics) = self.libp2p_metrics.as_ref() {
60+
// libp2p_metrics.register();
61+
// }
5562
}
5663
}
64+
65+
// pub struct LibP2PMetrics {
66+
// pub num_connected_peers: MetricGauge,
67+
// pub num_blacklisted_peers: MetricGauge,
68+
// }
69+
70+
// impl LibP2PMetrics {
71+
// pub fn register(&self) {
72+
// self.num_connected_peers.register();
73+
// self.num_blacklisted_peers.register();
74+
// }
75+
// }
76+
77+
// /// Must implement debug for conversion
78+
// impl Debug for LibP2PMetrics {
79+
// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80+
// f.debug_struct("LibP2PMetrics")
81+
// .field("num_connected_peers", &self.num_connected_peers.get_scope())
82+
// .field("num_blacklisted_peers", &self.num_blacklisted_peers.get_scope())
83+
// .finish()
84+
// }
85+
// }

crates/apollo_network/src/network_manager/mod.rs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod libp2p_metrics;
12
pub mod metrics;
23
mod swarm_trait;
34
#[cfg(test)]
@@ -20,6 +21,7 @@ use futures::stream::{FuturesUnordered, Map, Stream};
2021
use futures::{pin_mut, FutureExt, Sink, SinkExt, StreamExt};
2122
use libp2p::gossipsub::{SubscriptionError, TopicHash};
2223
use libp2p::identity::Keypair;
24+
use libp2p::metrics::{Metrics, Recorder, Registry};
2325
use libp2p::swarm::SwarmEvent;
2426
use libp2p::{noise, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder};
2527
use metrics::NetworkMetrics;
@@ -29,6 +31,7 @@ use self::swarm_trait::SwarmTrait;
2931
use crate::gossipsub_impl::Topic;
3032
use crate::misconduct_score::MisconductScore;
3133
use crate::mixed_behaviour::{self, BridgedBehaviour};
34+
use crate::network_manager::libp2p_metrics::connect_libp2p_registry_to_metrics_exporter_prometheus;
3235
use crate::sqmr::behaviour::SessionError;
3336
use crate::sqmr::{self, InboundSessionId, OutboundSessionId, SessionId};
3437
use crate::utils::{is_localhost, make_multiaddr, StreamMap};
@@ -61,6 +64,7 @@ pub struct GenericNetworkManager<SwarmT: SwarmTrait> {
6164
continue_propagation_sender: Sender<BroadcastedMessageMetadata>,
6265
continue_propagation_receiver: Receiver<BroadcastedMessageMetadata>,
6366
metrics: Option<NetworkMetrics>,
67+
libp2p_metrics: Option<Metrics>,
6468
}
6569

6670
impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
@@ -95,12 +99,13 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
9599

96100
// TODO(shahak): remove the advertised_multiaddr arg once we manage external addresses
97101
// in a behaviour.
98-
pub(crate) fn generic_new(
102+
pub(crate) fn generic_new_with_libp2p_metrics(
99103
mut swarm: SwarmT,
100104
advertised_multiaddr: Option<Multiaddr>,
101105
metrics: Option<NetworkMetrics>,
102106
broadcasted_message_metadata_buffer_size: usize,
103107
reported_peer_ids_buffer_size: usize,
108+
libp2p_metrics: Option<Metrics>,
104109
) -> Self {
105110
let reported_peer_receivers = FuturesUnordered::new();
106111
reported_peer_receivers.push(futures::future::pending().boxed());
@@ -128,9 +133,28 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
128133
continue_propagation_sender,
129134
continue_propagation_receiver,
130135
metrics,
136+
libp2p_metrics,
131137
}
132138
}
133139

140+
#[cfg(test)]
141+
pub(crate) fn generic_new(
142+
swarm: SwarmT,
143+
advertised_multiaddr: Option<Multiaddr>,
144+
metrics: Option<NetworkMetrics>,
145+
broadcasted_message_metadata_buffer_size: usize,
146+
reported_peer_ids_buffer_size: usize,
147+
) -> Self {
148+
Self::generic_new_with_libp2p_metrics(
149+
swarm,
150+
advertised_multiaddr,
151+
metrics,
152+
broadcasted_message_metadata_buffer_size,
153+
reported_peer_ids_buffer_size,
154+
None,
155+
)
156+
}
157+
134158
// TODO(Shahak): Support multiple protocols where they're all different versions of the same
135159
// protocol
136160
pub fn register_sqmr_protocol_server<Query, Response>(
@@ -267,6 +291,9 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
267291
&mut self,
268292
event: SwarmEvent<mixed_behaviour::Event>,
269293
) -> Result<(), NetworkError> {
294+
if let Some(metrics) = self.libp2p_metrics.as_ref() {
295+
metrics.record(&event);
296+
}
270297
match event {
271298
SwarmEvent::ConnectionEstablished { peer_id, num_established, .. } => {
272299
debug!("Connected to peer id: {peer_id:?}");
@@ -686,7 +713,7 @@ impl NetworkManager {
686713
pub fn new(
687714
config: NetworkConfig,
688715
node_version: Option<String>,
689-
metrics: Option<NetworkMetrics>,
716+
mut metrics: Option<NetworkMetrics>,
690717
) -> Self {
691718
let NetworkConfig {
692719
port,
@@ -711,13 +738,15 @@ impl NetworkManager {
711738
}
712739
None => Keypair::generate_ed25519(),
713740
};
741+
let mut registry = Registry::default();
714742
let mut swarm = SwarmBuilder::with_existing_identity(key_pair)
715743
.with_tokio()
716744
// TODO(AndrewL): .with_quic()
717745
.with_tcp(Default::default(), noise::Config::new, yamux::Config::default)
718746
.expect("Error building TCP transport")
719747
.with_dns()
720748
.expect("Error building DNS transport")
749+
.with_bandwidth_metrics(&mut registry)
721750
.with_behaviour(|key| {
722751
mixed_behaviour::MixedBehaviour::new(
723752
key.clone(),
@@ -733,6 +762,14 @@ impl NetworkManager {
733762
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(idle_connection_timeout))
734763
.build();
735764

765+
let libp2p_metrics = Metrics::new(&mut registry);
766+
767+
if let Some(metrics) = metrics.as_mut() {
768+
if let Some(prefix) = metrics.libp2p_metrics_prefix.as_ref() {
769+
connect_libp2p_registry_to_metrics_exporter_prometheus(prefix.clone(), registry);
770+
}
771+
}
772+
736773
swarm
737774
.listen_on(listen_address.clone())
738775
.unwrap_or_else(|_| panic!("Error while binding to {listen_address}"));
@@ -742,12 +779,13 @@ impl NetworkManager {
742779
.with_p2p(*swarm.local_peer_id())
743780
.expect("advertised_multiaddr has a peer id different than the local peer id")
744781
});
745-
Self::generic_new(
782+
Self::generic_new_with_libp2p_metrics(
746783
swarm,
747784
advertised_multiaddr,
748785
metrics,
749786
broadcasted_message_metadata_buffer_size,
750787
reported_peer_ids_buffer_size,
788+
Some(libp2p_metrics),
751789
)
752790
}
753791

crates/apollo_state_sync/src/runner/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ impl StateSyncRunner {
201201
num_active_inbound_sessions: P2P_SYNC_NUM_ACTIVE_INBOUND_SESSIONS,
202202
num_active_outbound_sessions: P2P_SYNC_NUM_ACTIVE_OUTBOUND_SESSIONS,
203203
}),
204+
libp2p_metrics_prefix: None,
204205
});
205206
NetworkManager::new(
206207
network_config.clone(),

0 commit comments

Comments
 (0)