Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/apollo_consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl ConsensusManager {
num_blacklisted_peers: CONSENSUS_NUM_BLACKLISTED_PEERS,
broadcast_metrics_by_topic: Some(broadcast_metrics_by_topic),
sqmr_metrics: None,
gossipsub_metrics: None,
});
let mut network_manager =
NetworkManager::new(self.config.network_config.clone(), None, network_manager_metrics);
Expand Down
1 change: 1 addition & 0 deletions crates/apollo_mempool_p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub fn create_p2p_propagator_and_runner(
num_blacklisted_peers: MEMPOOL_P2P_NUM_BLACKLISTED_PEERS,
broadcast_metrics_by_topic: Some(broadcast_metrics_by_topic),
sqmr_metrics: None,
gossipsub_metrics: None,
});
let mut network_manager = NetworkManager::new(
mempool_p2p_config.network_config,
Expand Down
28 changes: 24 additions & 4 deletions crates/apollo_network/src/gossipsub_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ pub type Topic = gossipsub::Sha256Topic;

/// Gossipsub events forwarded to the network manager, wrapped in
/// `mixed_behaviour::ExternalEvent::GossipSub`.
///
/// Every variant other than `Received` carries the fields of the `gossipsub::Event` variant of
/// the same name, with `topic` renamed to `topic_hash`.
#[derive(Debug)]
pub enum ExternalEvent {
/// A broadcast message was received on `topic_hash`; `originated_peer_id` identifies the
/// message's originator.
// NOTE(review): the reason for this lint suppression is not visible here; all three fields are
// destructured by the network manager, so confirm whether it is still needed.
#[allow(dead_code)]
Received { originated_peer_id: PeerId, message: Bytes, topic_hash: TopicHash },
/// Forwarded from `gossipsub::Event::Subscribed`.
Subscribed { peer_id: PeerId, topic_hash: TopicHash },
/// Forwarded from `gossipsub::Event::Unsubscribed`.
Unsubscribed { peer_id: PeerId, topic_hash: TopicHash },
/// Forwarded from `gossipsub::Event::GossipsubNotSupported`.
GossipsubNotSupported { peer_id: PeerId },
/// Forwarded from `gossipsub::Event::SlowPeer`, including the failed-message details.
SlowPeer { peer_id: PeerId, failed_messages: gossipsub::FailedMessages },
}

impl From<gossipsub::Event> for mixed_behaviour::Event {
Expand All @@ -40,9 +43,26 @@ impl From<gossipsub::Event> for mixed_behaviour::Event {
},
))
}
_ => mixed_behaviour::Event::ToOtherBehaviourEvent(
mixed_behaviour::ToOtherBehaviourEvent::NoOp,
),
gossipsub::Event::Subscribed { peer_id, topic } => {
mixed_behaviour::Event::ExternalEvent(mixed_behaviour::ExternalEvent::GossipSub(
ExternalEvent::Subscribed { peer_id, topic_hash: topic },
))
}
gossipsub::Event::Unsubscribed { peer_id, topic } => {
mixed_behaviour::Event::ExternalEvent(mixed_behaviour::ExternalEvent::GossipSub(
ExternalEvent::Unsubscribed { peer_id, topic_hash: topic },
))
}
gossipsub::Event::GossipsubNotSupported { peer_id } => {
mixed_behaviour::Event::ExternalEvent(mixed_behaviour::ExternalEvent::GossipSub(
ExternalEvent::GossipsubNotSupported { peer_id },
))
}
gossipsub::Event::SlowPeer { peer_id, failed_messages } => {
mixed_behaviour::Event::ExternalEvent(mixed_behaviour::ExternalEvent::GossipSub(
ExternalEvent::SlowPeer { peer_id, failed_messages },
))
}
}
}
}
Expand Down
69 changes: 69 additions & 0 deletions crates/apollo_network/src/network_manager/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,79 @@ impl SqmrNetworkMetrics {
}
}

