1- use dashmap:: DashMap ;
2- use std:: sync:: atomic:: { AtomicI64 , Ordering } ;
3- use std:: sync:: LazyLock ;
41use std:: time:: Duration ;
52
6- use crate :: cast_utils:: { i64_to_f64_safe , u64_to_f64_safe} ;
3+ use crate :: cast_utils:: u64_to_f64_safe;
74use metrics:: { counter, gauge, histogram, Histogram } ;
85use vector_common:: {
96 internal_event:: { error_type, InternalEvent } ,
107 registered_event,
118} ;
129
13- static BUFFER_COUNTERS : LazyLock < DashMap < ( String , usize ) , ( AtomicI64 , AtomicI64 ) > > =
14- LazyLock :: new ( DashMap :: new) ;
15-
16- fn update_and_get ( counter : & AtomicI64 , delta : i64 ) -> i64 {
17- let mut new_val = 0 ;
18- counter
19- . fetch_update ( Ordering :: SeqCst , Ordering :: SeqCst , |current| {
20- let updated = current. saturating_add ( delta) . clamp ( 0 , i64:: MAX ) ;
21- new_val = updated;
22- Some ( updated)
23- } )
24- . ok ( ) ;
25- new_val
26- }
27-
28- fn update_buffer_gauge ( buffer_id : & str , stage : usize , events_delta : i64 , bytes_delta : i64 ) {
29- let counters = BUFFER_COUNTERS
30- . entry ( ( buffer_id. to_string ( ) , stage) )
31- . or_insert_with ( || ( AtomicI64 :: new ( 0 ) , AtomicI64 :: new ( 0 ) ) ) ;
32-
33- let new_events = update_and_get ( & counters. 0 , events_delta) ;
34- let new_bytes = update_and_get ( & counters. 1 , bytes_delta) ;
35-
36- gauge ! ( "buffer_events" ,
37- "buffer_id" => buffer_id. to_string( ) ,
38- "stage" => stage. to_string( )
39- )
40- . set ( i64_to_f64_safe ( new_events) ) ;
41-
42- gauge ! ( "buffer_byte_size" ,
43- "buffer_id" => buffer_id. to_string( ) ,
44- "stage" => stage. to_string( )
45- )
46- . set ( i64_to_f64_safe ( new_bytes) ) ;
47- }
48-
4910pub struct BufferCreated {
5011 pub idx : usize ,
5112 pub max_size_events : usize ,
@@ -85,10 +46,6 @@ impl InternalEvent for BufferEventsReceived {
8546 "stage" => self . idx. to_string( )
8647 )
8748 . increment ( self . byte_size ) ;
88-
89- let count_delta = i64:: try_from ( self . count ) . unwrap_or ( i64:: MAX ) ;
90- let bytes_delta = i64:: try_from ( self . byte_size ) . unwrap_or ( i64:: MAX ) ;
91- update_buffer_gauge ( & self . buffer_id , self . idx , count_delta, bytes_delta) ;
9249 }
9350}
9451
@@ -111,10 +68,6 @@ impl InternalEvent for BufferEventsSent {
11168 "buffer_id" => self . buffer_id. clone( ) ,
11269 "stage" => self . idx. to_string( ) )
11370 . increment ( self . byte_size ) ;
114-
115- let count_delta = i64:: try_from ( self . count ) . unwrap_or ( i64:: MAX ) ;
116- let bytes_delta = i64:: try_from ( self . byte_size ) . unwrap_or ( i64:: MAX ) ;
117- update_buffer_gauge ( & self . buffer_id , self . idx , -count_delta, -bytes_delta) ;
11871 }
11972}
12073
@@ -150,17 +103,16 @@ impl InternalEvent for BufferEventsDropped {
150103 ) ;
151104 }
152105
153- counter ! (
154- "buffer_discarded_events_total" ,
155- "buffer_id" => self . buffer_id. clone( ) ,
156- "intentional" => intentional_str,
157- )
158- . increment ( self . count ) ;
106+ let labels = vec ! [
107+ ( "buffer_id" . to_string( ) , self . buffer_id) ,
108+ ( "intentional" . to_string( ) , intentional_str. to_string( ) ) ,
109+ ] ;
159110
160- let count_delta = i64:: try_from ( self . count ) . unwrap_or ( i64:: MAX ) ;
161- let bytes_delta = i64:: try_from ( self . byte_size ) . unwrap_or ( i64:: MAX ) ;
111+ counter ! ( "buffer_discarded_events_total" , & labels) . increment ( self . count ) ;
162112
163- update_buffer_gauge ( & self . buffer_id , self . idx , -count_delta, -bytes_delta) ;
113+ if self . byte_size > 0 {
114+ counter ! ( "buffer_discarded_bytes_total" , & labels) . increment ( self . byte_size ) ;
115+ }
164116 }
165117}
166118
@@ -198,234 +150,3 @@ registered_event! {
198150 self . send_duration. record( duration) ;
199151 }
200152}
201-
202- #[ cfg( test) ]
203- mod tests {
204- use super :: * ;
205- use crate :: cast_utils:: F64_SAFE_INT_MAX ;
206- use metrics:: { Key , Label } ;
207- use metrics_util:: debugging:: { DebugValue , DebuggingRecorder } ;
208- use metrics_util:: { CompositeKey , MetricKind } ;
209- use ordered_float:: OrderedFloat ;
210- use std:: borrow:: Cow ;
211- use std:: sync:: Mutex ;
212- use std:: thread;
213-
214- static TEST_LOCK : LazyLock < Mutex < ( ) > > = LazyLock :: new ( || Mutex :: new ( ( ) ) ) ;
215-
216- fn reset_counters ( ) {
217- BUFFER_COUNTERS . clear ( ) ;
218- }
219-
220- fn get_counter_values ( buffer_id : & str , stage : usize ) -> ( i64 , i64 ) {
221- match BUFFER_COUNTERS . get ( & ( buffer_id. to_string ( ) , stage) ) {
222- Some ( counters) => {
223- let events = counters. 0 . load ( Ordering :: Relaxed ) ;
224- let bytes = counters. 1 . load ( Ordering :: Relaxed ) ;
225- ( events, bytes)
226- }
227- None => ( 0 , 0 ) ,
228- }
229- }
230-
231- fn assert_gauge_state (
232- buffer_id : & str ,
233- stage : usize ,
234- updates : & [ ( i64 , i64 ) ] ,
235- expected_events : f64 ,
236- expected_bytes : f64 ,
237- ) {
238- let _guard = TEST_LOCK
239- . lock ( )
240- . unwrap_or_else ( std:: sync:: PoisonError :: into_inner) ;
241-
242- reset_counters ( ) ;
243-
244- let recorder = DebuggingRecorder :: default ( ) ;
245- let snapshotter = recorder. snapshotter ( ) ;
246-
247- metrics:: with_local_recorder ( & recorder, move || {
248- for ( events_delta, bytes_delta) in updates {
249- update_buffer_gauge ( buffer_id, stage, * events_delta, * bytes_delta) ;
250- }
251-
252- let metrics = snapshotter. snapshot ( ) . into_vec ( ) ;
253-
254- let buffer_id_cow: Cow < ' static , str > = Cow :: Owned ( buffer_id. to_string ( ) ) ;
255- let buffer_id_label = Label :: new ( "buffer_id" , buffer_id_cow) ;
256-
257- let stage_label = Label :: new ( "stage" , stage. to_string ( ) ) ;
258-
259- let expected_metrics = vec ! [
260- (
261- CompositeKey :: new(
262- MetricKind :: Gauge ,
263- Key :: from_parts(
264- "buffer_events" ,
265- vec![ buffer_id_label. clone( ) , stage_label. clone( ) ] ,
266- ) ,
267- ) ,
268- None ,
269- None ,
270- DebugValue :: Gauge ( OrderedFloat ( expected_events) ) ,
271- ) ,
272- (
273- CompositeKey :: new(
274- MetricKind :: Gauge ,
275- Key :: from_parts(
276- "buffer_byte_size" ,
277- vec![ buffer_id_label. clone( ) , stage_label] ,
278- ) ,
279- ) ,
280- None ,
281- None ,
282- DebugValue :: Gauge ( OrderedFloat ( expected_bytes) ) ,
283- ) ,
284- ] ;
285-
286- // Compare metrics without needing to sort if order doesn't matter
287- assert_eq ! ( metrics. len( ) , expected_metrics. len( ) ) ;
288- for expected in & expected_metrics {
289- assert ! (
290- metrics. contains( expected) ,
291- "Missing expected metric: {expected:?}"
292- ) ;
293- }
294- } ) ;
295- }
296-
297- #[ test]
298- fn test_increment ( ) {
299- let _guard = TEST_LOCK
300- . lock ( )
301- . unwrap_or_else ( std:: sync:: PoisonError :: into_inner) ;
302-
303- reset_counters ( ) ;
304-
305- update_buffer_gauge ( "test_buffer" , 0 , 10 , 1024 ) ;
306- let ( events, bytes) = get_counter_values ( "test_buffer" , 0 ) ;
307- assert_eq ! ( events, 10 ) ;
308- assert_eq ! ( bytes, 1024 ) ;
309- }
310-
311- #[ test]
312- fn test_increment_and_decrement ( ) {
313- let _guard = TEST_LOCK . lock ( ) . unwrap ( ) ;
314- reset_counters ( ) ;
315-
316- update_buffer_gauge ( "test_buffer" , 1 , 100 , 2048 ) ;
317- update_buffer_gauge ( "test_buffer" , 1 , -50 , -1024 ) ;
318- let ( events, bytes) = get_counter_values ( "test_buffer" , 1 ) ;
319- assert_eq ! ( events, 50 ) ;
320- assert_eq ! ( bytes, 1024 ) ;
321- }
322-
323- #[ test]
324- fn test_decrement_below_zero_clamped_to_zero ( ) {
325- let _guard = TEST_LOCK . lock ( ) . unwrap ( ) ;
326- reset_counters ( ) ;
327-
328- update_buffer_gauge ( "test_buffer" , 2 , 5 , 100 ) ;
329- update_buffer_gauge ( "test_buffer" , 2 , -10 , -200 ) ;
330- let ( events, bytes) = get_counter_values ( "test_buffer" , 2 ) ;
331-
332- assert_eq ! ( events, 0 ) ;
333- assert_eq ! ( bytes, 0 ) ;
334- }
335-
336- #[ test]
337- fn test_multiple_stages_are_independent ( ) {
338- let _guard = TEST_LOCK . lock ( ) . unwrap ( ) ;
339- reset_counters ( ) ;
340-
341- update_buffer_gauge ( "test_buffer" , 0 , 10 , 100 ) ;
342- update_buffer_gauge ( "test_buffer" , 1 , 20 , 200 ) ;
343- let ( events0, bytes0) = get_counter_values ( "test_buffer" , 0 ) ;
344- let ( events1, bytes1) = get_counter_values ( "test_buffer" , 1 ) ;
345- assert_eq ! ( events0, 10 ) ;
346- assert_eq ! ( bytes0, 100 ) ;
347- assert_eq ! ( events1, 20 ) ;
348- assert_eq ! ( bytes1, 200 ) ;
349- }
350-
351- #[ test]
352- fn test_multithreaded_updates_are_correct ( ) {
353- let _guard = TEST_LOCK
354- . lock ( )
355- . unwrap_or_else ( std:: sync:: PoisonError :: into_inner) ;
356-
357- reset_counters ( ) ;
358-
359- let num_threads = 10 ;
360- let increments_per_thread = 1000 ;
361- let mut handles = vec ! [ ] ;
362-
363- for _ in 0 ..num_threads {
364- let handle = thread:: spawn ( move || {
365- for _ in 0 ..increments_per_thread {
366- update_buffer_gauge ( "test_buffer" , 0 , 1 , 10 ) ;
367- }
368- } ) ;
369- handles. push ( handle) ;
370- }
371-
372- for handle in handles {
373- handle. join ( ) . unwrap ( ) ;
374- }
375-
376- let ( final_events, final_bytes) = get_counter_values ( "test_buffer" , 0 ) ;
377- let expected_events = i64:: from ( num_threads * increments_per_thread) ;
378- let expected_bytes = i64:: from ( num_threads * increments_per_thread * 10 ) ;
379-
380- assert_eq ! ( final_events, expected_events) ;
381- assert_eq ! ( final_bytes, expected_bytes) ;
382- }
383-
384- #[ test]
385- fn test_large_values_capped_to_f64_safe_max ( ) {
386- let _guard = TEST_LOCK
387- . lock ( )
388- . unwrap_or_else ( std:: sync:: PoisonError :: into_inner) ;
389-
390- reset_counters ( ) ;
391-
392- update_buffer_gauge ( "test_buffer" , 3 , F64_SAFE_INT_MAX * 2 , F64_SAFE_INT_MAX * 2 ) ;
393-
394- let ( events, bytes) = get_counter_values ( "test_buffer" , 3 ) ;
395-
396- assert ! ( events > F64_SAFE_INT_MAX ) ;
397- assert ! ( bytes > F64_SAFE_INT_MAX ) ;
398-
399- let capped_events = events. min ( F64_SAFE_INT_MAX ) ;
400- let capped_bytes = bytes. min ( F64_SAFE_INT_MAX ) ;
401-
402- assert_eq ! ( capped_events, F64_SAFE_INT_MAX ) ;
403- assert_eq ! ( capped_bytes, F64_SAFE_INT_MAX ) ;
404- }
405-
406- #[ test]
407- fn test_increment_with_recorder ( ) {
408- assert_gauge_state ( "test_buffer" , 0 , & [ ( 100 , 2048 ) , ( 200 , 1024 ) ] , 300.0 , 3072.0 ) ;
409- }
410-
411- #[ test]
412- fn test_should_not_be_negative_with_recorder ( ) {
413- assert_gauge_state ( "test_buffer" , 1 , & [ ( 100 , 1024 ) , ( -200 , -4096 ) ] , 0.0 , 0.0 ) ;
414- }
415-
416- #[ test]
417- fn test_increment_with_custom_buffer_id ( ) {
418- assert_gauge_state (
419- "buffer_alpha" ,
420- 0 ,
421- & [ ( 100 , 2048 ) , ( 200 , 1024 ) ] ,
422- 300.0 ,
423- 3072.0 ,
424- ) ;
425- }
426-
427- #[ test]
428- fn test_increment_with_another_buffer_id ( ) {
429- assert_gauge_state ( "buffer_beta" , 0 , & [ ( 10 , 100 ) , ( 5 , 50 ) ] , 15.0 , 150.0 ) ;
430- }
431- }
0 commit comments