Skip to content

Commit 9f87703

Browse files
frailltMindaugas Vinkelis
authored andcommitted
ValueMap interface change
1 parent 3976f3d commit 9f87703

File tree

7 files changed

+235
-227
lines changed

7 files changed

+235
-227
lines changed

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -341,12 +341,6 @@ impl<T: Number> ExpoHistogram<T> {
341341
}
342342

343343
pub(crate) fn measure(&self, value: T, attrs: &[KeyValue]) {
344-
let f_value = value.into_float();
345-
// Ignore NaN and infinity.
346-
if f_value.is_infinite() || f_value.is_nan() {
347-
return;
348-
}
349-
350344
let attrs: AttributeSet = attrs.into();
351345
if let Ok(mut values) = self.values.lock() {
352346
let v = values.entry(attrs).or_insert_with(|| {

opentelemetry-sdk/src/metrics/internal/histogram.rs

Lines changed: 89 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -7,79 +7,41 @@ use crate::metrics::data::HistogramDataPoint;
77
use crate::metrics::data::{self, Aggregation, Temporality};
88
use opentelemetry::KeyValue;
99

10-
use super::Number;
11-
use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap};
10+
use super::ValueMap;
11+
use super::{Aggregator, Number};
1212

13-
struct HistogramUpdate;
14-
15-
impl Operation for HistogramUpdate {
16-
fn update_tracker<T: Default, AT: AtomicTracker<T>>(tracker: &AT, value: T, index: usize) {
17-
tracker.update_histogram(index, value);
18-
}
19-
}
20-
21-
struct HistogramTracker<T> {
22-
buckets: Mutex<Buckets<T>>,
23-
}
24-
25-
impl<T: Number> AtomicTracker<T> for HistogramTracker<T> {
26-
fn update_histogram(&self, index: usize, value: T) {
27-
let mut buckets = match self.buckets.lock() {
28-
Ok(guard) => guard,
29-
Err(_) => return,
30-
};
31-
32-
buckets.bin(index, value);
33-
buckets.sum(value);
34-
}
35-
}
36-
37-
impl<T: Number> AtomicallyUpdate<T> for HistogramTracker<T> {
38-
type AtomicTracker = HistogramTracker<T>;
39-
40-
fn new_atomic_tracker(buckets_count: Option<usize>) -> Self::AtomicTracker {
41-
let count = buckets_count.unwrap();
42-
HistogramTracker {
43-
buckets: Mutex::new(Buckets::<T>::new(count)),
44-
}
45-
}
13+
struct BucketsConfig {
14+
bounds: Vec<f64>,
15+
record_min_max: bool,
16+
record_sum: bool,
4617
}
4718

48-
#[derive(Default)]
49-
struct Buckets<T> {
19+
#[derive(Default, Debug, Clone)]
20+
struct BucketsData<T> {
5021
counts: Vec<u64>,
5122
count: u64,
5223
total: T,
5324
min: T,
5425
max: T,
5526
}
5627

57-
impl<T: Number> Buckets<T> {
58-
/// returns buckets with `n` bins.
59-
fn new(n: usize) -> Buckets<T> {
60-
Buckets {
61-
counts: vec![0; n],
28+
struct Buckets<T> {
29+
data: Mutex<BucketsData<T>>,
30+
}
31+
32+
impl<T> BucketsData<T>
33+
where
34+
T: Number,
35+
{
36+
fn new(size: usize) -> Self {
37+
Self {
38+
counts: vec![0; size],
6239
min: T::max(),
6340
max: T::min(),
6441
..Default::default()
6542
}
6643
}
6744

68-
fn sum(&mut self, value: T) {
69-
self.total += value;
70-
}
71-
72-
fn bin(&mut self, idx: usize, value: T) {
73-
self.counts[idx] += 1;
74-
self.count += 1;
75-
if value < self.min {
76-
self.min = value;
77-
}
78-
if value > self.max {
79-
self.max = value
80-
}
81-
}
82-
8345
fn reset(&mut self) {
8446
for item in &mut self.counts {
8547
*item = 0;
@@ -91,45 +53,67 @@ impl<T: Number> Buckets<T> {
9153
}
9254
}
9355

56+
impl<T> Aggregator<T> for Buckets<T>
57+
where
58+
T: Number,
59+
{
60+
type Config = BucketsConfig;
61+
62+
fn create(config: &BucketsConfig) -> Self {
63+
let size = config.bounds.len() + 1;
64+
Buckets {
65+
data: Mutex::new(BucketsData::new(size)),
66+
}
67+
}
68+
69+
fn update(&self, config: &BucketsConfig, measurement: T) {
70+
let f_value = measurement.into_float();
71+
// This search will return an index in the range `[0, bounds.len()]`, where
72+
// it will return `bounds.len()` if value is greater than the last element
73+
// of `bounds`. This aligns with the buckets in that the length of buckets
74+
// is `bounds.len()+1`, with the last bucket representing:
75+
// `(bounds[bounds.len()-1], +∞)`.
76+
let idx = config.bounds.partition_point(|&x| x < f_value);
77+
if let Ok(mut data) = self.data.lock() {
78+
data.counts[idx] += 1;
79+
data.count += 1;
80+
if config.record_min_max {
81+
if measurement < data.min {
82+
data.min = measurement;
83+
}
84+
if measurement > data.max {
85+
data.max = measurement
86+
}
87+
}
88+
// it's very cheap to update it, even if it is not configured to record_sum
89+
data.total += measurement;
90+
}
91+
}
92+
}
93+
9494
/// Summarizes a set of measurements as a histogram with explicitly defined
9595
/// buckets.
9696
pub(crate) struct Histogram<T: Number> {
97-
value_map: ValueMap<HistogramTracker<T>, T, HistogramUpdate>,
98-
bounds: Vec<f64>,
99-
record_min_max: bool,
100-
record_sum: bool,
97+
value_map: ValueMap<T, Buckets<T>>,
10198
start: Mutex<SystemTime>,
10299
}
103100

104101
impl<T: Number> Histogram<T> {
105-
pub(crate) fn new(boundaries: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
106-
let buckets_count = boundaries.len() + 1;
107-
let mut histogram = Histogram {
108-
value_map: ValueMap::new_with_buckets_count(buckets_count),
109-
bounds: boundaries,
110-
record_min_max,
111-
record_sum,
102+
pub(crate) fn new(mut bounds: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
103+
bounds.retain(|v| !v.is_nan());
104+
bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
105+
Self {
106+
value_map: ValueMap::new(BucketsConfig {
107+
record_min_max,
108+
record_sum,
109+
bounds,
110+
}),
112111
start: Mutex::new(SystemTime::now()),
113-
};
114-
115-
histogram.bounds.retain(|v| !v.is_nan());
116-
histogram
117-
.bounds
118-
.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
119-
120-
histogram
112+
}
121113
}
122114

123115
pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
124-
let f = measurement.into_float();
125-
126-
// This search will return an index in the range `[0, bounds.len()]`, where
127-
// it will return `bounds.len()` if value is greater than the last element
128-
// of `bounds`. This aligns with the buckets in that the length of buckets
129-
// is `bounds.len()+1`, with the last bucket representing:
130-
// `(bounds[bounds.len()-1], +∞)`.
131-
let index = self.bounds.partition_point(|&x| x < f);
132-
self.value_map.measure(measurement, attrs, index);
116+
self.value_map.measure(measurement, attrs);
133117
}
134118

135119
pub(crate) fn delta(
@@ -167,25 +151,25 @@ impl<T: Number> Histogram<T> {
167151
.has_no_attribute_value
168152
.swap(false, Ordering::AcqRel)
169153
{
170-
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() {
154+
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.data.lock() {
171155
h.data_points.push(HistogramDataPoint {
172156
attributes: vec![],
173157
start_time: start,
174158
time: t,
175159
count: b.count,
176-
bounds: self.bounds.clone(),
160+
bounds: self.value_map.config.bounds.clone(),
177161
bucket_counts: b.counts.clone(),
178-
sum: if self.record_sum {
162+
sum: if self.value_map.config.record_sum {
179163
b.total
180164
} else {
181165
T::default()
182166
},
183-
min: if self.record_min_max {
167+
min: if self.value_map.config.record_min_max {
184168
Some(b.min)
185169
} else {
186170
None
187171
},
188-
max: if self.record_min_max {
172+
max: if self.value_map.config.record_min_max {
189173
Some(b.max)
190174
} else {
191175
None
@@ -205,25 +189,25 @@ impl<T: Number> Histogram<T> {
205189
let mut seen = HashSet::new();
206190
for (attrs, tracker) in trackers.drain() {
207191
if seen.insert(Arc::as_ptr(&tracker)) {
208-
if let Ok(b) = tracker.buckets.lock() {
192+
if let Ok(b) = tracker.data.lock() {
209193
h.data_points.push(HistogramDataPoint {
210194
attributes: attrs.clone(),
211195
start_time: start,
212196
time: t,
213197
count: b.count,
214-
bounds: self.bounds.clone(),
198+
bounds: self.value_map.config.bounds.clone(),
215199
bucket_counts: b.counts.clone(),
216-
sum: if self.record_sum {
200+
sum: if self.value_map.config.record_sum {
217201
b.total
218202
} else {
219203
T::default()
220204
},
221-
min: if self.record_min_max {
205+
min: if self.value_map.config.record_min_max {
222206
Some(b.min)
223207
} else {
224208
None
225209
},
226-
max: if self.record_min_max {
210+
max: if self.value_map.config.record_min_max {
227211
Some(b.max)
228212
} else {
229213
None
@@ -278,25 +262,25 @@ impl<T: Number> Histogram<T> {
278262
.has_no_attribute_value
279263
.load(Ordering::Acquire)
280264
{
281-
if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() {
265+
if let Ok(b) = &self.value_map.no_attribute_tracker.data.lock() {
282266
h.data_points.push(HistogramDataPoint {
283267
attributes: vec![],
284268
start_time: start,
285269
time: t,
286270
count: b.count,
287-
bounds: self.bounds.clone(),
271+
bounds: self.value_map.config.bounds.clone(),
288272
bucket_counts: b.counts.clone(),
289-
sum: if self.record_sum {
273+
sum: if self.value_map.config.record_sum {
290274
b.total
291275
} else {
292276
T::default()
293277
},
294-
min: if self.record_min_max {
278+
min: if self.value_map.config.record_min_max {
295279
Some(b.min)
296280
} else {
297281
None
298282
},
299-
max: if self.record_min_max {
283+
max: if self.value_map.config.record_min_max {
300284
Some(b.max)
301285
} else {
302286
None
@@ -318,25 +302,25 @@ impl<T: Number> Histogram<T> {
318302
let mut seen = HashSet::new();
319303
for (attrs, tracker) in trackers.iter() {
320304
if seen.insert(Arc::as_ptr(tracker)) {
321-
if let Ok(b) = tracker.buckets.lock() {
305+
if let Ok(b) = tracker.data.lock() {
322306
h.data_points.push(HistogramDataPoint {
323307
attributes: attrs.clone(),
324308
start_time: start,
325309
time: t,
326310
count: b.count,
327-
bounds: self.bounds.clone(),
311+
bounds: self.value_map.config.bounds.clone(),
328312
bucket_counts: b.counts.clone(),
329-
sum: if self.record_sum {
313+
sum: if self.value_map.config.record_sum {
330314
b.total
331315
} else {
332316
T::default()
333317
},
334-
min: if self.record_min_max {
318+
min: if self.value_map.config.record_min_max {
335319
Some(b.min)
336320
} else {
337321
None
338322
},
339-
max: if self.record_min_max {
323+
max: if self.value_map.config.record_min_max {
340324
Some(b.max)
341325
} else {
342326
None

0 commit comments

Comments
 (0)