@@ -506,4 +506,62 @@ mod tests {
506506 ) ;
507507 } ) ;
508508 }
509+
510+ #[ test]
511+ fn buffer_usage_gauge_clamps_to_zero ( ) {
512+ let recorder = DebuggingRecorder :: default ( ) ;
513+ let snapshotter = recorder. snapshotter ( ) ;
514+ let buffer_id = "test_buffer_clamp" ;
515+ let stage_idx = 0 ;
516+
517+ metrics:: with_local_recorder ( & recorder, || {
518+ let mut usage = BufferUsage :: from_span ( Span :: current ( ) ) ;
519+ let handle = usage. add_stage ( stage_idx) ;
520+
521+ handle. increment_received_event_count_and_byte_size ( 100 , 1000 ) ;
522+ handle. increment_sent_event_count_and_byte_size ( 120 , 1200 ) ; // More sent than received
523+ handle. increment_dropped_event_count_and_byte_size ( 10 , 100 , false ) ;
524+
525+ // calculation:
526+ // events: 100 - 120 - 10 = -30
527+ // bytes: 1000 - 1200 - 100 = -300
528+
529+ BufferUsage :: report_metrics ( & usage. stages , buffer_id) ;
530+
531+ let metrics = snapshotter. snapshot ( ) . into_vec ( ) ;
532+
533+ let events_key = CompositeKey :: new (
534+ MetricKind :: Gauge ,
535+ Key :: from_parts (
536+ "buffer_events" ,
537+ vec ! [
538+ Label :: new( "buffer_id" , buffer_id. to_string( ) ) ,
539+ Label :: new( "stage" , stage_idx. to_string( ) ) ,
540+ ] ,
541+ ) ,
542+ ) ;
543+ let bytes_key = CompositeKey :: new (
544+ MetricKind :: Gauge ,
545+ Key :: from_parts (
546+ "buffer_byte_size" ,
547+ vec ! [
548+ Label :: new( "buffer_id" , buffer_id. to_string( ) ) ,
549+ Label :: new( "stage" , stage_idx. to_string( ) ) ,
550+ ] ,
551+ ) ,
552+ ) ;
553+
554+ let events_gauge = metrics
555+ . iter ( )
556+ . find ( |( key, _, _, _) | key == & events_key)
557+ . unwrap ( ) ;
558+ let bytes_gauge = metrics
559+ . iter ( )
560+ . find ( |( key, _, _, _) | key == & bytes_key)
561+ . unwrap ( ) ;
562+
563+ assert_eq ! ( events_gauge. 3 , DebugValue :: Gauge ( OrderedFloat ( 0.0 ) ) ) ;
564+ assert_eq ! ( bytes_gauge. 3 , DebugValue :: Gauge ( OrderedFloat ( 0.0 ) ) ) ;
565+ } ) ;
566+ }
509567}
0 commit comments