Skip to content

Commit f64089a

Browse files
vparfonovpront
andauthored
fix(buffers): Prevent negative buffer size and event gauges (#23453)
* fix(buffers): Prevent negative buffer size and event gauges Signed-off-by: Vitalii Parfonov <vparfono@redhat.com> * add changelog Signed-off-by: Vitalii Parfonov <vparfono@redhat.com> * fix formatting Signed-off-by: Vitalii Parfonov <vparfonov@redhat.com> * Update changelog.d/23453_prevent_negative_buffer_size_and_event_gauges.fix.md Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com> * Handle value capping. Add recorder-based unit tests Signed-off-by: Vitalii Parfonov <vparfono@redhat.com> * Improve cast operation * rename function with better name Signed-off-by: Vitalii Parfonov <vparfono@redhat.com> --------- Signed-off-by: Vitalii Parfonov <vparfono@redhat.com> Signed-off-by: Vitalii Parfonov <vparfonov@redhat.com> Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
1 parent c7eaf65 commit f64089a

File tree

6 files changed

+259
-16
lines changed

6 files changed

+259
-16
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed race condition bug causing negative values to be reported by the `vector_buffer_byte_size` and `vector_buffer_events` gauges.
2+
3+
authors: vparfonov

lib/vector-buffers/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ tokio = { version = "1.45.1", default-features = false, features = ["rt", "macro
3232
tracing = { version = "0.1.34", default-features = false, features = ["attributes"] }
3333
vector-config = { path = "../vector-config", default-features = false }
3434
vector-common = { path = "../vector-common", default-features = false, features = ["byte_size_of"] }
35+
dashmap = { version = "6.1", default-features = false }
36+
ordered-float = { version = "4.6.0", default-features = false }
3537

3638
[dev-dependencies]
3739
clap.workspace = true
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Maximum i64 value that can be represented exactly in f64 (2^53).
2+
pub const F64_SAFE_INT_MAX: i64 = 1_i64 << 53;
3+
4+
pub fn i64_to_f64_safe(value: i64) -> f64 {
5+
let capped = value.clamp(0, F64_SAFE_INT_MAX);
6+
debug_assert!(capped <= F64_SAFE_INT_MAX);
7+
#[allow(clippy::cast_precision_loss)]
8+
{
9+
capped as f64
10+
}
11+
}
12+
13+
pub fn u64_to_f64_safe(value: u64) -> f64 {
14+
let capped = value.min(F64_SAFE_INT_MAX as u64);
15+
#[allow(clippy::cast_precision_loss)]
16+
{
17+
capped as f64
18+
}
19+
}

lib/vector-buffers/src/internal_events.rs

Lines changed: 232 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,57 @@
1+
use dashmap::DashMap;
2+
use std::sync::atomic::{AtomicI64, Ordering};
3+
use std::sync::LazyLock;
14
use std::time::Duration;
25

6+
use crate::cast_utils::{i64_to_f64_safe, u64_to_f64_safe};
37
use metrics::{counter, gauge, histogram, Histogram};
48
use vector_common::{
59
internal_event::{error_type, InternalEvent},
610
registered_event,
711
};
812

13+
static BUFFER_COUNTERS: LazyLock<DashMap<usize, (AtomicI64, AtomicI64)>> =
14+
LazyLock::new(DashMap::new);
15+
16+
fn update_and_get(counter: &AtomicI64, delta: i64) -> i64 {
17+
let mut new_val = 0;
18+
counter
19+
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
20+
let updated = current.saturating_add(delta).clamp(0, i64::MAX);
21+
new_val = updated;
22+
Some(updated)
23+
})
24+
.ok();
25+
new_val
26+
}
27+
28+
fn update_buffer_gauge(stage: usize, events_delta: i64, bytes_delta: i64) {
29+
let counters = BUFFER_COUNTERS
30+
.entry(stage)
31+
.or_insert_with(|| (AtomicI64::new(0), AtomicI64::new(0)));
32+
33+
let new_events = update_and_get(&counters.0, events_delta);
34+
let new_bytes = update_and_get(&counters.1, bytes_delta);
35+
36+
gauge!("buffer_events", "stage" => stage.to_string()).set(i64_to_f64_safe(new_events));
37+
gauge!("buffer_byte_size", "stage" => stage.to_string()).set(i64_to_f64_safe(new_bytes));
38+
}
39+
940
pub struct BufferCreated {
1041
pub idx: usize,
1142
pub max_size_events: usize,
1243
pub max_size_bytes: u64,
1344
}
1445

1546
impl InternalEvent for BufferCreated {
16-
#[allow(clippy::cast_precision_loss)]
1747
fn emit(self) {
1848
if self.max_size_events != 0 {
1949
gauge!("buffer_max_event_size", "stage" => self.idx.to_string())
20-
.set(self.max_size_events as f64);
50+
.set(u64_to_f64_safe(self.max_size_events as u64));
2151
}
2252
if self.max_size_bytes != 0 {
2353
gauge!("buffer_max_byte_size", "stage" => self.idx.to_string())
24-
.set(self.max_size_bytes as f64);
54+
.set(u64_to_f64_safe(self.max_size_bytes));
2555
}
2656
}
2757
}
@@ -33,15 +63,15 @@ pub struct BufferEventsReceived {
3363
}
3464

3565
impl InternalEvent for BufferEventsReceived {
36-
#[allow(clippy::cast_precision_loss)]
3766
fn emit(self) {
3867
counter!("buffer_received_events_total", "stage" => self.idx.to_string())
3968
.increment(self.count);
4069
counter!("buffer_received_bytes_total", "stage" => self.idx.to_string())
4170
.increment(self.byte_size);
42-
gauge!("buffer_events", "stage" => self.idx.to_string()).increment(self.count as f64);
43-
gauge!("buffer_byte_size", "stage" => self.idx.to_string())
44-
.increment(self.byte_size as f64);
71+
72+
let count_delta = i64::try_from(self.count).unwrap_or(i64::MAX);
73+
let bytes_delta = i64::try_from(self.byte_size).unwrap_or(i64::MAX);
74+
update_buffer_gauge(self.idx, count_delta, bytes_delta);
4575
}
4676
}
4777

@@ -52,14 +82,14 @@ pub struct BufferEventsSent {
5282
}
5383

5484
impl InternalEvent for BufferEventsSent {
55-
#[allow(clippy::cast_precision_loss)]
5685
fn emit(self) {
5786
counter!("buffer_sent_events_total", "stage" => self.idx.to_string()).increment(self.count);
5887
counter!("buffer_sent_bytes_total", "stage" => self.idx.to_string())
5988
.increment(self.byte_size);
60-
gauge!("buffer_events", "stage" => self.idx.to_string()).decrement(self.count as f64);
61-
gauge!("buffer_byte_size", "stage" => self.idx.to_string())
62-
.decrement(self.byte_size as f64);
89+
90+
let count_delta = i64::try_from(self.count).unwrap_or(i64::MAX);
91+
let bytes_delta = i64::try_from(self.byte_size).unwrap_or(i64::MAX);
92+
update_buffer_gauge(self.idx, -count_delta, -bytes_delta);
6393
}
6494
}
6595

@@ -72,7 +102,6 @@ pub struct BufferEventsDropped {
72102
}
73103

74104
impl InternalEvent for BufferEventsDropped {
75-
#[allow(clippy::cast_precision_loss)]
76105
fn emit(self) {
77106
let intentional_str = if self.intentional { "true" } else { "false" };
78107
if self.intentional {
@@ -96,9 +125,10 @@ impl InternalEvent for BufferEventsDropped {
96125
"buffer_discarded_events_total", "intentional" => intentional_str,
97126
)
98127
.increment(self.count);
99-
gauge!("buffer_events", "stage" => self.idx.to_string()).decrement(self.count as f64);
100-
gauge!("buffer_byte_size", "stage" => self.idx.to_string())
101-
.decrement(self.byte_size as f64);
128+
129+
let count_delta = i64::try_from(self.count).unwrap_or(i64::MAX);
130+
let bytes_delta = i64::try_from(self.byte_size).unwrap_or(i64::MAX);
131+
update_buffer_gauge(self.idx, -count_delta, -bytes_delta);
102132
}
103133
}
104134

@@ -115,7 +145,6 @@ impl InternalEvent for BufferReadError {
115145
error_code = self.error_code,
116146
error_type = error_type::READER_FAILED,
117147
stage = "processing",
118-
119148
);
120149
counter!(
121150
"buffer_errors_total", "error_code" => self.error_code,
@@ -137,3 +166,190 @@ registered_event! {
137166
self.send_duration.record(duration);
138167
}
139168
}
169+
170+
#[cfg(test)]
171+
mod tests {
172+
use super::*;
173+
use crate::cast_utils::F64_SAFE_INT_MAX;
174+
use metrics::{Key, Label};
175+
use metrics_util::debugging::{DebugValue, DebuggingRecorder};
176+
use metrics_util::{CompositeKey, MetricKind};
177+
use ordered_float::OrderedFloat;
178+
use std::sync::Mutex;
179+
use std::thread;
180+
181+
static TEST_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
182+
183+
fn reset_counters() {
184+
BUFFER_COUNTERS.clear();
185+
}
186+
187+
fn get_counter_values(stage: usize) -> (i64, i64) {
188+
match BUFFER_COUNTERS.get(&stage) {
189+
Some(counters) => {
190+
let events = counters.0.load(Ordering::Relaxed);
191+
let bytes = counters.1.load(Ordering::Relaxed);
192+
(events, bytes)
193+
}
194+
None => (0, 0),
195+
}
196+
}
197+
198+
fn assert_gauge_state(
199+
stage: usize,
200+
updates: &[(i64, i64)],
201+
expected_events: f64,
202+
expected_bytes: f64,
203+
) {
204+
let _guard = TEST_LOCK.lock().unwrap();
205+
reset_counters();
206+
207+
let recorder = DebuggingRecorder::default();
208+
let snapshotter = recorder.snapshotter();
209+
210+
metrics::with_local_recorder(&recorder, move || {
211+
for (events_delta, bytes_delta) in updates {
212+
update_buffer_gauge(stage, *events_delta, *bytes_delta);
213+
}
214+
215+
let mut metrics = snapshotter.snapshot().into_vec();
216+
217+
let stage_label = Label::new("stage", stage.to_string());
218+
let mut expected_metrics = vec![
219+
(
220+
CompositeKey::new(
221+
MetricKind::Gauge,
222+
Key::from_parts("buffer_events", vec![stage_label.clone()]),
223+
),
224+
None,
225+
None,
226+
DebugValue::Gauge(OrderedFloat(expected_events)),
227+
),
228+
(
229+
CompositeKey::new(
230+
MetricKind::Gauge,
231+
Key::from_parts("buffer_byte_size", vec![stage_label]),
232+
),
233+
None,
234+
None,
235+
DebugValue::Gauge(OrderedFloat(expected_bytes)),
236+
),
237+
];
238+
239+
metrics.sort_by_key(|(key, ..)| key.clone());
240+
expected_metrics.sort_by_key(|(key, ..)| key.clone());
241+
242+
assert_eq!(metrics, expected_metrics);
243+
});
244+
}
245+
246+
#[test]
247+
fn test_increment() {
248+
let _guard = TEST_LOCK.lock().unwrap();
249+
reset_counters();
250+
251+
update_buffer_gauge(0, 10, 1024);
252+
let (events, bytes) = get_counter_values(0);
253+
assert_eq!(events, 10);
254+
assert_eq!(bytes, 1024);
255+
}
256+
257+
#[test]
258+
fn test_increment_and_decrement() {
259+
let _guard = TEST_LOCK.lock().unwrap();
260+
reset_counters();
261+
262+
update_buffer_gauge(1, 100, 2048);
263+
update_buffer_gauge(1, -50, -1024);
264+
let (events, bytes) = get_counter_values(1);
265+
assert_eq!(events, 50);
266+
assert_eq!(bytes, 1024);
267+
}
268+
269+
#[test]
270+
fn test_decrement_below_zero_clamped_to_zero() {
271+
let _guard = TEST_LOCK.lock().unwrap();
272+
reset_counters();
273+
274+
update_buffer_gauge(2, 5, 100);
275+
update_buffer_gauge(2, -10, -200);
276+
let (events, bytes) = get_counter_values(2);
277+
278+
assert_eq!(events, 0);
279+
assert_eq!(bytes, 0);
280+
}
281+
282+
#[test]
283+
fn test_multiple_stages_are_independent() {
284+
let _guard = TEST_LOCK.lock().unwrap();
285+
reset_counters();
286+
287+
update_buffer_gauge(0, 10, 100);
288+
update_buffer_gauge(1, 20, 200);
289+
let (events0, bytes0) = get_counter_values(0);
290+
let (events1, bytes1) = get_counter_values(1);
291+
assert_eq!(events0, 10);
292+
assert_eq!(bytes0, 100);
293+
assert_eq!(events1, 20);
294+
assert_eq!(bytes1, 200);
295+
}
296+
297+
#[test]
298+
fn test_multithreaded_updates_are_correct() {
299+
let _guard = TEST_LOCK.lock().unwrap();
300+
reset_counters();
301+
302+
let num_threads = 10;
303+
let increments_per_thread = 1000;
304+
let mut handles = vec![];
305+
306+
for _ in 0..num_threads {
307+
let handle = thread::spawn(move || {
308+
for _ in 0..increments_per_thread {
309+
update_buffer_gauge(0, 1, 10);
310+
}
311+
});
312+
handles.push(handle);
313+
}
314+
315+
for handle in handles {
316+
handle.join().unwrap();
317+
}
318+
319+
let (final_events, final_bytes) = get_counter_values(0);
320+
let expected_events = i64::from(num_threads * increments_per_thread);
321+
let expected_bytes = i64::from(num_threads * increments_per_thread * 10);
322+
323+
assert_eq!(final_events, expected_events);
324+
assert_eq!(final_bytes, expected_bytes);
325+
}
326+
327+
#[test]
328+
fn test_large_values_capped_to_f64_safe_max() {
329+
let _guard = TEST_LOCK.lock().unwrap();
330+
reset_counters();
331+
332+
update_buffer_gauge(3, F64_SAFE_INT_MAX * 2, F64_SAFE_INT_MAX * 2);
333+
334+
let (events, bytes) = get_counter_values(3);
335+
336+
assert!(events > F64_SAFE_INT_MAX);
337+
assert!(bytes > F64_SAFE_INT_MAX);
338+
339+
let capped_events = events.min(F64_SAFE_INT_MAX);
340+
let capped_bytes = bytes.min(F64_SAFE_INT_MAX);
341+
342+
assert_eq!(capped_events, F64_SAFE_INT_MAX);
343+
assert_eq!(capped_bytes, F64_SAFE_INT_MAX);
344+
}
345+
346+
#[test]
347+
fn test_increment_with_recorder() {
348+
assert_gauge_state(0, &[(100, 2048), (200, 1024)], 300.0, 3072.0);
349+
}
350+
351+
#[test]
352+
fn test_should_not_be_negative_with_recorder() {
353+
assert_gauge_state(0, &[(100, 1024), (-200, -4096)], 0.0, 0.0);
354+
}
355+
}

lib/vector-buffers/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ mod internal_events;
3131
pub mod test;
3232
pub mod topology;
3333

34+
mod cast_utils;
3435
pub(crate) mod variants;
3536

3637
use std::fmt::Debug;

0 commit comments

Comments
 (0)