Skip to content

Commit 00bee01

Browse files
committed
Directly implement Measure trait
1 parent e0159ad commit 00bee01

File tree

8 files changed

+107
-96
lines changed

8 files changed

+107
-96
lines changed

opentelemetry-sdk/src/metrics/instrument.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ pub(crate) struct ResolvedMeasures<T> {
285285
impl<T: Copy + 'static> SyncInstrument<T> for ResolvedMeasures<T> {
286286
fn measure(&self, val: T, attrs: &[KeyValue]) {
287287
for measure in &self.measures {
288-
measure.call(val, attrs)
288+
measure.measure(val, attrs)
289289
}
290290
}
291291
}
@@ -304,7 +304,7 @@ impl<T> Observable<T> {
304304
impl<T: Copy + Send + Sync + 'static> AsyncInstrument<T> for Observable<T> {
305305
fn observe(&self, measurement: T, attrs: &[KeyValue]) {
306306
for measure in &self.measures {
307-
measure.call(measurement, attrs)
307+
measure.measure(measurement, attrs)
308308
}
309309
}
310310
}

opentelemetry-sdk/src/metrics/internal/aggregate.rs

Lines changed: 44 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,26 @@ pub(crate) fn is_under_cardinality_limit(size: usize) -> bool {
2121

2222
/// Receives measurements to be aggregated.
2323
pub(crate) trait Measure<T>: Send + Sync + 'static {
24-
fn call(&self, measurement: T, attrs: &[KeyValue]);
24+
fn measure(&self, measurement: T, attrs: &[KeyValue]);
2525
}
2626

27-
impl<F, T> Measure<T> for F
27+
struct FilteredMeasureInstrument<I> {
28+
instrument: Arc<I>,
29+
filter: Filter,
30+
}
31+
32+
impl<I, T> Measure<T> for FilteredMeasureInstrument<I>
2833
where
29-
F: Fn(T, &[KeyValue]) + Send + Sync + 'static,
34+
T: 'static,
35+
I: Measure<T>,
3036
{
31-
fn call(&self, measurement: T, attrs: &[KeyValue]) {
32-
self(measurement, attrs)
37+
fn measure(&self, measurement: T, attrs: &[KeyValue]) {
38+
let filtered_attrs: Vec<KeyValue> = attrs
39+
.iter()
40+
.filter(|kv| (self.filter)(kv))
41+
.cloned()
42+
.collect();
43+
self.instrument.measure(measurement, &filtered_attrs);
3344
}
3445
}
3546

@@ -83,28 +94,24 @@ impl<T: Number> AggregateBuilder<T> {
8394
}
8495
}
8596

