@@ -10,13 +10,14 @@ use pipeline::NetworkFunction;
1010
1111use concurrency:: sync:: Arc ;
1212use kanal:: ReceiveError ;
13- use std:: collections:: { HashSet , VecDeque } ;
13+ use std:: collections:: { HashMap , HashSet , VecDeque } ;
1414use std:: time:: { Duration , Instant } ;
1515use vpcmap:: VpcDiscriminant ;
1616use vpcmap:: map:: VpcMapReader ;
1717
1818use crate :: vpc_stats:: VpcStatsStore ;
19- use crate :: { RegisteredVpcMetrics , Specification , VpcMetricsSpec } ;
19+ use crate :: { MetricSpec , Register , RegisteredVpcMetrics , Specification , VpcMetricsSpec } ;
20+ use metrics:: Unit ;
2021use net:: buffer:: PacketBufferMut ;
2122use net:: packet:: DoneReason ;
2223use rand:: RngCore ;
@@ -63,6 +64,42 @@ fn snapshot_vpc_pairs(reader: &VpcMapReader<VpcMapName>) -> Vec<(VpcDiscriminant
6364 }
6465}
6566
67+ #[ inline]
68+ fn vpc_id_packet_count ( ) -> & ' static str {
69+ "vpc_packet_count"
70+ }
71+ #[ inline]
72+ fn vpc_id_packet_rate ( ) -> & ' static str {
73+ "vpc_packet_rate"
74+ }
75+ #[ inline]
76+ fn vpc_id_byte_count ( ) -> & ' static str {
77+ "vpc_byte_count"
78+ }
79+ #[ inline]
80+ fn vpc_id_byte_rate ( ) -> & ' static str {
81+ "vpc_byte_rate"
82+ }
83+
84+ #[ inline]
85+ fn set_vpc_gauges_to_zero ( labels : Vec < ( String , String ) > ) {
86+ let packet_count: crate :: register:: Registered < metrics:: Gauge > =
87+ MetricSpec :: new ( vpc_id_packet_count ( ) , Unit :: Count , labels. clone ( ) ) . register ( ) ;
88+ packet_count. metric . set ( 0.0 ) ;
89+
90+ let packet_rate: crate :: register:: Registered < metrics:: Gauge > =
91+ MetricSpec :: new ( vpc_id_packet_rate ( ) , Unit :: BitsPerSecond , labels. clone ( ) ) . register ( ) ;
92+ packet_rate. metric . set ( 0.0 ) ;
93+
94+ let byte_count: crate :: register:: Registered < metrics:: Gauge > =
95+ MetricSpec :: new ( vpc_id_byte_count ( ) , Unit :: Count , labels. clone ( ) ) . register ( ) ;
96+ byte_count. metric . set ( 0.0 ) ;
97+
98+ let byte_rate: crate :: register:: Registered < metrics:: Gauge > =
99+ MetricSpec :: new ( vpc_id_byte_rate ( ) , Unit :: BitsPerSecond , labels) . register ( ) ;
100+ byte_rate. metric . set ( 0.0 ) ;
101+ }
102+
66103/// A `StatsCollector` is responsible for collecting and aggregating packet statistics for a
67104/// collection of workers running packet processing pipelines on various threads.
68105#[ derive( Debug ) ]
@@ -83,6 +120,9 @@ pub struct StatsCollector {
83120 /// Shared store for snapshots/rates usable by gRPC, CLI, etc.
84121 vpc_store : Arc < VpcStatsStore > ,
85122 alive_vpcs : HashSet < VpcDiscriminant > ,
123+ /// `known` is a reference to the previous snapshot of `alive` VPCs, used to detect removals.
124+ known_vpcs : HashSet < VpcDiscriminant > ,
125+ known_names : HashMap < VpcDiscriminant , String > ,
86126}
87127
88128impl StatsCollector {
@@ -127,11 +167,17 @@ impl StatsCollector {
127167 } ;
128168
129169 let name_pairs = snapshot_vpc_pairs ( & vpcmap_r) ;
130- vpc_store. set_many_vpc_names_sync ( name_pairs) ;
170+ vpc_store. set_many_vpc_names_sync ( name_pairs. clone ( ) ) ;
131171
132172 let alive_vpcs: HashSet < VpcDiscriminant > =
133173 vpc_data. iter ( ) . map ( |( disc, _, _) | * disc) . collect ( ) ;
134174
175+ let mut known_names: HashMap < VpcDiscriminant , String > = HashMap :: new ( ) ;
176+ for ( disc, name) in name_pairs {
177+ known_names. insert ( disc, name) ;
178+ }
179+ let known_vpcs = alive_vpcs. clone ( ) ;
180+
135181 let metrics = VpcMetricsSpec :: new ( vpc_data)
136182 . into_iter ( )
137183 . map ( |( disc, spec) | ( disc, spec. build ( ) ) )
@@ -155,6 +201,8 @@ impl StatsCollector {
155201 updates,
156202 vpc_store,
157203 alive_vpcs,
204+ known_vpcs,
205+ known_names,
158206 } ;
159207 let writer = PacketStatsWriter ( s) ;
160208 ( stats, writer, store_clone)
@@ -182,10 +230,54 @@ impl StatsCollector {
182230 let pairs = snapshot_vpc_pairs ( & self . vpcmap_r ) ;
183231 self . vpc_store . set_many_vpc_names_sync ( pairs. clone ( ) ) ;
184232
185- self . alive_vpcs = pairs. iter ( ) . map ( |( d, _) | * d) . collect ( ) ;
233+ let new_alive: HashSet < VpcDiscriminant > = pairs. iter ( ) . map ( |( d, _) | * d) . collect ( ) ;
234+
235+ let mut removed: Vec < VpcDiscriminant > =
236+ self . known_vpcs . difference ( & new_alive) . copied ( ) . collect ( ) ;
237+ removed. sort ( ) ;
238+
239+ for ( disc, name) in & pairs {
240+ self . known_names . insert ( * disc, name. clone ( ) ) ;
241+ }
242+
243+ self . alive_vpcs = new_alive. clone ( ) ;
186244
187245 // prune any removed VPCs / pairs so they do not show up in snapshots/status
188246 self . vpc_store . prune_to_vpcs ( & self . alive_vpcs ) . await ;
247+
248+ if !removed. is_empty ( ) {
249+ let mut alive_names: Vec < String > = pairs. iter ( ) . map ( |( _, n) | n. clone ( ) ) . collect ( ) ;
250+ alive_names. sort ( ) ;
251+ alive_names. dedup ( ) ;
252+
253+ for disc in removed {
254+ let removed_name = self
255+ . known_names
256+ . get ( & disc)
257+ . cloned ( )
258+ . unwrap_or_else ( || format ! ( "{disc:?}" ) ) ;
259+
260+ // total/drops series for the removed VPC
261+ set_vpc_gauges_to_zero ( vec ! [ ( "total" . to_string( ) , removed_name. clone( ) ) ] ) ;
262+ set_vpc_gauges_to_zero ( vec ! [ ( "drops" . to_string( ) , removed_name. clone( ) ) ] ) ;
263+
264+ // peering series in both directions
265+ for other_name in & alive_names {
266+ set_vpc_gauges_to_zero ( vec ! [
267+ ( "from" . to_string( ) , removed_name. clone( ) ) ,
268+ ( "to" . to_string( ) , other_name. clone( ) ) ,
269+ ] ) ;
270+ set_vpc_gauges_to_zero ( vec ! [
271+ ( "from" . to_string( ) , other_name. clone( ) ) ,
272+ ( "to" . to_string( ) , removed_name. clone( ) ) ,
273+ ] ) ;
274+ }
275+
276+ self . known_names . remove ( & disc) ;
277+ }
278+ }
279+
280+ self . known_vpcs = new_alive;
189281 }
190282
191283 /// Run the collector (async). Does not return if awaited.
@@ -364,42 +456,6 @@ impl StatsCollector {
364456 start, duration, capacity,
365457 ) ) ;
366458
367- // Update raw packet/byte COUNTS for "total" metrics (monotonic counters)
368- concluded. vpc . iter ( ) . for_each ( |( & src, tx_summary) | {
369- let metrics = match self . metrics . get ( & src) {
370- None => {
371- warn ! ( "lost metrics for src {src}" ) ;
372- return ;
373- }
374- Some ( metrics) => metrics,
375- } ;
376- tx_summary
377- . dst
378- . iter ( )
379- . for_each ( |( & dst, & stats) | match metrics. peering . get ( & dst) {
380- None => {
381- warn ! ( "lost metrics for src {src} to dst {dst}" ) ;
382- }
383- Some ( action) => {
384- action. tx . packet . count . metric . increment ( stats. packets ) ;
385- action. tx . byte . count . metric . increment ( stats. bytes ) ;
386- }
387- } ) ;
388- let action = & metrics. drops ;
389- action
390- . tx
391- . packet
392- . count
393- . metric
394- . increment ( tx_summary. drops . packets ) ;
395- action
396- . tx
397- . byte
398- . count
399- . metric
400- . increment ( tx_summary. drops . bytes ) ;
401- } ) ;
402-
403459 // Mirror counters into the store (monotonic)
404460 for ( & src, tx_summary) in & concluded. vpc {
405461 if !self . alive_vpcs . contains ( & src) {
@@ -439,6 +495,52 @@ impl StatsCollector {
439495 // Push this *apportioned per-batch* snapshot into the SG window.
440496 self . submitted . push ( concluded. vpc . clone ( ) ) ;
441497
498+ // Refresh count gauges from the store (so reuse doesn't carry stale totals).
499+ let pair_snap = self . vpc_store . snapshot_pairs ( ) . await ;
500+ for ( ( src, dst) , fs) in pair_snap {
501+ if !self . alive_vpcs . contains ( & src) || !self . alive_vpcs . contains ( & dst) {
502+ continue ;
503+ }
504+ if let Some ( metrics) = self . metrics . get ( & src)
505+ && let Some ( action) = metrics. peering . get ( & dst)
506+ {
507+ action. tx . packet . count . metric . set ( fs. ctr . packets as f64 ) ;
508+ action. tx . byte . count . metric . set ( fs. ctr . bytes as f64 ) ;
509+ }
510+ }
511+
512+ let vpc_snap = self . vpc_store . snapshot_vpcs ( ) . await ;
513+ for ( src, fs) in vpc_snap {
514+ if !self . alive_vpcs . contains ( & src) {
515+ continue ;
516+ }
517+ if let Some ( metrics) = self . metrics . get ( & src) {
518+ metrics
519+ . total
520+ . tx
521+ . packet
522+ . count
523+ . metric
524+ . set ( fs. ctr . packets as f64 ) ;
525+ metrics. total . tx . byte . count . metric . set ( fs. ctr . bytes as f64 ) ;
526+
527+ metrics
528+ . drops
529+ . tx
530+ . packet
531+ . count
532+ . metric
533+ . set ( fs. drops . packets as f64 ) ;
534+ metrics
535+ . drops
536+ . tx
537+ . byte
538+ . count
539+ . metric
540+ . set ( fs. drops . bytes as f64 ) ;
541+ }
542+ }
543+
442544 // Build per-source filters and smooth.
443545 let filters_by_src: hashbrown:: HashMap <
444546 VpcDiscriminant ,
@@ -487,6 +589,10 @@ impl StatsCollector {
487589 total_bps += bps;
488590 }
489591
592+ // total per-vpc rates
593+ metrics. total . tx . packet . rate . metric . set ( total_pps) ;
594+ metrics. total . tx . byte . rate . metric . set ( total_bps) ;
595+
490596 self . vpc_store
491597 . set_vpc_rates ( src, total_pps, total_bps)
492598 . await ;
0 commit comments