/// Metrics describing the local gossipsub behaviour: point-in-time gauges plus event counters.
///
/// The gauges are rewritten from the behaviour's current state by `SwarmTrait::update_metrics`;
/// the counters are incremented as gossipsub events are handled by the network manager.
pub struct GossipsubMetrics {
/// Number of peers in the mesh network (peers we directly exchange messages with)
pub num_mesh_peers: MetricGauge,
/// Total number of peers known to gossipsub
pub num_all_peers: MetricGauge,
/// Number of topics we are currently subscribed to
pub num_subscribed_topics: MetricGauge,
/// Number of peers with gossipsub protocol support
// NOTE(review): currently set to the number of entries yielded by `peer_protocol()`, without
// filtering by protocol kind.
pub num_gossipsub_peers: MetricGauge,
/// Number of peers with floodsub protocol support
// NOTE(review): currently always set to 0 by the updater.
pub num_floodsub_peers: MetricGauge,
/// Average number of subscribed topics per peer
pub avg_topics_per_peer: MetricGauge,
/// Maximum number of subscribed topics by any single peer
pub max_topics_per_peer: MetricGauge,
/// Minimum number of subscribed topics by any single peer (for peers with >0 topics)
pub min_topics_per_peer: MetricGauge,
/// Total number of topic subscriptions across all peers
pub total_topic_subscriptions: MetricGauge,
/// Average mesh peers per topic that we're subscribed to
pub avg_mesh_peers_per_topic: MetricGauge,
/// Maximum mesh peers for any single topic we're subscribed to
pub max_mesh_peers_per_topic: MetricGauge,
/// Minimum mesh peers for any single topic we're subscribed to
pub min_mesh_peers_per_topic: MetricGauge,
/// Number of peers with positive peer scores (if peer scoring is enabled)
pub num_peers_with_positive_score: MetricGauge,
/// Number of peers with negative peer scores (if peer scoring is enabled)
pub num_peers_with_negative_score: MetricGauge,
/// Average peer score across all scored peers (if peer scoring is enabled)
pub avg_peer_score: MetricGauge,

// Event counters: each is incremented by one per handled gossipsub event of the matching kind.
/// Number of `Received` events (incoming broadcast messages)
pub count_event_messages_received: MetricCounter,
/// Number of `Subscribed` events
pub count_event_peer_subscribed: MetricCounter,
/// Number of `Unsubscribed` events
pub count_event_peer_unsubscribed: MetricCounter,
/// Number of `GossipsubNotSupported` events
pub count_event_gossipsub_not_supported: MetricCounter,
/// Number of `SlowPeer` events
pub count_event_slow_peers: MetricCounter,
}

impl GossipsubMetrics {
    /// Registers every gossipsub metric: all gauges first, then all event counters, each group in
    /// field-declaration order.
    pub fn register(&self) {
        // Destructure without `..` so that adding a field to the struct fails to compile here
        // until the new metric is registered as well.
        let Self {
            num_mesh_peers,
            num_all_peers,
            num_subscribed_topics,
            num_gossipsub_peers,
            num_floodsub_peers,
            avg_topics_per_peer,
            max_topics_per_peer,
            min_topics_per_peer,
            total_topic_subscriptions,
            avg_mesh_peers_per_topic,
            max_mesh_peers_per_topic,
            min_mesh_peers_per_topic,
            num_peers_with_positive_score,
            num_peers_with_negative_score,
            avg_peer_score,
            count_event_messages_received,
            count_event_peer_subscribed,
            count_event_peer_unsubscribed,
            count_event_gossipsub_not_supported,
            count_event_slow_peers,
        } = self;

        let gauges = [
            num_mesh_peers,
            num_all_peers,
            num_subscribed_topics,
            num_gossipsub_peers,
            num_floodsub_peers,
            avg_topics_per_peer,
            max_topics_per_peer,
            min_topics_per_peer,
            total_topic_subscriptions,
            avg_mesh_peers_per_topic,
            max_mesh_peers_per_topic,
            min_mesh_peers_per_topic,
            num_peers_with_positive_score,
            num_peers_with_negative_score,
            avg_peer_score,
        ];
        for gauge in gauges {
            gauge.register();
        }

        let event_counters = [
            count_event_messages_received,
            count_event_peer_subscribed,
            count_event_peer_unsubscribed,
            count_event_gossipsub_not_supported,
            count_event_slow_peers,
        ];
        for counter in event_counters {
            counter.register();
        }
    }
}

