Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 46 additions & 125 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ use opentelemetry::KeyValue;
use super::ValueMap;
use super::{Aggregator, Number};

/// Per-attribute-set histogram state: the bucket counters are wrapped in a
/// `Mutex` so concurrent `measure` calls can update them safely.
struct HistogramTracker<T> {
buckets: Mutex<Buckets<T>>,
}

impl<T> Aggregator for HistogramTracker<T>
impl<T> Aggregator for Mutex<Buckets<T>>
where
T: Number,
{
Expand All @@ -22,27 +18,26 @@ where
type PreComputedValue = (T, usize);

fn update(&self, (value, index): (T, usize)) {
let mut buckets = match self.buckets.lock() {
Ok(guard) => guard,
Err(_) => return,
};
let mut buckets = self.lock().unwrap_or_else(|err| err.into_inner());

buckets.bin(index, value);
buckets.sum(value);
buckets.total += value;
buckets.count += 1;
buckets.counts[index] += 1;
if value < buckets.min {
buckets.min = value;
}
if value > buckets.max {
buckets.max = value
}
}

fn create(count: &usize) -> Self {
HistogramTracker {
buckets: Mutex::new(Buckets::<T>::new(*count)),
}
Mutex::new(Buckets::<T>::new(*count))
}

fn clone_and_reset(&self, count: &usize) -> Self {
let mut current = self.buckets.lock().unwrap_or_else(|err| err.into_inner());
let cloned = replace(current.deref_mut(), Buckets::new(*count));
Self {
buckets: Mutex::new(cloned),
}
let mut current = self.lock().unwrap_or_else(|err| err.into_inner());
Mutex::new(replace(current.deref_mut(), Buckets::new(*count)))
}
}

Expand All @@ -65,62 +60,34 @@ impl<T: Number> Buckets<T> {
..Default::default()
}
}

/// Adds `value` to the running total of all recorded measurements.
fn sum(&mut self, value: T) {
self.total += value;
}

/// Records a measurement into bucket `idx`: increments that bucket's
/// counter and the overall count, and widens the tracked min/max when
/// `value` falls outside the current range.
fn bin(&mut self, idx: usize, value: T) {
self.counts[idx] += 1;
self.count += 1;
if value < self.min {
self.min = value;
}
if value > self.max {
self.max = value
}
}
}

/// Summarizes a set of measurements as a histogram with explicitly defined
/// buckets.
pub(crate) struct Histogram<T: Number> {
value_map: ValueMap<HistogramTracker<T>>,
value_map: ValueMap<Mutex<Buckets<T>>>,
bounds: Vec<f64>,
record_min_max: bool,
record_sum: bool,
start: Mutex<SystemTime>,
}

impl<T: Number> Histogram<T> {
pub(crate) fn new(boundaries: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
// TODO fix the bug, by first removing NaN and only then getting buckets_count
// once we know the reason for performance degradation
let buckets_count = boundaries.len() + 1;
let mut histogram = Histogram {
pub(crate) fn new(mut bounds: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
bounds.retain(|v| !v.is_nan());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are only 3 ways bounds can come from:

  1. Default - In this case, the SDK provides it, so we ensure it's already sorted and validated.
  2. HistogramCreation API - In this case, the user provides it, but while creating the instrument, the SDK should validate it and ensure it's sorted/validated. If the validation fails, the SDK can return the defaults or a noop instrument. Opened this issue to track it: Validate user provided Histogram bounds at sdk during Instrument creation time #2286
  3. Views - In this case also, user provides it, but we should do same as 2.

I believe this method is the wrong place to validate/filter bounds, as that should be taken care of elsewhere.

Not a blocker for this PR, just added a comment to make sure have same understanding.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this method is the wrong place to validate/filter bounds, as that should be taken care of elsewhere.

💯

bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
let buckets_count = bounds.len() + 1;
Histogram {
value_map: ValueMap::new(buckets_count),
bounds: boundaries,
bounds,
record_min_max,
record_sum,
start: Mutex::new(SystemTime::now()),
};

histogram.bounds.retain(|v| !v.is_nan());
histogram
.bounds
.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));

histogram
}
}

pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
let f = measurement.into_float();
// Ignore NaN and infinity.
// Only makes sense if T is f64, maybe this could be no-op for other cases?
// TODO: uncomment once we know the reason for performance degradation
// if f.is_infinite() || f.is_nan() {
// return;
// }
// This search will return an index in the range `[0, bounds.len()]`, where
// it will return `bounds.len()` if value is greater than the last element
// of `bounds`. This aligns with the buckets in that the length of buckets
Expand Down Expand Up @@ -156,17 +123,14 @@ impl<T: Number> Histogram<T> {

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, aggr| {
let b = aggr
.buckets
.into_inner()
.unwrap_or_else(|err| err.into_inner());
let b = aggr.into_inner().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bucket_counts: b.counts.clone(),
bucket_counts: b.counts,
sum: if self.record_sum {
b.total
} else {
Expand Down Expand Up @@ -214,7 +178,7 @@ impl<T: Number> Histogram<T> {

self.value_map
.collect_readonly(&mut h.data_points, |attributes, aggr| {
let b = aggr.buckets.lock().unwrap_or_else(|err| err.into_inner());
let b = aggr.lock().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
start_time: prev_start,
Expand Down Expand Up @@ -245,68 +209,25 @@ impl<T: Number> Histogram<T> {
}
}

// TODO: uncomment once we know the reason for performance degradation
// #[cfg(test)]
// mod tests {
#[cfg(test)]
mod tests {
use super::*;

// use super::*;

// #[test]
// fn when_f64_is_nan_or_infinity_then_ignore() {
// struct Expected {
// min: f64,
// max: f64,
// sum: f64,
// count: u64,
// }
// impl Expected {
// fn new(min: f64, max: f64, sum: f64, count: u64) -> Self {
// Expected {
// min,
// max,
// sum,
// count,
// }
// }
// }
// struct TestCase {
// values: Vec<f64>,
// expected: Expected,
// }

// let test_cases = vec![
// TestCase {
// values: vec![2.0, 4.0, 1.0],
// expected: Expected::new(1.0, 4.0, 7.0, 3),
// },
// TestCase {
// values: vec![2.0, 4.0, 1.0, f64::INFINITY],
// expected: Expected::new(1.0, 4.0, 7.0, 3),
// },
// TestCase {
// values: vec![2.0, 4.0, 1.0, -f64::INFINITY],
// expected: Expected::new(1.0, 4.0, 7.0, 3),
// },
// TestCase {
// values: vec![2.0, f64::NAN, 4.0, 1.0],
// expected: Expected::new(1.0, 4.0, 7.0, 3),
// },
// TestCase {
// values: vec![4.0, 4.0, 4.0, 2.0, 16.0, 1.0],
// expected: Expected::new(1.0, 16.0, 31.0, 6),
// },
// ];

// for test in test_cases {
// let h = Histogram::new(vec![], true, true);
// for v in test.values {
// h.measure(v, &[]);
// }
// let res = h.value_map.no_attribute_tracker.buckets.lock().unwrap();
// assert_eq!(test.expected.max, res.max);
// assert_eq!(test.expected.min, res.min);
// assert_eq!(test.expected.sum, res.total);
// assert_eq!(test.expected.count, res.count);
// }
// }
// }
#[test]
fn check_buckets_are_selected_correctly() {
    // Bounds [1.0, 3.0, 6.0] yield four buckets:
    // (-inf, 1], (1, 3], (3, 6], and (6, +inf).
    let histogram = Histogram::<i64>::new(vec![1.0, 3.0, 6.0], false, false);
    for value in 1..=10 {
        histogram.measure(value, &[]);
    }

    let (count, aggregation) = histogram.cumulative(None);
    let aggregation = aggregation.unwrap();
    let histogram_data = aggregation
        .as_any()
        .downcast_ref::<data::Histogram<i64>>()
        .unwrap();

    assert_eq!(count, 1);
    let point = &histogram_data.data_points[0];
    assert_eq!(point.count, 10);
    assert_eq!(point.bucket_counts.len(), 4);
    assert_eq!(point.bucket_counts[0], 1); // value 1
    assert_eq!(point.bucket_counts[1], 2); // values 2, 3
    assert_eq!(point.bucket_counts[2], 3); // values 4, 5, 6
    assert_eq!(point.bucket_counts[3], 4); // values 7, 8, 9, 10
}
}
Loading