Skip to content

Commit c2d13a1

Browse files
committed
Directly implement ComputeAggregation
1 parent 15d69b1 commit c2d13a1

File tree

7 files changed

+147
-77
lines changed

7 files changed

+147
-77
lines changed

opentelemetry-sdk/src/metrics/internal/aggregate.rs

Lines changed: 23 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,7 @@ where
5656
/// Builds aggregate functions
5757
pub(crate) struct AggregateBuilder<T> {
5858
/// The temporality used for the returned aggregate functions.
59-
///
60-
/// If this is not provided, a default of cumulative will be used (except for the
61-
/// last-value aggregate function where delta is the only appropriate
62-
/// temporality).
63-
temporality: Option<Temporality>,
59+
temporality: Temporality,
6460

6561
/// The attribute filter the aggregate function will use on the input of
6662
/// measurements.
@@ -72,7 +68,7 @@ pub(crate) struct AggregateBuilder<T> {
7268
type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
7369

7470
impl<T: Number> AggregateBuilder<T> {
75-
pub(crate) fn new(temporality: Option<Temporality>, filter: Option<Filter>) -> Self {
71+
pub(crate) fn new(temporality: Temporality, filter: Option<Filter>) -> Self {
7672
AggregateBuilder {
7773
temporality,
7874
filter,
@@ -96,16 +92,12 @@ impl<T: Number> AggregateBuilder<T> {
9692

9793
/// Builds a last-value aggregate function input and output.
9894
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
99-
let lv = Arc::new(LastValue::new());
95+
let lv = Arc::new(LastValue::new(self.temporality));
10096
let agg_lv = Arc::clone(&lv);
101-
let t = self.temporality;
10297

10398
(
10499
self.filter(move |n, a: &[KeyValue]| lv.measure(n, a)),
105-
move |dest: Option<&mut dyn Aggregation>| match t {
106-
Some(Temporality::Delta) => agg_lv.delta(dest),
107-
_ => agg_lv.cumulative(dest),
108-
},
100+
agg_lv,
109101
)
110102
}
111103

@@ -114,31 +106,23 @@ impl<T: Number> AggregateBuilder<T> {
114106
&self,
115107
monotonic: bool,
116108
) -> (impl Measure<T>, impl ComputeAggregation) {
117-
let s = Arc::new(PrecomputedSum::new(monotonic));
109+
let s = Arc::new(PrecomputedSum::new(self.temporality, monotonic));
118110
let agg_sum = Arc::clone(&s);
119-
let t = self.temporality;
120111

121112
(
122113
self.filter(move |n, a: &[KeyValue]| s.measure(n, a)),
123-
move |dest: Option<&mut dyn Aggregation>| match t {
124-
Some(Temporality::Delta) => agg_sum.delta(dest),
125-
_ => agg_sum.cumulative(dest),
126-
},
114+
agg_sum,
127115
)
128116
}
129117

130118
/// Builds a sum aggregate function input and output.
131119
pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure<T>, impl ComputeAggregation) {
132-
let s = Arc::new(Sum::new(monotonic));
120+
let s = Arc::new(Sum::new(self.temporality, monotonic));
133121
let agg_sum = Arc::clone(&s);
134-
let t = self.temporality;
135122

136123
(
137124
self.filter(move |n, a: &[KeyValue]| s.measure(n, a)),
138-
move |dest: Option<&mut dyn Aggregation>| match t {
139-
Some(Temporality::Delta) => agg_sum.delta(dest),
140-
_ => agg_sum.cumulative(dest),
141-
},
125+
agg_sum,
142126
)
143127
}
144128

@@ -149,17 +133,15 @@ impl<T: Number> AggregateBuilder<T> {
149133
record_min_max: bool,
150134
record_sum: bool,
151135
) -> (impl Measure<T>, impl ComputeAggregation) {
152-
let h = Arc::new(Histogram::new(boundaries, record_min_max, record_sum));
136+
let h = Arc::new(Histogram::new(
137+
self.temporality,
138+
boundaries,
139+
record_min_max,
140+
record_sum,
141+
));
153142
let agg_h = Arc::clone(&h);
154-
let t = self.temporality;
155143

156-
(
157-
self.filter(move |n, a: &[KeyValue]| h.measure(n, a)),
158-
move |dest: Option<&mut dyn Aggregation>| match t {
159-
Some(Temporality::Delta) => agg_h.delta(dest),
160-
_ => agg_h.cumulative(dest),
161-
},
162-
)
144+
(self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), agg_h)
163145
}
164146

165147
/// Builds an exponential histogram aggregate function input and output.
@@ -171,21 +153,15 @@ impl<T: Number> AggregateBuilder<T> {
171153
record_sum: bool,
172154
) -> (impl Measure<T>, impl ComputeAggregation) {
173155
let h = Arc::new(ExpoHistogram::new(
156+
self.temporality,
174157
max_size,
175158
max_scale,
176159
record_min_max,
177160
record_sum,
178161
));
179162
let agg_h = Arc::clone(&h);
180-
let t = self.temporality;
181163

182-
(
183-
self.filter(move |n, a: &[KeyValue]| h.measure(n, a)),
184-
move |dest: Option<&mut dyn Aggregation>| match t {
185-
Some(Temporality::Delta) => agg_h.delta(dest),
186-
_ => agg_h.cumulative(dest),
187-
},
188-
)
164+
(self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), agg_h)
189165
}
190166
}
191167

@@ -201,7 +177,8 @@ mod tests {
201177

202178
#[test]
203179
fn last_value_aggregation() {
204-
let (measure, agg) = AggregateBuilder::<u64>::new(None, None).last_value();
180+
let (measure, agg) =
181+
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value();
205182
let mut a = Gauge {
206183
data_points: vec![GaugeDataPoint {
207184
attributes: vec![KeyValue::new("a", 1)],
@@ -227,7 +204,7 @@ mod tests {
227204
fn precomputed_sum_aggregation() {
228205
for temporality in [Temporality::Delta, Temporality::Cumulative] {
229206
let (measure, agg) =
230-
AggregateBuilder::<u64>::new(Some(temporality), None).precomputed_sum(true);
207+
AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
231208
let mut a = Sum {
232209
data_points: vec![
233210
SumDataPoint {
@@ -268,7 +245,7 @@ mod tests {
268245
#[test]
269246
fn sum_aggregation() {
270247
for temporality in [Temporality::Delta, Temporality::Cumulative] {
271-
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None).sum(true);
248+
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None).sum(true);
272249
let mut a = Sum {
273250
data_points: vec![
274251
SumDataPoint {
@@ -309,7 +286,7 @@ mod tests {
309286
#[test]
310287
fn explicit_bucket_histogram_aggregation() {
311288
for temporality in [Temporality::Delta, Temporality::Cumulative] {
312-
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
289+
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None)
313290
.explicit_bucket_histogram(vec![1.0], true, true);
314291
let mut a = Histogram {
315292
data_points: vec![HistogramDataPoint {
@@ -352,7 +329,7 @@ mod tests {
352329
#[test]
353330
fn exponential_histogram_aggregation() {
354331
for temporality in [Temporality::Delta, Temporality::Cumulative] {
355-
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
332+
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None)
356333
.exponential_bucket_histogram(4, 20, true, true);
357334
let mut a = ExponentialHistogram {
358335
data_points: vec![ExponentialHistogramDataPoint {

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime};
1+
use std::{
2+
f64::consts::LOG2_E,
3+
mem::replace,
4+
ops::DerefMut,
5+
sync::{Arc, Mutex},
6+
time::SystemTime,
7+
};
28

39
use opentelemetry::{otel_debug, KeyValue};
410
use std::sync::OnceLock;
@@ -8,7 +14,7 @@ use crate::metrics::{
814
Temporality,
915
};
1016

11-
use super::{Aggregator, Number, ValueMap};
17+
use super::{Aggregator, ComputeAggregation, Number, ValueMap};
1218

1319
pub(crate) const EXPO_MAX_SCALE: i8 = 20;
1420
pub(crate) const EXPO_MIN_SCALE: i8 = -10;
@@ -350,6 +356,7 @@ struct BucketConfig {
350356
/// measurements were made in.
351357
pub(crate) struct ExpoHistogram<T: Number> {
352358
value_map: ValueMap<Mutex<ExpoHistogramDataPoint<T>>>,
359+
temporality: Temporality,
353360
start: Mutex<SystemTime>,
354361
record_sum: bool,
355362
record_min_max: bool,
@@ -358,6 +365,7 @@ pub(crate) struct ExpoHistogram<T: Number> {
358365
impl<T: Number> ExpoHistogram<T> {
359366
/// Create a new exponential histogram.
360367
pub(crate) fn new(
368+
temporality: Temporality,
361369
max_size: u32,
362370
max_scale: i8,
363371
record_min_max: bool,
@@ -368,6 +376,7 @@ impl<T: Number> ExpoHistogram<T> {
368376
max_size: max_size as i32,
369377
max_scale,
370378
}),
379+
temporality,
371380
record_sum,
372381
record_min_max,
373382
start: Mutex::new(SystemTime::now()),
@@ -385,10 +394,7 @@ impl<T: Number> ExpoHistogram<T> {
385394
self.value_map.measure(value, attrs);
386395
}
387396

388-
pub(crate) fn delta(
389-
&self,
390-
dest: Option<&mut dyn Aggregation>,
391-
) -> (usize, Option<Box<dyn Aggregation>>) {
397+
fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
392398
let time = SystemTime::now();
393399
let start_time = self
394400
.start
@@ -447,7 +453,7 @@ impl<T: Number> ExpoHistogram<T> {
447453
(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
448454
}
449455

450-
pub(crate) fn cumulative(
456+
fn cumulative(
451457
&self,
452458
dest: Option<&mut dyn Aggregation>,
453459
) -> (usize, Option<Box<dyn Aggregation>>) {
@@ -510,6 +516,18 @@ impl<T: Number> ExpoHistogram<T> {
510516
}
511517
}
512518

519+
impl<T> ComputeAggregation for Arc<ExpoHistogram<T>>
520+
where
521+
T: Number,
522+
{
523+
fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
524+
match self.temporality {
525+
Temporality::Delta => self.delta(dest),
526+
_ => self.cumulative(dest),
527+
}
528+
}
529+
}
530+
513531
#[cfg(test)]
514532
mod tests {
515533
use std::ops::Neg;
@@ -675,7 +693,7 @@ mod tests {
675693
];
676694

677695
for test in test_cases {
678-
let h = ExpoHistogram::new(4, 20, true, true);
696+
let h = ExpoHistogram::new(Temporality::Cumulative, 4, 20, true, true);
679697
for v in test.values {
680698
h.measure(v, &[]);
681699
}
@@ -724,7 +742,7 @@ mod tests {
724742
];
725743

726744
for test in test_cases {
727-
let h = ExpoHistogram::new(4, 20, true, true);
745+
let h = ExpoHistogram::new(Temporality::Cumulative, 4, 20, true, true);
728746
for v in test.values {
729747
h.measure(v, &[]);
730748
}
@@ -1251,7 +1269,7 @@ mod tests {
12511269
name: "Delta Single",
12521270
build: Box::new(move || {
12531271
box_val(
1254-
AggregateBuilder::new(Some(Temporality::Delta), None)
1272+
AggregateBuilder::new(Temporality::Delta, None)
12551273
.exponential_bucket_histogram(
12561274
max_size,
12571275
max_scale,
@@ -1294,7 +1312,7 @@ mod tests {
12941312
name: "Cumulative Single",
12951313
build: Box::new(move || {
12961314
box_val(
1297-
internal::AggregateBuilder::new(Some(Temporality::Cumulative), None)
1315+
internal::AggregateBuilder::new(Temporality::Cumulative, None)
12981316
.exponential_bucket_histogram(
12991317
max_size,
13001318
max_scale,
@@ -1337,7 +1355,7 @@ mod tests {
13371355
name: "Delta Multiple",
13381356
build: Box::new(move || {
13391357
box_val(
1340-
internal::AggregateBuilder::new(Some(Temporality::Delta), None)
1358+
internal::AggregateBuilder::new(Temporality::Delta, None)
13411359
.exponential_bucket_histogram(
13421360
max_size,
13431361
max_scale,
@@ -1383,7 +1401,7 @@ mod tests {
13831401
name: "Cumulative Multiple ",
13841402
build: Box::new(move || {
13851403
box_val(
1386-
internal::AggregateBuilder::new(Some(Temporality::Cumulative), None)
1404+
internal::AggregateBuilder::new(Temporality::Cumulative, None)
13871405
.exponential_bucket_histogram(
13881406
max_size,
13891407
max_scale,

0 commit comments

Comments
 (0)