Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b0703f7
Move `update_and_get` within `update_buffer_gauge`
bruceg Aug 6, 2025
5cff889
Break out counter updating from emitting gauges
bruceg Aug 6, 2025
b8731fd
Break up counter updating and gauge emission
bruceg Aug 6, 2025
69c891f
Move updating buffer counts into buffer usage data module
bruceg Aug 6, 2025
9cc4d35
Use u64 values in the buffer counters map
bruceg Aug 8, 2025
9b41787
Add and subtract in multithread update test
bruceg Aug 8, 2025
f7d343f
Use `update_and_get` to update category metrics atomics as well
bruceg Aug 8, 2025
95a9c77
Add helper function to safely convert u64 to i64
bruceg Aug 8, 2025
0fe85b5
Move current buffer size counters into `BufferUsageData`
bruceg Aug 8, 2025
045dcc8
Rename `update_and_get` to `update_counter` dropping the return
bruceg Aug 8, 2025
482bb5c
Open code `fn emit_counter_gauge` into callers
bruceg Aug 6, 2025
a3a78cb
Add missing `buffer_discarded_bytes_total` counter
bruceg Aug 8, 2025
6ef2d64
Add `buffer_id` to `BufferCreated` event
bruceg Aug 6, 2025
353d0bb
Drop clamping u64 values to "safe" floats
bruceg Aug 8, 2025
376dcde
Add missing `stage` to discarded event counters
bruceg Aug 8, 2025
185622b
Merge branch 'master' into bruceg/rework-buffer-counters
pront Aug 13, 2025
8a26a40
Only do atomic increments if the counts are non-zero
bruceg Aug 13, 2025
b840c9e
Add test to ensure atomic updates obey clamping
bruceg Aug 13, 2025
c5ecfcb
Warn when we overflow or underflow a counter value
bruceg Aug 13, 2025
d04cbf4
Fix increment test values
bruceg Aug 13, 2025
82b22f4
Use "relaxed" ordering for all counter updates
bruceg Aug 13, 2025
de17764
Merge branch 'master' into bruceg/rework-buffer-counters
pront Aug 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions lib/vector-buffers/src/cast_utils.rs

This file was deleted.

21 changes: 12 additions & 9 deletions lib/vector-buffers/src/internal_events.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::time::Duration;

use crate::cast_utils::u64_to_f64_safe;
use metrics::{counter, gauge, histogram, Histogram};
use vector_common::{
internal_event::{error_type, InternalEvent},
Expand All @@ -15,22 +14,23 @@ pub struct BufferCreated {
}

impl InternalEvent for BufferCreated {
#[expect(clippy::cast_precision_loss)]
fn emit(self) {
if self.max_size_events != 0 {
gauge!(
"buffer_max_event_size",
"buffer_id" => self.buffer_id.clone(),
"stage" => self.idx.to_string(),
)
.set(u64_to_f64_safe(self.max_size_events as u64));
.set(self.max_size_events as f64);
}
if self.max_size_bytes != 0 {
gauge!(
"buffer_max_byte_size",
"buffer_id" => self.buffer_id,
"stage" => self.idx.to_string(),
)
.set(u64_to_f64_safe(self.max_size_bytes));
.set(self.max_size_bytes as f64);
}
}
}
Expand All @@ -45,6 +45,7 @@ pub struct BufferEventsReceived {
}

impl InternalEvent for BufferEventsReceived {
#[expect(clippy::cast_precision_loss)]
fn emit(self) {
counter!(
"buffer_received_events_total",
Expand All @@ -64,13 +65,13 @@ impl InternalEvent for BufferEventsReceived {
"buffer_id" => self.buffer_id.clone(),
"stage" => self.idx.to_string()
)
.set(u64_to_f64_safe(self.total_count));
.set(self.total_count as f64);
gauge!(
"buffer_byte_size",
"buffer_id" => self.buffer_id,
"stage" => self.idx.to_string()
)
.set(u64_to_f64_safe(self.total_byte_size));
.set(self.total_byte_size as f64);
}
}

Expand All @@ -84,6 +85,7 @@ pub struct BufferEventsSent {
}

impl InternalEvent for BufferEventsSent {
#[expect(clippy::cast_precision_loss)]
fn emit(self) {
counter!(
"buffer_sent_events_total",
Expand All @@ -102,13 +104,13 @@ impl InternalEvent for BufferEventsSent {
"buffer_id" => self.buffer_id.clone(),
"stage" => self.idx.to_string()
)
.set(u64_to_f64_safe(self.total_count));
.set(self.total_count as f64);
gauge!(
"buffer_byte_size",
"buffer_id" => self.buffer_id,
"stage" => self.idx.to_string()
)
.set(u64_to_f64_safe(self.total_byte_size));
.set(self.total_byte_size as f64);
}
}

Expand All @@ -124,6 +126,7 @@ pub struct BufferEventsDropped {
}

impl InternalEvent for BufferEventsDropped {
#[expect(clippy::cast_precision_loss)]
fn emit(self) {
let intentional_str = if self.intentional { "true" } else { "false" };
if self.intentional {
Expand Down Expand Up @@ -165,13 +168,13 @@ impl InternalEvent for BufferEventsDropped {
"buffer_id" => self.buffer_id.clone(),
"stage" => self.idx.to_string()
)
.set(u64_to_f64_safe(self.total_count));
.set(self.total_count as f64);
gauge!(
"buffer_byte_size",
"buffer_id" => self.buffer_id,
"stage" => self.idx.to_string()
)
.set(u64_to_f64_safe(self.total_byte_size));
.set(self.total_byte_size as f64);
}
}

Expand Down
1 change: 0 additions & 1 deletion lib/vector-buffers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ mod internal_events;
pub mod test;
pub mod topology;

mod cast_utils;
pub(crate) mod variants;

use std::fmt::Debug;
Expand Down
Loading