Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b0703f7
Move `update_and_get` within `update_buffer_gauge`
bruceg Aug 6, 2025
5cff889
Break out counter updating from emitting gauges
bruceg Aug 6, 2025
b8731fd
Break up counter updating and gauge emission
bruceg Aug 6, 2025
69c891f
Move updating buffer counts into buffer usage data module
bruceg Aug 6, 2025
9cc4d35
Use u64 values in the buffer counters map
bruceg Aug 8, 2025
9b41787
Add and subtract in multithread update test
bruceg Aug 8, 2025
f7d343f
Use `update_and_get` to update category metrics atomics as well
bruceg Aug 8, 2025
95a9c77
Add helper function to safely convert u64 to i64
bruceg Aug 8, 2025
0fe85b5
Move current buffer size counters into `BufferUsageData`
bruceg Aug 8, 2025
045dcc8
Rename `update_and_get` to `update_counter` dropping the return
bruceg Aug 8, 2025
482bb5c
Open code `fn emit_counter_gauge` into callers
bruceg Aug 6, 2025
a3a78cb
Add missing `buffer_discarded_bytes_total` counter
bruceg Aug 8, 2025
6ef2d64
Add `buffer_id` to `BufferCreated` event
bruceg Aug 6, 2025
353d0bb
Drop clamping u64 values to "safe" floats
bruceg Aug 8, 2025
376dcde
Add missing `stage` to discarded event counters
bruceg Aug 8, 2025
185622b
Merge branch 'master' into bruceg/rework-buffer-counters
pront Aug 13, 2025
8a26a40
Only do atomic increments if the counts are non-zero
bruceg Aug 13, 2025
b840c9e
Add test to ensure atomic updates obey clamping
bruceg Aug 13, 2025
c5ecfcb
Warn when we overflow or underflow a counter value
bruceg Aug 13, 2025
d04cbf4
Fix increment test values
bruceg Aug 13, 2025
82b22f4
Use "relaxed" ordering for all counter updates
bruceg Aug 13, 2025
de17764
Merge branch 'master' into bruceg/rework-buffer-counters
pront Aug 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 74 additions & 3 deletions lib/vector-buffers/src/buffer_usage_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,25 @@ use crate::{
spawn_named,
};

fn u64_to_i64(value: u64) -> i64 {
// If we ever have to deal with more than 9,223,372,036,854,775,808 events or bytes _in a single
// update_, this conversion will break counters. That number is effectively impossibly large,
// though.
i64::try_from(value).unwrap_or(i64::MAX)
}

fn update_counter(counter: &AtomicU64, delta: i64) {
counter
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
// This will never be negative due to the `clamp`
#[expect(clippy::cast_sign_loss)]
{
Some(u64_to_i64(current).saturating_add(delta).clamp(0, i64::MAX) as u64)
}
})
.ok();
}

/// Snapshot of category metrics.
struct CategorySnapshot {
event_count: u64,
Expand Down Expand Up @@ -43,9 +62,14 @@ struct CategoryMetrics {
impl CategoryMetrics {
/// Increments the event count and byte size by the given amounts.
fn increment(&self, event_count: u64, event_byte_size: u64) {
self.event_count.fetch_add(event_count, Ordering::Relaxed);
self.event_byte_size
.fetch_add(event_byte_size, Ordering::Relaxed);
update_counter(&self.event_count, u64_to_i64(event_count));
update_counter(&self.event_byte_size, u64_to_i64(event_byte_size));
}

/// Decrements the event count and byte size by the given amounts.
fn decrement(&self, event_count: u64, event_byte_size: u64) {
update_counter(&self.event_count, -u64_to_i64(event_count));
update_counter(&self.event_byte_size, -u64_to_i64(event_byte_size));
}

/// Sets the event count and event byte size to the given amount.
Expand Down Expand Up @@ -118,13 +142,15 @@ impl BufferUsageHandle {
/// This represents the events being sent into the buffer.
pub fn increment_received_event_count_and_byte_size(&self, count: u64, byte_size: u64) {
self.state.received.increment(count, byte_size);
self.state.current.increment(count, byte_size);
}

/// Increments the number of events (and their total size) sent by this buffer component.
///
/// This represents the events being read out of the buffer.
pub fn increment_sent_event_count_and_byte_size(&self, count: u64, byte_size: u64) {
self.state.sent.increment(count, byte_size);
self.state.current.decrement(count, byte_size);
}

/// Increment the number of dropped events (and their total size) for this buffer component.
Expand All @@ -139,6 +165,7 @@ impl BufferUsageHandle {
} else {
self.state.dropped.increment(count, byte_size);
}
self.state.current.decrement(count, byte_size);
}
}

Expand All @@ -150,6 +177,7 @@ struct BufferUsageData {
dropped: CategoryMetrics,
dropped_intentional: CategoryMetrics,
max_size: CategoryMetrics,
current: CategoryMetrics,
}

impl BufferUsageData {
Expand Down Expand Up @@ -255,6 +283,7 @@ impl BufferUsage {
for stage in &stages {
let max_size = stage.max_size.get();
emit(BufferCreated {
buffer_id: buffer_id.clone(),
idx: stage.idx,
max_size_bytes: max_size.event_byte_size,
max_size_events: max_size
Expand All @@ -263,13 +292,16 @@ impl BufferUsage {
.expect("should never be bigger than `usize`"),
});

let current = stage.current.get();
let received = stage.received.consume();
if received.has_updates() {
emit(BufferEventsReceived {
buffer_id: buffer_id.clone(),
idx: stage.idx,
count: received.event_count,
byte_size: received.event_byte_size,
total_count: current.event_count,
total_byte_size: current.event_byte_size,
});
}

Expand All @@ -280,6 +312,8 @@ impl BufferUsage {
idx: stage.idx,
count: sent.event_count,
byte_size: sent.event_byte_size,
total_count: current.event_count,
total_byte_size: current.event_byte_size,
});
}

Expand All @@ -292,6 +326,8 @@ impl BufferUsage {
reason: "corrupted_events",
count: dropped.event_count,
byte_size: dropped.event_byte_size,
total_count: current.event_count,
total_byte_size: current.event_byte_size,
});
}

Expand All @@ -304,6 +340,8 @@ impl BufferUsage {
reason: "drop_newest",
count: dropped_intentional.event_count,
byte_size: dropped_intentional.event_byte_size,
total_count: current.event_count,
total_byte_size: current.event_byte_size,
});
}
}
Expand All @@ -313,3 +351,36 @@ impl BufferUsage {
spawn_named(task.instrument(span.or_current()), task_name.as_str());
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::thread;

#[test]
fn test_multithreaded_updates_are_correct() {
const NUM_THREADS: u64 = 16;
const INCREMENTS_PER_THREAD: u64 = 10_000;

let counter = Arc::new(AtomicU64::new(0));

let mut handles = vec![];

for _ in 0..NUM_THREADS {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
for _ in 0..INCREMENTS_PER_THREAD {
update_counter(&counter, 1);
update_counter(&counter, -1);
}
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

assert_eq!(counter.load(Ordering::Relaxed), 0);
}
}
19 changes: 0 additions & 19 deletions lib/vector-buffers/src/cast_utils.rs

This file was deleted.

Loading
Loading