Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 106 additions & 2 deletions metrics-util/src/storage/reservoir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fn fastrand(upper: usize) -> usize {
struct Reservoir {
values: Box<[AtomicU64]>,
count: AtomicUsize,
unsampled_sum: AtomicU64,
}

impl Reservoir {
Expand All @@ -42,7 +43,11 @@ impl Reservoir {
values.push(AtomicU64::new(0));
}

Self { values: values.into_boxed_slice(), count: AtomicUsize::new(0) }
Self {
values: values.into_boxed_slice(),
count: AtomicUsize::new(0),
unsampled_sum: AtomicU64::new(0.0f64.to_bits()),
}
}

fn push(&self, value: f64) {
Expand All @@ -55,12 +60,22 @@ impl Reservoir {
self.values[maybe_idx].store(value.to_bits(), Relaxed);
}
}

loop {
let result = self.unsampled_sum.fetch_update(Relaxed, Relaxed, |curr| {
Some((f64::from_bits(curr) + value).to_bits())
});
if result.is_ok() {
break;
}
}
}

fn drain(&self) -> Drain<'_> {
let unsampled_len = self.count.load(Relaxed);
let len = if unsampled_len > self.values.len() { self.values.len() } else { unsampled_len };
Drain { reservoir: self, unsampled_len, len, idx: 0 }
let unsampled_sum = f64::from_bits(self.unsampled_sum.load(Relaxed));
Drain { reservoir: self, unsampled_len, len, idx: 0, unsampled_sum }
}
}

Expand All @@ -70,9 +85,16 @@ pub struct Drain<'a> {
unsampled_len: usize,
len: usize,
idx: usize,
unsampled_sum: f64,
}

impl<'a> Drain<'a> {
/// Returns the total number of samples pushed into the reservoir,
/// including those that were dropped by the sampling algorithm.
pub fn unsampled_len(&self) -> usize {
self.unsampled_len
}

/// Returns the sample rate of the reservoir that produced this iterator.
///
/// The sample rate is the ratio of the number of samples pushed into the reservoir to the number of samples held in
Expand All @@ -89,6 +111,12 @@ impl<'a> Drain<'a> {
self.len as f64 / self.unsampled_len as f64
}
}

/// Returns the sum of all samples pushed into the reservoir,
/// including those that were dropped by the sampling algorithm.
pub fn unsampled_sum(&self) -> f64 {
self.unsampled_sum
}
}

impl<'a> Iterator for Drain<'a> {
Expand All @@ -114,6 +142,7 @@ impl ExactSizeIterator for Drain<'_> {
impl<'a> Drop for Drain<'a> {
fn drop(&mut self) {
self.reservoir.count.store(0, Release);
self.reservoir.unsampled_sum.store(0.0f64.to_bits(), Release);
}
}

Expand Down Expand Up @@ -185,3 +214,78 @@ impl AtomicSamplingReservoir {
f(drain);
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_under_capacity_no_overflow() {
let reservoir = AtomicSamplingReservoir::new(64);

for i in 0..10 {
reservoir.push(i as f64);
}

reservoir.consume(|drain| {
assert_eq!(drain.unsampled_len(), 10);
assert_eq!(drain.unsampled_sum(), 45.0);
assert_eq!(drain.sample_rate(), 1.0);

let collected: Vec<f64> = drain.collect();
assert_eq!(collected.len(), 10);
});
}

#[test]
fn test_overflow_unsampled_len_and_sum_are_true_values() {
let reservoir = AtomicSamplingReservoir::new(16);

for i in 0..1000 {
reservoir.push(i as f64);
}

reservoir.consume(|drain| {
assert_eq!(drain.unsampled_len(), 1000);
assert_eq!(drain.unsampled_sum(), 499500.0);
assert!(drain.sample_rate() < 1.0);

let collected: Vec<f64> = drain.collect();
assert_eq!(collected.len(), 16);

let sampled_sum: f64 = collected.iter().sum();
assert!(sampled_sum < 499500.0);
});
}

#[test]
fn test_reset_after_drain() {
let reservoir = AtomicSamplingReservoir::new(64);

for i in 0..100 {
reservoir.push(i as f64);
}

reservoir.consume(|_drain| {
});

for i in 0..50 {
reservoir.push(i as f64);
}

reservoir.consume(|drain| {
assert_eq!(drain.unsampled_len(), 50);
assert_eq!(drain.unsampled_sum(), 1225.0);
});
}

#[test]
fn test_empty_reservoir() {
let reservoir = AtomicSamplingReservoir::new(64);

reservoir.consume(|drain| {
assert_eq!(drain.unsampled_len(), 0);
assert_eq!(drain.unsampled_sum(), 0.0);
});
}
}
Loading