// TODO(alonl, shahak): Consider making these fields private and receive Topics instead of
// TopicHashes in the constructor
/// Bundle of the metrics reported by a network manager.
///
/// The optional groups are registered, and later updated, only when they are `Some`.
pub struct NetworkMetrics {
/// Gauge for the number of currently connected peers.
pub num_connected_peers: MetricGauge,
/// Gauge for the number of currently blacklisted peers.
pub num_blacklisted_peers: MetricGauge,
/// Per-topic broadcast metrics, keyed by topic hash. A message received on a topic hash that
/// is missing from this map is logged as an error by the network manager.
pub broadcast_metrics_by_topic: Option<HashMap<TopicHash, BroadcastNetworkMetrics>>,
/// SQMR session metrics, if requested.
pub sqmr_metrics: Option<SqmrNetworkMetrics>,
/// Gossipsub gauges and event counters, if requested.
pub gossipsub_metrics: Option<GossipsubMetrics>,
}

impl NetworkMetrics {
Expand All @@ -52,5 +118,8 @@ impl NetworkMetrics {
if let Some(sqmr_metrics) = self.sqmr_metrics.as_ref() {
sqmr_metrics.register();
}
if let Some(gossipsub_metrics) = self.gossipsub_metrics.as_ref() {
gossipsub_metrics.register();
}
}
}
55 changes: 50 additions & 5 deletions crates/apollo_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::collections::{BTreeMap, HashMap};
use std::net::Ipv4Addr;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use apollo_network_types::network_types::{BroadcastedMessageMetadata, OpaquePeerId};
use async_trait::async_trait;
Expand All @@ -23,6 +24,7 @@ use libp2p::identity::Keypair;
use libp2p::swarm::SwarmEvent;
use libp2p::{noise, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder};
use metrics::NetworkMetrics;
use tokio::time::{sleep_until, Instant};
use tracing::{debug, error, trace, warn};

use self::swarm_trait::SwarmTrait;
Expand Down Expand Up @@ -61,6 +63,7 @@ pub struct GenericNetworkManager<SwarmT: SwarmTrait> {
continue_propagation_sender: Sender<BroadcastedMessageMetadata>,
continue_propagation_receiver: Receiver<BroadcastedMessageMetadata>,
metrics: Option<NetworkMetrics>,
next_metrics_update: Instant,
}

impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
Expand Down Expand Up @@ -89,6 +92,12 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
Some(broadcasted_message_metadata) = self.continue_propagation_receiver.next() => {
self.swarm.continue_propagation(broadcasted_message_metadata);
}
_ = sleep_until(self.next_metrics_update) => {
if let Some(ref metrics) = self.metrics {
self.swarm.update_metrics(metrics);
}
self.next_metrics_update = Instant::now() + Duration::from_secs(1);
}
}
}
}
Expand Down Expand Up @@ -128,6 +137,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
continue_propagation_sender,
continue_propagation_receiver,
metrics,
next_metrics_update: Instant::now() + Duration::from_secs(1),
}
}

Expand Down Expand Up @@ -527,23 +537,58 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
}
}

/// Bumps the gossipsub event counter that corresponds to `event`.
///
/// Does nothing when no metrics were supplied, or when the supplied metrics carry no gossipsub
/// metrics.
fn update_gossipsub_metrics_on_event(&self, event: &gossipsub_impl::ExternalEvent) {
    let Some(gossipsub_metrics) =
        self.metrics.as_ref().and_then(|metrics| metrics.gossipsub_metrics.as_ref())
    else {
        return;
    };

    // Select the counter first so the increment is written exactly once. The match has no
    // wildcard arm, so a new event variant must be given a counter before this compiles.
    let event_counter = match event {
        gossipsub_impl::ExternalEvent::Received { .. } => {
            &gossipsub_metrics.count_event_messages_received
        }
        gossipsub_impl::ExternalEvent::Subscribed { .. } => {
            &gossipsub_metrics.count_event_peer_subscribed
        }
        gossipsub_impl::ExternalEvent::Unsubscribed { .. } => {
            &gossipsub_metrics.count_event_peer_unsubscribed
        }
        gossipsub_impl::ExternalEvent::GossipsubNotSupported { .. } => {
            &gossipsub_metrics.count_event_gossipsub_not_supported
        }
        gossipsub_impl::ExternalEvent::SlowPeer { .. } => {
            &gossipsub_metrics.count_event_slow_peers
        }
    };
    event_counter.increment(1);
}

