66//!
77//! This module provides types and utilities for collecting runtime statistics
88//! from nodes during pipeline execution. Statistics are throttled to prevent
9- //! overload (typically every 10 seconds or 1000 packets).
9+ //! overload (typically every 2 seconds or 1000 packets).
1010
1111use serde:: { Deserialize , Serialize } ;
1212use std:: time:: SystemTime ;
@@ -35,7 +35,7 @@ impl Default for NodeStats {
3535}
3636
3737/// A statistics update message sent by a node to report its current metrics.
38- /// These updates are throttled to prevent overload (typically every 10s or 1000 packets).
38+ /// These updates are throttled to prevent overload (typically every 2s or 1000 packets).
3939#[ derive( Debug , Clone ) ]
4040pub struct NodeStatsUpdate {
4141 /// The unique identifier of the node reporting the stats
@@ -47,18 +47,19 @@ pub struct NodeStatsUpdate {
4747}
4848
4949/// Helper for tracking and throttling node statistics updates.
50- /// Automatically sends updates every 10 seconds or 1000 packets.
50+ /// Automatically sends updates every 2 seconds or 1000 packets.
5151pub struct NodeStatsTracker {
5252 stats : NodeStats ,
5353 start_time : std:: time:: Instant ,
5454 last_send : std:: time:: Instant ,
55+ has_sent_once : bool ,
5556 node_id : String ,
5657 stats_tx : Option < tokio:: sync:: mpsc:: Sender < NodeStatsUpdate > > ,
5758}
5859
5960impl NodeStatsTracker {
6061 /// Throttling configuration
61- const SEND_INTERVAL : std:: time:: Duration = std:: time:: Duration :: from_secs ( 10 ) ;
62+ const SEND_INTERVAL : std:: time:: Duration = std:: time:: Duration :: from_secs ( 2 ) ;
6263 const SEND_PACKET_THRESHOLD : u64 = 1000 ;
6364
6465 /// Create a new stats tracker for a node
@@ -67,7 +68,14 @@ impl NodeStatsTracker {
6768 stats_tx : Option < tokio:: sync:: mpsc:: Sender < NodeStatsUpdate > > ,
6869 ) -> Self {
6970 let now = std:: time:: Instant :: now ( ) ;
70- Self { stats : NodeStats :: default ( ) , start_time : now, last_send : now, node_id, stats_tx }
71+ Self {
72+ stats : NodeStats :: default ( ) ,
73+ start_time : now,
74+ last_send : now,
75+ has_sent_once : false ,
76+ node_id,
77+ stats_tx,
78+ }
7179 }
7280
7381 /// Record a received packet
@@ -88,23 +96,49 @@ impl NodeStatsTracker {
8896 self . stats . sent += 1 ;
8997 }
9098
99+ /// Record multiple sent packets (for batched stats reporting)
100+ #[ inline]
101+ pub const fn sent_n ( & mut self , count : u64 ) {
102+ self . stats . sent += count;
103+ }
104+
91105 /// Record a discarded packet
92106 #[ inline]
93107 pub const fn discarded ( & mut self ) {
94108 self . stats . discarded += 1 ;
95109 }
96110
111+ /// Record multiple discarded packets (for batched stats reporting)
112+ #[ inline]
113+ pub const fn discarded_n ( & mut self , count : u64 ) {
114+ self . stats . discarded += count;
115+ }
116+
97117 /// Record an error
98118 #[ inline]
99119 pub const fn errored ( & mut self ) {
100120 self . stats . errored += 1 ;
101121 }
102122
103- /// Automatically send stats if threshold is met (every 10s or 1000 packets).
123+ /// Record multiple errors (for batched stats reporting)
124+ #[ inline]
125+ pub const fn errored_n ( & mut self , count : u64 ) {
126+ self . stats . errored += count;
127+ }
128+
129+ /// Automatically send stats if threshold is met (every 2s or 1000 packets).
104130 /// Call this after processing a batch of packets.
105131 pub fn maybe_send ( & mut self ) {
106- let should_send = self . last_send . elapsed ( ) >= Self :: SEND_INTERVAL
107- || self . stats . received . is_multiple_of ( Self :: SEND_PACKET_THRESHOLD ) ;
132+ // Many nodes only increment one side of the counters (e.g. pure sources only `sent`,
133+ // pure sinks only `received`). Use the max to keep the threshold behavior consistent
134+ // across node shapes and avoid the `0.is_multiple_of(..)` pitfall.
135+ let packet_count = self . stats . received . max ( self . stats . sent ) . max ( self . stats . discarded ) ;
136+
137+ // Always emit the first non-empty snapshot promptly so monitoring can "lock on"
138+ // even if the node later blocks under backpressure.
139+ let should_send = ( !self . has_sent_once && packet_count > 0 )
140+ || self . last_send . elapsed ( ) >= Self :: SEND_INTERVAL
141+ || ( packet_count > 0 && packet_count. is_multiple_of ( Self :: SEND_PACKET_THRESHOLD ) ) ;
108142
109143 if should_send {
110144 self . force_send ( ) ;
@@ -123,6 +157,44 @@ impl NodeStatsTracker {
123157 timestamp : SystemTime :: now ( ) ,
124158 } ) ;
125159 self . last_send = std:: time:: Instant :: now ( ) ;
160+ self . has_sent_once = true ;
126161 }
127162 }
128163}
164+
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use tokio::sync::mpsc;

    /// While every counter is still zero, `maybe_send` must not publish anything.
    #[tokio::test]
    async fn maybe_send_does_not_fire_when_empty() {
        let (stats_tx, mut stats_rx) = mpsc::channel::<NodeStatsUpdate>(10);
        let mut tracker = NodeStatsTracker::new(String::from("node"), Some(stats_tx));

        tracker.maybe_send();

        assert!(stats_rx.try_recv().is_err());
    }

    /// The first recorded packet triggers an immediate update, and the next update
    /// arrives when the sent counter reaches the packet threshold (the loop is
    /// expected to finish well inside the time-based send interval).
    #[tokio::test]
    async fn maybe_send_emits_on_first_activity_and_on_threshold() {
        let (stats_tx, mut stats_rx) = mpsc::channel::<NodeStatsUpdate>(10);
        let mut tracker = NodeStatsTracker::new(String::from("node"), Some(stats_tx));

        // First activity: a single sent packet should be reported right away.
        tracker.sent();
        tracker.maybe_send();
        let initial = stats_rx.try_recv().unwrap();
        assert_eq!(initial.stats.sent, 1);

        // One packet is already counted, so start at 1 to land exactly on the threshold.
        let mut recorded = 1;
        while recorded < NodeStatsTracker::SEND_PACKET_THRESHOLD {
            tracker.sent();
            tracker.maybe_send();
            recorded += 1;
        }

        let at_threshold = stats_rx.try_recv().unwrap();
        assert_eq!(at_threshold.node_id, "node");
        assert_eq!(at_threshold.stats.sent, NodeStatsTracker::SEND_PACKET_THRESHOLD);
    }
}
0 commit comments