diff --git a/lib/vector-buffers/src/buffer_usage_data.rs b/lib/vector-buffers/src/buffer_usage_data.rs index ac06d2757da93..90277e8c6328e 100644 --- a/lib/vector-buffers/src/buffer_usage_data.rs +++ b/lib/vector-buffers/src/buffer_usage_data.rs @@ -242,8 +242,10 @@ impl BufferUsage { /// The `buffer_id` should be a unique name -- ideally the `component_id` of the sink using this buffer -- but is /// not used for anything other than reporting, and so has no _requirement_ to be unique. pub fn install(self, buffer_id: &str) { + let buffer_id = buffer_id.to_string(); let span = self.span; let stages = self.stages; + let task_name = format!("buffer usage reporter ({buffer_id})"); let task = async move { let mut interval = interval(Duration::from_secs(2)); @@ -264,6 +266,7 @@ impl BufferUsage { let received = stage.received.consume(); if received.has_updates() { emit(BufferEventsReceived { + buffer_id: buffer_id.clone(), idx: stage.idx, count: received.event_count, byte_size: received.event_byte_size, @@ -273,6 +276,7 @@ impl BufferUsage { let sent = stage.sent.consume(); if sent.has_updates() { emit(BufferEventsSent { + buffer_id: buffer_id.clone(), idx: stage.idx, count: sent.event_count, byte_size: sent.event_byte_size, @@ -282,6 +286,7 @@ impl BufferUsage { let dropped = stage.dropped.consume(); if dropped.has_updates() { emit(BufferEventsDropped { + buffer_id: buffer_id.clone(), idx: stage.idx, intentional: false, reason: "corrupted_events", @@ -293,6 +298,7 @@ impl BufferUsage { let dropped_intentional = stage.dropped_intentional.consume(); if dropped_intentional.has_updates() { emit(BufferEventsDropped { + buffer_id: buffer_id.clone(), idx: stage.idx, intentional: true, reason: "drop_newest", @@ -304,7 +310,6 @@ impl BufferUsage { } }; - let task_name = format!("buffer usage reporter ({buffer_id})"); spawn_named(task.instrument(span.or_current()), task_name.as_str()); } } diff --git a/lib/vector-buffers/src/internal_events.rs b/lib/vector-buffers/src/internal_events.rs index ea024cf186e4d..30e92d39135e4 100644 --- a/lib/vector-buffers/src/internal_events.rs +++ b/lib/vector-buffers/src/internal_events.rs @@ -10,7 +10,7 @@ use vector_common::{ registered_event, }; -static BUFFER_COUNTERS: LazyLock> = +static BUFFER_COUNTERS: LazyLock> = LazyLock::new(DashMap::new); fn update_and_get(counter: &AtomicI64, delta: i64) -> i64 { @@ -25,16 +25,25 @@ fn update_and_get(counter: &AtomicI64, delta: i64) -> i64 { new_val } -fn update_buffer_gauge(stage: usize, events_delta: i64, bytes_delta: i64) { +fn update_buffer_gauge(buffer_id: &str, stage: usize, events_delta: i64, bytes_delta: i64) { let counters = BUFFER_COUNTERS - .entry(stage) + .entry((buffer_id.to_string(), stage)) .or_insert_with(|| (AtomicI64::new(0), AtomicI64::new(0))); let new_events = update_and_get(&counters.0, events_delta); let new_bytes = update_and_get(&counters.1, bytes_delta); - gauge!("buffer_events", "stage" => stage.to_string()).set(i64_to_f64_safe(new_events)); - gauge!("buffer_byte_size", "stage" => stage.to_string()).set(i64_to_f64_safe(new_bytes)); + gauge!("buffer_events", + "buffer_id" => buffer_id.to_string(), + "stage" => stage.to_string() + ) + .set(i64_to_f64_safe(new_events)); + + gauge!("buffer_byte_size", + "buffer_id" => buffer_id.to_string(), + "stage" => stage.to_string() + ) + .set(i64_to_f64_safe(new_bytes)); } pub struct BufferCreated { @@ -57,6 +66,7 @@ impl InternalEvent for BufferCreated { } pub struct BufferEventsReceived { + pub buffer_id: String, pub idx: usize, pub count: u64, pub byte_size: u64, @@ -64,18 +74,26 @@ pub struct BufferEventsReceived { impl InternalEvent for BufferEventsReceived { fn emit(self) { - counter!("buffer_received_events_total", "stage" => self.idx.to_string()) - .increment(self.count); - counter!("buffer_received_bytes_total", "stage" => self.idx.to_string()) - .increment(self.byte_size); + counter!("buffer_received_events_total", + "buffer_id" => self.buffer_id.clone(), + "stage" => self.idx.to_string() + ) + .increment(self.count); + + counter!("buffer_received_bytes_total", + "buffer_id" => self.buffer_id.clone(), + "stage" => self.idx.to_string() + ) + .increment(self.byte_size); let count_delta = i64::try_from(self.count).unwrap_or(i64::MAX); let bytes_delta = i64::try_from(self.byte_size).unwrap_or(i64::MAX); - update_buffer_gauge(self.idx, count_delta, bytes_delta); + update_buffer_gauge(&self.buffer_id, self.idx, count_delta, bytes_delta); } } pub struct BufferEventsSent { + pub buffer_id: String, pub idx: usize, pub count: u64, pub byte_size: u64, @@ -83,17 +101,25 @@ pub struct BufferEventsSent { impl InternalEvent for BufferEventsSent { fn emit(self) { - counter!("buffer_sent_events_total", "stage" => self.idx.to_string()).increment(self.count); - counter!("buffer_sent_bytes_total", "stage" => self.idx.to_string()) - .increment(self.byte_size); + counter!("buffer_sent_events_total", + "buffer_id" => self.buffer_id.clone(), + "stage" => self.idx.to_string() + ) + .increment(self.count); + + counter!("buffer_sent_bytes_total", + "buffer_id" => self.buffer_id.clone(), + "stage" => self.idx.to_string()) + .increment(self.byte_size); let count_delta = i64::try_from(self.count).unwrap_or(i64::MAX); let bytes_delta = i64::try_from(self.byte_size).unwrap_or(i64::MAX); - update_buffer_gauge(self.idx, -count_delta, -bytes_delta); + update_buffer_gauge(&self.buffer_id, self.idx, -count_delta, -bytes_delta); } } pub struct BufferEventsDropped { + pub buffer_id: String, pub idx: usize, pub count: u64, pub byte_size: u64, @@ -110,6 +136,7 @@ impl InternalEvent for BufferEventsDropped { count = %self.count, intentional = %intentional_str, reason = %self.reason, + buffer_id = %self.buffer_id, stage = %self.idx, ); } else { @@ -118,17 +145,22 @@ impl InternalEvent for BufferEventsDropped { count = %self.count, intentional = %intentional_str, reason = %self.reason, + buffer_id = %self.buffer_id, stage = %self.idx, ); } + counter!( - "buffer_discarded_events_total", "intentional" => intentional_str, + "buffer_discarded_events_total", + "buffer_id" => self.buffer_id.clone(), + "intentional" => intentional_str, ) .increment(self.count); let count_delta = i64::try_from(self.count).unwrap_or(i64::MAX); let bytes_delta = i64::try_from(self.byte_size).unwrap_or(i64::MAX); - update_buffer_gauge(self.idx, -count_delta, -bytes_delta); + + update_buffer_gauge(&self.buffer_id, self.idx, -count_delta, -bytes_delta); } } @@ -175,6 +207,7 @@ mod tests { use metrics_util::debugging::{DebugValue, DebuggingRecorder}; use metrics_util::{CompositeKey, MetricKind}; use ordered_float::OrderedFloat; + use std::borrow::Cow; use std::sync::Mutex; use std::thread; @@ -184,8 +217,8 @@ mod tests { BUFFER_COUNTERS.clear(); } - fn get_counter_values(stage: usize) -> (i64, i64) { - match BUFFER_COUNTERS.get(&stage) { + fn get_counter_values(buffer_id: &str, stage: usize) -> (i64, i64) { + match BUFFER_COUNTERS.get(&(buffer_id.to_string(), stage)) { Some(counters) => { let events = counters.0.load(Ordering::Relaxed); let bytes = counters.1.load(Ordering::Relaxed); @@ -196,12 +229,16 @@ mod tests { } fn assert_gauge_state( + buffer_id: &str, stage: usize, updates: &[(i64, i64)], expected_events: f64, expected_bytes: f64, ) { - let _guard = TEST_LOCK.lock().unwrap(); + let _guard = TEST_LOCK + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + reset_counters(); let recorder = DebuggingRecorder::default(); @@ -209,17 +246,24 @@ mod tests { metrics::with_local_recorder(&recorder, move || { for (events_delta, bytes_delta) in updates { - update_buffer_gauge(stage, *events_delta, *bytes_delta); + update_buffer_gauge(buffer_id, stage, *events_delta, *bytes_delta); } - let mut metrics = snapshotter.snapshot().into_vec(); + let metrics = snapshotter.snapshot().into_vec(); + + let buffer_id_cow: Cow<'static, str> = Cow::Owned(buffer_id.to_string()); + let buffer_id_label = Label::new("buffer_id", buffer_id_cow); let stage_label = Label::new("stage", stage.to_string()); - let mut expected_metrics = vec![ + + let expected_metrics = vec![ ( CompositeKey::new( MetricKind::Gauge, - Key::from_parts("buffer_events", vec![stage_label.clone()]), + Key::from_parts( + "buffer_events", + vec![buffer_id_label.clone(), stage_label.clone()], + ), ), None, None, @@ -228,7 +272,10 @@ mod tests { ( CompositeKey::new( MetricKind::Gauge, - Key::from_parts("buffer_byte_size", vec![stage_label]), + Key::from_parts( + "buffer_byte_size", + vec![buffer_id_label.clone(), stage_label], + ), ), None, None, @@ -236,20 +283,27 @@ mod tests { ), ]; - metrics.sort_by_key(|(key, ..)| key.clone()); - expected_metrics.sort_by_key(|(key, ..)| key.clone()); - - assert_eq!(metrics, expected_metrics); + // Compare metrics without needing to sort if order doesn't matter + assert_eq!(metrics.len(), expected_metrics.len()); + for expected in &expected_metrics { + assert!( + metrics.contains(expected), + "Missing expected metric: {expected:?}" + ); + } }); } #[test] fn test_increment() { - let _guard = TEST_LOCK.lock().unwrap(); + let _guard = TEST_LOCK + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + reset_counters(); - update_buffer_gauge(0, 10, 1024); - let (events, bytes) = get_counter_values(0); + update_buffer_gauge("test_buffer", 0, 10, 1024); + let (events, bytes) = get_counter_values("test_buffer", 0); assert_eq!(events, 10); assert_eq!(bytes, 1024); } @@ -259,9 +313,9 @@ mod tests { let _guard = TEST_LOCK.lock().unwrap(); reset_counters(); - update_buffer_gauge(1, 100, 2048); - update_buffer_gauge(1, -50, -1024); - let (events, bytes) = get_counter_values(1); + update_buffer_gauge("test_buffer", 1, 100, 2048); + update_buffer_gauge("test_buffer", 1, -50, -1024); + let (events, bytes) = get_counter_values("test_buffer", 1); assert_eq!(events, 50); assert_eq!(bytes, 1024); } @@ -271,9 +325,9 @@ mod tests { let _guard = TEST_LOCK.lock().unwrap(); reset_counters(); - update_buffer_gauge(2, 5, 100); - update_buffer_gauge(2, -10, -200); - let (events, bytes) = get_counter_values(2); + update_buffer_gauge("test_buffer", 2, 5, 100); + update_buffer_gauge("test_buffer", 2, -10, -200); + let (events, bytes) = get_counter_values("test_buffer", 2); assert_eq!(events, 0); assert_eq!(bytes, 0); @@ -284,10 +338,10 @@ mod tests { let _guard = TEST_LOCK.lock().unwrap(); reset_counters(); - update_buffer_gauge(0, 10, 100); - update_buffer_gauge(1, 20, 200); - let (events0, bytes0) = get_counter_values(0); - let (events1, bytes1) = get_counter_values(1); + update_buffer_gauge("test_buffer", 0, 10, 100); + update_buffer_gauge("test_buffer", 1, 20, 200); + let (events0, bytes0) = get_counter_values("test_buffer", 0); + let (events1, bytes1) = get_counter_values("test_buffer", 1); assert_eq!(events0, 10); assert_eq!(bytes0, 100); assert_eq!(events1, 20); @@ -296,7 +350,10 @@ mod tests { #[test] fn test_multithreaded_updates_are_correct() { - let _guard = TEST_LOCK.lock().unwrap(); + let _guard = TEST_LOCK + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + reset_counters(); let num_threads = 10; @@ -306,7 +363,7 @@ mod tests { for _ in 0..num_threads { let handle = thread::spawn(move || { for _ in 0..increments_per_thread { - update_buffer_gauge(0, 1, 10); + update_buffer_gauge("test_buffer", 0, 1, 10); } }); handles.push(handle); @@ -316,7 +373,7 @@ mod tests { handle.join().unwrap(); } - let (final_events, final_bytes) = get_counter_values(0); + let (final_events, final_bytes) = get_counter_values("test_buffer", 0); let expected_events = i64::from(num_threads * increments_per_thread); let expected_bytes = i64::from(num_threads * increments_per_thread * 10); @@ -326,12 +383,15 @@ mod tests { #[test] fn test_large_values_capped_to_f64_safe_max() { - let _guard = TEST_LOCK.lock().unwrap(); + let _guard = TEST_LOCK + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + reset_counters(); - update_buffer_gauge(3, F64_SAFE_INT_MAX * 2, F64_SAFE_INT_MAX * 2); + update_buffer_gauge("test_buffer", 3, F64_SAFE_INT_MAX * 2, F64_SAFE_INT_MAX * 2); - let (events, bytes) = get_counter_values(3); + let (events, bytes) = get_counter_values("test_buffer", 3); assert!(events > F64_SAFE_INT_MAX); assert!(bytes > F64_SAFE_INT_MAX); @@ -345,11 +405,27 @@ mod tests { #[test] fn test_increment_with_recorder() { - assert_gauge_state(0, &[(100, 2048), (200, 1024)], 300.0, 3072.0); + assert_gauge_state("test_buffer", 0, &[(100, 2048), (200, 1024)], 300.0, 3072.0); } #[test] fn test_should_not_be_negative_with_recorder() { - assert_gauge_state(0, &[(100, 1024), (-200, -4096)], 0.0, 0.0); + assert_gauge_state("test_buffer", 1, &[(100, 1024), (-200, -4096)], 0.0, 0.0); + } + + #[test] + fn test_increment_with_custom_buffer_id() { + assert_gauge_state( + "buffer_alpha", + 0, + &[(100, 2048), (200, 1024)], + 300.0, + 3072.0, + ); + } + + #[test] + fn test_increment_with_another_buffer_id() { + assert_gauge_state("buffer_beta", 0, &[(10, 100), (5, 50)], 15.0, 150.0); } }