@@ -8,6 +8,7 @@ use tracing::{info, warn};
88use super :: BroadcastedMessageMetadata ;
99use crate :: gossipsub_impl:: Topic ;
1010use crate :: misconduct_score:: MisconductScore ;
11+ use crate :: network_manager:: metrics:: NetworkMetrics ;
1112use crate :: peer_manager:: ReputationModifier ;
1213use crate :: sqmr:: behaviour:: SessionIdNotFoundError ;
1314use crate :: sqmr:: { InboundSessionId , OutboundSessionId , SessionId } ;
@@ -48,6 +49,8 @@ pub trait SwarmTrait: Stream<Item = Event> + Unpin {
4849 fn add_new_supported_inbound_protocol ( & mut self , protocol_name : StreamProtocol ) ;
4950
5051 fn continue_propagation ( & mut self , message_metadata : BroadcastedMessageMetadata ) ;
52+
53+ fn update_metrics ( & self , metrics : & NetworkMetrics ) ;
5154}
5255
5356impl SwarmTrait for Swarm < mixed_behaviour:: MixedBehaviour > {
@@ -122,4 +125,94 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
122125
123126 // TODO(shahak): Implement this function.
124127 fn continue_propagation ( & mut self , _message_metadata : BroadcastedMessageMetadata ) { }
128+
129+ fn update_metrics ( & self , metrics : & NetworkMetrics ) {
130+ let Some ( gossipsub_metrics) = & metrics. gossipsub_metrics else { return } ;
131+ let gossipsub = & self . behaviour ( ) . gossipsub ;
132+
133+ // Helper to convert usize counts to f64 metrics
134+ let set_count = |gauge : & apollo_metrics:: metrics:: MetricGauge , count : usize | {
135+ #[ allow( clippy:: as_conversions) ]
136+ gauge. set ( count as f64 ) ;
137+ } ;
138+
139+ // Basic counts
140+ set_count ( & gossipsub_metrics. num_mesh_peers , gossipsub. all_mesh_peers ( ) . count ( ) ) ;
141+ set_count ( & gossipsub_metrics. num_subscribed_topics , gossipsub. topics ( ) . count ( ) ) ;
142+
143+ // Collect peer data once for analysis
144+ let all_peers: Vec < _ > = gossipsub. all_peers ( ) . collect ( ) ;
145+ set_count ( & gossipsub_metrics. num_all_peers , all_peers. len ( ) ) ;
146+ set_count ( & gossipsub_metrics. num_gossipsub_peers , gossipsub. peer_protocol ( ) . count ( ) ) ;
147+ gossipsub_metrics. num_floodsub_peers . set ( 0.0 ) ; // Currently all peers are gossipsub
148+
149+ // Topic subscription analysis
150+ let topic_counts: Vec < usize > = all_peers. iter ( ) . map ( |( _, topics) | topics. len ( ) ) . collect ( ) ;
151+ let total_subscriptions: usize = topic_counts. iter ( ) . sum ( ) ;
152+ set_count ( & gossipsub_metrics. total_topic_subscriptions , total_subscriptions) ;
153+
154+ if topic_counts. is_empty ( ) {
155+ [
156+ & gossipsub_metrics. avg_topics_per_peer ,
157+ & gossipsub_metrics. max_topics_per_peer ,
158+ & gossipsub_metrics. min_topics_per_peer ,
159+ ]
160+ . iter ( )
161+ . for_each ( |metric| metric. set ( 0.0 ) ) ;
162+ } else {
163+ let avg = total_subscriptions as f64 / topic_counts. len ( ) as f64 ;
164+ gossipsub_metrics. avg_topics_per_peer . set ( avg) ;
165+
166+ if let ( Some ( & max) , Some ( & min_non_zero) ) =
167+ ( topic_counts. iter ( ) . max ( ) , topic_counts. iter ( ) . filter ( |& & c| c > 0 ) . min ( ) )
168+ {
169+ set_count ( & gossipsub_metrics. max_topics_per_peer , max) ;
170+ set_count ( & gossipsub_metrics. min_topics_per_peer , min_non_zero) ;
171+ }
172+ }
173+
174+ // Mesh analysis per topic
175+ let our_topics: Vec < _ > = gossipsub. topics ( ) . collect ( ) ;
176+ if our_topics. is_empty ( ) {
177+ [
178+ & gossipsub_metrics. avg_mesh_peers_per_topic ,
179+ & gossipsub_metrics. max_mesh_peers_per_topic ,
180+ & gossipsub_metrics. min_mesh_peers_per_topic ,
181+ ]
182+ . iter ( )
183+ . for_each ( |metric| metric. set ( 0.0 ) ) ;
184+ } else {
185+ let mesh_counts: Vec < usize > =
186+ our_topics. iter ( ) . map ( |topic| gossipsub. mesh_peers ( topic) . count ( ) ) . collect ( ) ;
187+ let total_mesh = mesh_counts. iter ( ) . sum :: < usize > ( ) ;
188+ let avg_mesh = total_mesh as f64 / our_topics. len ( ) as f64 ;
189+ gossipsub_metrics. avg_mesh_peers_per_topic . set ( avg_mesh) ;
190+
191+ if let ( Some ( & min) , Some ( & max) ) = ( mesh_counts. iter ( ) . min ( ) , mesh_counts. iter ( ) . max ( ) ) {
192+ set_count ( & gossipsub_metrics. min_mesh_peers_per_topic , min) ;
193+ set_count ( & gossipsub_metrics. max_mesh_peers_per_topic , max) ;
194+ }
195+ }
196+
197+ // Peer scoring analysis
198+ let peer_scores: Vec < f64 > =
199+ all_peers. iter ( ) . filter_map ( |( peer_id, _) | gossipsub. peer_score ( peer_id) ) . collect ( ) ;
200+ if peer_scores. is_empty ( ) {
201+ [
202+ & gossipsub_metrics. num_peers_with_positive_score ,
203+ & gossipsub_metrics. num_peers_with_negative_score ,
204+ & gossipsub_metrics. avg_peer_score ,
205+ ]
206+ . iter ( )
207+ . for_each ( |metric| metric. set ( 0.0 ) ) ;
208+ } else {
209+ let positive_count = peer_scores. iter ( ) . filter ( |& & score| score > 0.0 ) . count ( ) ;
210+ let negative_count = peer_scores. iter ( ) . filter ( |& & score| score < 0.0 ) . count ( ) ;
211+ let avg_score = peer_scores. iter ( ) . sum :: < f64 > ( ) / peer_scores. len ( ) as f64 ;
212+
213+ set_count ( & gossipsub_metrics. num_peers_with_positive_score , positive_count) ;
214+ set_count ( & gossipsub_metrics. num_peers_with_negative_score , negative_count) ;
215+ gossipsub_metrics. avg_peer_score . set ( avg_score) ;
216+ }
217+ }
125218}
0 commit comments