86-
/// Wraps the passed in measure with an attribute filtering function.
87-
fn filter(&self, f: impl Measure<T>) -> impl Measure<T> {
88-
let filter = self.filter.clone();
89-
move |n, attrs: &[KeyValue]| {
90-
if let Some(filter) = &filter {
91-
let filtered_attrs: Vec<KeyValue> =
92-
attrs.iter().filter(|kv| filter(kv)).cloned().collect();
93-
f.call(n, &filtered_attrs);
94-
} else {
95-
f.call(n, attrs);
96-
};
97+
fn maybe_filtered_measurement(&self, instrument: Arc<impl Measure<T>>) -> Arc<dyn Measure<T>> {
98+
if let Some(filter) = &self.filter {
99+
Arc::new(FilteredMeasureInstrument {
100+
filter: filter.clone(),
101+
instrument,
102+
})
103+
} else {
104+
instrument
97105
}
98106
}
99107

100108
/// Builds a last-value aggregate function input and output.
101-
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
102-
let lv_filter = Arc::new(LastValue::new());
103-
let lv_agg = Arc::clone(&lv_filter);
109+
pub(crate) fn last_value(&self) -> (Arc<dyn Measure<T>>, impl ComputeAggregation) {
110+
let lv_agg = Arc::new(LastValue::new());
104111
let t = self.temporality;
105112

106113
(
107-
self.filter(move |n, a: &[KeyValue]| lv_filter.measure(n, a)),
114+
self.maybe_filtered_measurement(lv_agg.clone()),
108115
move |dest: Option<&mut dyn Aggregation>| {
109116
let g = dest.and_then(|d| d.as_mut().downcast_mut::<Gauge<T>>());
110117
let mut new_agg = if g.is_none() {
@@ -132,13 +139,12 @@ impl<T: Number> AggregateBuilder<T> {
132139
pub(crate) fn precomputed_sum(
133140
&self,
134141
monotonic: bool,
135-
) -> (impl Measure<T>, impl ComputeAggregation) {
136-
let s = Arc::new(PrecomputedSum::new(monotonic));
137-
let agg_sum = Arc::clone(&s);
142+
) -> (Arc<dyn Measure<T>>, impl ComputeAggregation) {
143+
let agg_sum = Arc::new(PrecomputedSum::new(monotonic));
138144
let t = self.temporality;
139145

140146
(
141-
self.filter(move |n, a: &[KeyValue]| s.measure(n, a)),
147+
self.maybe_filtered_measurement(agg_sum.clone()),
142148
move |dest: Option<&mut dyn Aggregation>| match t {
143149
Some(Temporality::Delta) => agg_sum.delta(dest),
144150
_ => agg_sum.cumulative(dest),
@@ -147,13 +153,12 @@ impl<T: Number> AggregateBuilder<T> {
147153
}
148154

149155
/// Builds a sum aggregate function input and output.
150-
pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure<T>, impl ComputeAggregation) {
151-
let s = Arc::new(Sum::new(monotonic));
152-
let agg_sum = Arc::clone(&s);
156+
pub(crate) fn sum(&self, monotonic: bool) -> (Arc<dyn Measure<T>>, impl ComputeAggregation) {
157+
let agg_sum = Arc::new(Sum::new(monotonic));
153158
let t = self.temporality;
154159

155160
(
156-
self.filter(move |n, a: &[KeyValue]| s.measure(n, a)),
161+
self.maybe_filtered_measurement(agg_sum.clone()),
157162
move |dest: Option<&mut dyn Aggregation>| match t {
158163
Some(Temporality::Delta) => agg_sum.delta(dest),
159164
_ => agg_sum.cumulative(dest),
@@ -167,13 +172,12 @@ impl<T: Number> AggregateBuilder<T> {
167172
boundaries: Vec<f64>,
168173
record_min_max: bool,
169174
record_sum: bool,
170-
) -> (impl Measure<T>, impl ComputeAggregation) {
171-
let h = Arc::new(Histogram::new(boundaries, record_min_max, record_sum));
172-
let agg_h = Arc::clone(&h);
175+
) -> (Arc<dyn Measure<T>>, impl ComputeAggregation) {
176+
let agg_h = Arc::new(Histogram::new(boundaries, record_min_max, record_sum));
173177
let t = self.temporality;
174178

175179
(
176-
self.filter(move |n, a: &[KeyValue]| h.measure(n, a)),
180+
self.maybe_filtered_measurement(agg_h.clone()),
177181
move |dest: Option<&mut dyn Aggregation>| match t {
178182
Some(Temporality::Delta) => agg_h.delta(dest),
179183
_ => agg_h.cumulative(dest),
@@ -188,18 +192,17 @@ impl<T: Number> AggregateBuilder<T> {
188192
max_scale: i8,
189193
record_min_max: bool,
190194
record_sum: bool,
191-
) -> (impl Measure<T>, impl ComputeAggregation) {
192-
let h = Arc::new(ExpoHistogram::new(
195+
) -> (Arc<dyn Measure<T>>, impl ComputeAggregation) {
196+
let agg_h = Arc::new(ExpoHistogram::new(
193197
max_size,
194198
max_scale,
195199
record_min_max,
196200
record_sum,
197201
));
198-
let agg_h = Arc::clone(&h);
199202
let t = self.temporality;
200203

201204
(
202-
self.filter(move |n, a: &[KeyValue]| h.measure(n, a)),
205+
self.maybe_filtered_measurement(agg_h.clone()),
203206
move |dest: Option<&mut dyn Aggregation>| match t {
204207
Some(Temporality::Delta) => agg_h.delta(dest),
205208
_ => agg_h.cumulative(dest),
@@ -231,7 +234,7 @@ mod tests {
231234
}],
232235
};
233236
let new_attributes = [KeyValue::new("b", 2)];
234-
measure.call(2, &new_attributes[..]);
237+
measure.measure(2, &new_attributes[..]);
235238

236239
let (count, new_agg) = agg.call(Some(&mut a));
237240

@@ -272,7 +275,7 @@ mod tests {
272275
is_monotonic: false,
273276
};
274277
let new_attributes = [KeyValue::new("b", 2)];
275-
measure.call(3, &new_attributes[..]);
278+
measure.measure(3, &new_attributes[..]);
276279

277280
let (count, new_agg) = agg.call(Some(&mut a));
278281

@@ -315,7 +318,7 @@ mod tests {
315318
is_monotonic: false,
316319
};
317320
let new_attributes = [KeyValue::new("b", 2)];
318-
measure.call(3, &new_attributes[..]);
321+
measure.measure(3, &new_attributes[..]);
319322

320323
let (count, new_agg) = agg.call(Some(&mut a));
321324

@@ -354,7 +357,7 @@ mod tests {
354357
},
355358
};
356359
let new_attributes = [KeyValue::new("b", 2)];
357-
measure.call(3, &new_attributes[..]);
360+
measure.measure(3, &new_attributes[..]);
358361

359362
let (count, new_agg) = agg.call(Some(&mut a));
360363

@@ -406,7 +409,7 @@ mod tests {
406409
},
407410
};
408411
let new_attributes = [KeyValue::new("b", 2)];
409-
measure.call(3, &new_attributes[..]);
412+
measure.measure(3, &new_attributes[..]);
410413

411414
let (count, new_agg) = agg.call(Some(&mut a));
412415

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::metrics::{
88
Temporality,
99
};
1010

11-
use super::{Aggregator, Number, ValueMap};
11+
use super::{Aggregator, Measure, Number, ValueMap};
1212

1313
pub(crate) const EXPO_MAX_SCALE: i8 = 20;
1414
pub(crate) const EXPO_MIN_SCALE: i8 = -10;
@@ -355,6 +355,19 @@ pub(crate) struct ExpoHistogram<T: Number> {
355355
record_min_max: bool,
356356
}
357357

358+
impl<T: Number> Measure<T> for ExpoHistogram<T> {
359+
fn measure(&self, measurement: T, attrs: &[KeyValue]) {
360+
let f_value = measurement.into_float();
361+
// Ignore NaN and infinity.
362+
// Only makes sense if T is f64, maybe this could be no-op for other cases?
363+
if !f_value.is_finite() {
364+
return;
365+
}
366+
367+
self.value_map.measure(measurement, attrs);
368+
}
369+
}
370+
358371
impl<T: Number> ExpoHistogram<T> {
359372
/// Create a new exponential histogram.
360373
pub(crate) fn new(
@@ -374,17 +387,6 @@ impl<T: Number> ExpoHistogram<T> {
374387
}
375388
}
376389

377-
pub(crate) fn measure(&self, value: T, attrs: &[KeyValue]) {
378-
let f_value = value.into_float();
379-
// Ignore NaN and infinity.
380-
// Only makes sense if T is f64, maybe this could be no-op for other cases?
381-
if !f_value.is_finite() {
382-
return;
383-
}
384-
385-
self.value_map.measure(value, attrs);
386-
}
387-
388390
pub(crate) fn delta(
389391
&self,
390392
dest: Option<&mut dyn Aggregation>,
@@ -510,7 +512,7 @@ impl<T: Number> ExpoHistogram<T> {
510512

511513
#[cfg(test)]
512514
mod tests {
513-
use std::ops::Neg;
515+
use std::{ops::Neg, sync::Arc};
514516

515517
use crate::metrics::internal::{self, AggregateBuilder};
516518

@@ -1217,12 +1219,9 @@ mod tests {
12171219
}
12181220

12191221
fn box_val<T>(
1220-
(m, ca): (impl internal::Measure<T>, impl internal::ComputeAggregation),
1221-
) -> (
1222-
Box<dyn internal::Measure<T>>,
1223-
Box<dyn internal::ComputeAggregation>,
1224-
) {
1225-
(Box::new(m), Box::new(ca))
1222+
(m, ca): (Arc<dyn Measure<T>>, impl internal::ComputeAggregation),
1223+
) -> (Arc<dyn Measure<T>>, Box<dyn internal::ComputeAggregation>) {
1224+
(m, Box::new(ca))
12261225
}
12271226

12281227
fn hist_aggregation<T: Number + From<u32>>() {
@@ -1236,7 +1235,7 @@ mod tests {
12361235
name: &'static str,
12371236
build: Box<
12381237
dyn Fn() -> (
1239-
Box<dyn internal::Measure<T>>,
1238+
Arc<dyn internal::Measure<T>>,
12401239
Box<dyn internal::ComputeAggregation>,
12411240
),
12421241
>,
@@ -1435,7 +1434,7 @@ mod tests {
14351434
let mut count = 0;
14361435
for n in test.input {
14371436
for v in n {
1438-
in_fn.call(v, &[])
1437+
in_fn.measure(v, &[])
14391438
}
14401439
count = out_fn.call(Some(got.as_mut())).0
14411440
}

opentelemetry-sdk/src/metrics/internal/histogram.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use crate::metrics::data::{self, Aggregation};
77
use crate::metrics::Temporality;
88
use opentelemetry::KeyValue;
99

10-
use super::ValueMap;
1110
use super::{Aggregator, Number};
11+
use super::{Measure, ValueMap};
1212

1313
impl<T> Aggregator for Mutex<Buckets<T>>
1414
where
@@ -73,6 +73,20 @@ pub(crate) struct Histogram<T: Number> {
7373
start: Mutex<SystemTime>,
7474
}
7575

76+
impl<T: Number> Measure<T> for Histogram<T> {
77+
fn measure(&self, measurement: T, attrs: &[KeyValue]) {
78+
let f = measurement.into_float();
79+
// This search will return an index in the range `[0, bounds.len()]`, where
80+
// it will return `bounds.len()` if value is greater than the last element
81+
// of `bounds`. This aligns with the buckets in that the length of buckets
82+
// is `bounds.len()+1`, with the last bucket representing:
83+
// `(bounds[bounds.len()-1], +∞)`.
84+
let index = self.bounds.partition_point(|&x| x < f);
85+
86+
self.value_map.measure((measurement, index), attrs);
87+
}
88+
}
89+
7690
impl<T: Number> Histogram<T> {
7791
#[allow(unused_mut)]
7892
pub(crate) fn new(mut bounds: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
@@ -93,18 +107,6 @@ impl<T: Number> Histogram<T> {
93107
}
94108
}
95109

96-
pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
97-
let f = measurement.into_float();
98-
// This search will return an index in the range `[0, bounds.len()]`, where
99-
// it will return `bounds.len()` if value is greater than the last element
100-
// of `bounds`. This aligns with the buckets in that the length of buckets
101-
// is `bounds.len()+1`, with the last bucket representing:
102-
// `(bounds[bounds.len()-1], +∞)`.
103-
let index = self.bounds.partition_point(|&x| x < f);
104-
105-
self.value_map.measure((measurement, index), attrs);
106-
}
107-
108110
pub(crate) fn delta(
109111
&self,
110112
dest: Option<&mut dyn Aggregation>,

opentelemetry-sdk/src/metrics/internal/last_value.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime};
33
use crate::metrics::data::DataPoint;
44
use opentelemetry::KeyValue;
55

6-
use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap};
6+
use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Measure, Number, ValueMap};
77

88
/// this is reused by PrecomputedSum
99
pub(crate) struct Assign<T>
@@ -43,6 +43,12 @@ pub(crate) struct LastValue<T: Number> {
4343
start: Mutex<SystemTime>,
4444
}
4545

46+
impl<T: Number> Measure<T> for LastValue<T> {
47+
fn measure(&self, measurement: T, attrs: &[KeyValue]) {
48+
self.value_map.measure(measurement, attrs);
49+
}
50+
}
51+
4652
impl<T: Number> LastValue<T> {
4753
pub(crate) fn new() -> Self {
4854
LastValue {
@@ -51,11 +57,6 @@ impl<T: Number> LastValue<T> {
5157
}
5258
}
5359

54-
pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
55-
// The argument index is not applicable to LastValue.
56-
self.value_map.measure(measurement, attrs);
57-
}
58-
5960
pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec<DataPoint<T>>) {
6061
let t = SystemTime::now();
6162
let prev_start = self

0 commit comments

Comments
 (0)