fn handle_gossipsub_behaviour_event(
&mut self,
event: gossipsub_impl::ExternalEvent,
) -> Result<(), NetworkError> {
// Record gossipsub metrics if available
self.update_gossipsub_metrics_on_event(&event);

let gossipsub_impl::ExternalEvent::Received { originated_peer_id, message, topic_hash } =
event
else {
return Ok(());
};

// Record broadcast metrics for legacy compatibility
if let Some(broadcast_metrics_by_topic) =
self.metrics.as_ref().and_then(|metrics| metrics.broadcast_metrics_by_topic.as_ref())
{
let gossipsub_impl::ExternalEvent::Received { ref topic_hash, .. } = event;
match broadcast_metrics_by_topic.get(topic_hash) {
match broadcast_metrics_by_topic.get(&topic_hash) {
Some(broadcast_metrics) => {
broadcast_metrics.num_received_broadcast_messages.increment(1)
}
None => error!("Attempted to update topic metric with unregistered topic_hash"),
None => {
error!("Attempted to update topic metric with unregistered topic_hash")
}
}
}
let gossipsub_impl::ExternalEvent::Received { originated_peer_id, message, topic_hash } =
event;

trace!("Received broadcast message with topic hash: {topic_hash:?}");
let broadcasted_message_metadata = BroadcastedMessageMetadata {
originator_id: OpaquePeerId::private_new(originated_peer_id),
Expand Down
92 changes: 92 additions & 0 deletions crates/apollo_network/src/network_manager/swarm_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tracing::{info, warn};
use super::BroadcastedMessageMetadata;
use crate::gossipsub_impl::Topic;
use crate::misconduct_score::MisconductScore;
use crate::network_manager::metrics::NetworkMetrics;
use crate::peer_manager::ReputationModifier;
use crate::sqmr::behaviour::SessionIdNotFoundError;
use crate::sqmr::{InboundSessionId, OutboundSessionId, SessionId};
Expand Down Expand Up @@ -48,6 +49,8 @@ pub trait SwarmTrait: Stream<Item = Event> + Unpin {
fn add_new_supported_inbound_protocol(&mut self, protocol_name: StreamProtocol);

fn continue_propagation(&mut self, message_metadata: BroadcastedMessageMetadata);

/// Publishes a snapshot of the swarm's current state into `metrics`.
///
/// The network manager's run loop calls this on a one-second timer when metrics are configured.
fn update_metrics(&self, metrics: &NetworkMetrics);
}

impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
Expand Down Expand Up @@ -122,4 +125,93 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {

// TODO(shahak): Implement this function.
fn continue_propagation(&mut self, _message_metadata: BroadcastedMessageMetadata) {}

/// Publishes a snapshot of the gossipsub behaviour's state to the gossipsub gauges.
///
/// Returns immediately when `metrics.gossipsub_metrics` is `None`. Otherwise every gossipsub gauge
/// is rewritten on each call, so no gauge keeps a value left over from an earlier snapshot.
fn update_metrics(&self, metrics: &NetworkMetrics) {
    let Some(gossipsub_metrics) = &metrics.gossipsub_metrics else { return };
    let gossipsub = &self.behaviour().gossipsub;

    // Gauges take `f64` and `usize` has no lossless conversion to it, so go through `u32` and
    // saturate at `u32::MAX` rather than casting.
    let to_f64 = |count: usize| f64::from(u32::try_from(count).unwrap_or(u32::MAX));
    let set_count = |gauge: &apollo_metrics::metrics::MetricGauge, count: usize| {
        gauge.set(to_f64(count));
    };

    // Basic counts. Peers and topics are collected once and reused by the analyses below.
    let all_peers: Vec<_> = gossipsub.all_peers().collect();
    let our_topics: Vec<_> = gossipsub.topics().collect();
    set_count(&gossipsub_metrics.num_mesh_peers, gossipsub.all_mesh_peers().count());
    set_count(&gossipsub_metrics.num_subscribed_topics, our_topics.len());
    set_count(&gossipsub_metrics.num_all_peers, all_peers.len());
    // NOTE(review): this counts every entry yielded by `peer_protocol()` without filtering by
    // protocol kind, and the floodsub gauge is pinned to zero.
    set_count(&gossipsub_metrics.num_gossipsub_peers, gossipsub.peer_protocol().count());
    gossipsub_metrics.num_floodsub_peers.set(0.0); // Currently all peers are gossipsub

    // Topic subscription analysis.
    let topic_counts: Vec<usize> = all_peers.iter().map(|(_, topics)| topics.len()).collect();
    let total_subscriptions: usize = topic_counts.iter().sum();
    set_count(&gossipsub_metrics.total_topic_subscriptions, total_subscriptions);

    if topic_counts.is_empty() {
        for gauge in [
            &gossipsub_metrics.avg_topics_per_peer,
            &gossipsub_metrics.max_topics_per_peer,
            &gossipsub_metrics.min_topics_per_peer,
        ] {
            gauge.set(0.0);
        }
    } else {
        let avg_topics = to_f64(total_subscriptions) / to_f64(topic_counts.len());
        gossipsub_metrics.avg_topics_per_peer.set(avg_topics);

        // Both gauges are always written. When peers exist but none of them has a subscribed
        // topic there is no non-zero minimum; report 0 instead of leaving the previous
        // snapshot's max/min in place.
        let max_topics = topic_counts.iter().copied().max().unwrap_or(0);
        let min_non_zero_topics =
            topic_counts.iter().copied().filter(|&count| count > 0).min().unwrap_or(0);
        set_count(&gossipsub_metrics.max_topics_per_peer, max_topics);
        set_count(&gossipsub_metrics.min_topics_per_peer, min_non_zero_topics);
    }

    // Mesh analysis per topic we are subscribed to.
    if our_topics.is_empty() {
        for gauge in [
            &gossipsub_metrics.avg_mesh_peers_per_topic,
            &gossipsub_metrics.max_mesh_peers_per_topic,
            &gossipsub_metrics.min_mesh_peers_per_topic,
        ] {
            gauge.set(0.0);
        }
    } else {
        let mesh_counts: Vec<usize> =
            our_topics.iter().map(|topic| gossipsub.mesh_peers(topic).count()).collect();
        let total_mesh: usize = mesh_counts.iter().sum();
        let avg_mesh = to_f64(total_mesh) / to_f64(our_topics.len());
        gossipsub_metrics.avg_mesh_peers_per_topic.set(avg_mesh);

        // `mesh_counts` has one entry per topic and is non-empty here, so the fallbacks are
        // never taken.
        let min_mesh = mesh_counts.iter().copied().min().unwrap_or(0);
        let max_mesh = mesh_counts.iter().copied().max().unwrap_or(0);
        set_count(&gossipsub_metrics.min_mesh_peers_per_topic, min_mesh);
        set_count(&gossipsub_metrics.max_mesh_peers_per_topic, max_mesh);
    }

    // Peer scoring analysis. Only peers for which `peer_score` returns a value are included.
    let peer_scores: Vec<f64> =
        all_peers.iter().filter_map(|(peer_id, _)| gossipsub.peer_score(peer_id)).collect();
    if peer_scores.is_empty() {
        for gauge in [
            &gossipsub_metrics.num_peers_with_positive_score,
            &gossipsub_metrics.num_peers_with_negative_score,
            &gossipsub_metrics.avg_peer_score,
        ] {
            gauge.set(0.0);
        }
    } else {
        let positive_count = peer_scores.iter().filter(|&&score| score > 0.0).count();
        let negative_count = peer_scores.iter().filter(|&&score| score < 0.0).count();
        let avg_score = peer_scores.iter().sum::<f64>() / to_f64(peer_scores.len());

        set_count(&gossipsub_metrics.num_peers_with_positive_score, positive_count);
        set_count(&gossipsub_metrics.num_peers_with_negative_score, negative_count);
        gossipsub_metrics.avg_peer_score.set(avg_score);
    }
}
}
2 changes: 2 additions & 0 deletions crates/apollo_network/src/network_manager/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ impl SwarmTrait for MockSwarm {
fn continue_propagation(&mut self, _message_metadata: super::BroadcastedMessageMetadata) {
unimplemented!()
}

// Intentionally a no-op: the mock swarm publishes no metrics.
fn update_metrics(&self, _: &super::metrics::NetworkMetrics) {}
}

const BUFFER_SIZE: usize = 100;
Expand Down
1 change: 1 addition & 0 deletions crates/apollo_state_sync/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ impl StateSyncRunner {
num_active_inbound_sessions: P2P_SYNC_NUM_ACTIVE_INBOUND_SESSIONS,
num_active_outbound_sessions: P2P_SYNC_NUM_ACTIVE_OUTBOUND_SESSIONS,
}),
gossipsub_metrics: None,
});
NetworkManager::new(
network_config.clone(),
Expand Down
Loading