Skip to content

Commit d2e179e

Browse files
frailltcijothomas
andauthored
Aggregate time initiator (#2423)
Co-authored-by: Cijo Thomas <[email protected]>
1 parent 9aeae0f commit d2e179e

File tree

6 files changed

+117
-112
lines changed

6 files changed

+117
-112
lines changed

opentelemetry-sdk/src/metrics/internal/aggregate.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
use std::{marker, sync::Arc};
1+
use std::{
2+
marker,
3+
mem::replace,
4+
ops::DerefMut,
5+
sync::{Arc, Mutex},
6+
time::SystemTime,
7+
};
28

39
use opentelemetry::KeyValue;
410

@@ -53,6 +59,44 @@ where
5359
}
5460
}
5561

62+
pub(crate) struct AggregateTime {
63+
pub start: SystemTime,
64+
pub current: SystemTime,
65+
}
66+
67+
/// Initialized [`AggregateTime`] for specific [`Temporality`]
68+
pub(crate) struct AggregateTimeInitiator(Mutex<SystemTime>);
69+
70+
impl AggregateTimeInitiator {
71+
pub(crate) fn delta(&self) -> AggregateTime {
72+
let current_time = SystemTime::now();
73+
let start_time = self
74+
.0
75+
.lock()
76+
.map(|mut start| replace(start.deref_mut(), current_time))
77+
.unwrap_or(current_time);
78+
AggregateTime {
79+
start: start_time,
80+
current: current_time,
81+
}
82+
}
83+
84+
pub(crate) fn cumulative(&self) -> AggregateTime {
85+
let current_time = SystemTime::now();
86+
let start_time = self.0.lock().map(|start| *start).unwrap_or(current_time);
87+
AggregateTime {
88+
start: start_time,
89+
current: current_time,
90+
}
91+
}
92+
}
93+
94+
impl Default for AggregateTimeInitiator {
95+
fn default() -> Self {
96+
Self(Mutex::new(SystemTime::now()))
97+
}
98+
}
99+
56100
/// Builds aggregate functions
57101
pub(crate) struct AggregateBuilder<T> {
58102
/// The temporality used for the returned aggregate functions.

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime};
1+
use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex};
22

33
use opentelemetry::{otel_debug, KeyValue};
44
use std::sync::OnceLock;
@@ -8,7 +8,7 @@ use crate::metrics::{
88
Temporality,
99
};
1010

11-
use super::{Aggregator, Number, ValueMap};
11+
use super::{aggregate::AggregateTimeInitiator, Aggregator, Number, ValueMap};
1212

1313
pub(crate) const EXPO_MAX_SCALE: i8 = 20;
1414
pub(crate) const EXPO_MIN_SCALE: i8 = -10;
@@ -350,7 +350,7 @@ struct BucketConfig {
350350
/// measurements were made in.
351351
pub(crate) struct ExpoHistogram<T: Number> {
352352
value_map: ValueMap<Mutex<ExpoHistogramDataPoint<T>>>,
353-
start: Mutex<SystemTime>,
353+
init_time: AggregateTimeInitiator,
354354
record_sum: bool,
355355
record_min_max: bool,
356356
}
@@ -370,7 +370,7 @@ impl<T: Number> ExpoHistogram<T> {
370370
}),
371371
record_sum,
372372
record_min_max,
373-
start: Mutex::new(SystemTime::now()),
373+
init_time: AggregateTimeInitiator::default(),
374374
}
375375
}
376376

@@ -389,28 +389,23 @@ impl<T: Number> ExpoHistogram<T> {
389389
&self,
390390
dest: Option<&mut dyn Aggregation>,
391391
) -> (usize, Option<Box<dyn Aggregation>>) {
392-
let time = SystemTime::now();
393-
let start_time = self
394-
.start
395-
.lock()
396-
.map(|mut start| replace(start.deref_mut(), time))
397-
.unwrap_or(time);
392+
let time = self.init_time.delta();
398393

399394
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
400395
let mut new_agg = if h.is_none() {
401396
Some(data::ExponentialHistogram {
402397
data_points: vec![],
403-
start_time,
404-
time,
398+
start_time: time.start,
399+
time: time.current,
405400
temporality: Temporality::Delta,
406401
})
407402
} else {
408403
None
409404
};
410405
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
411406
h.temporality = Temporality::Delta;
412-
h.start_time = start_time;
413-
h.time = time;
407+
h.start_time = time.start;
408+
h.time = time.current;
414409

415410
self.value_map
416411
.collect_and_reset(&mut h.data_points, |attributes, attr| {
@@ -451,28 +446,23 @@ impl<T: Number> ExpoHistogram<T> {
451446
&self,
452447
dest: Option<&mut dyn Aggregation>,
453448
) -> (usize, Option<Box<dyn Aggregation>>) {
454-
let time = SystemTime::now();
455-
let start_time = self
456-
.start
457-
.lock()
458-
.map(|s| *s)
459-
.unwrap_or_else(|_| SystemTime::now());
449+
let time = self.init_time.cumulative();
460450

461451
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
462452
let mut new_agg = if h.is_none() {
463453
Some(data::ExponentialHistogram {
464454
data_points: vec![],
465-
start_time,
466-
time,
455+
start_time: time.start,
456+
time: time.current,
467457
temporality: Temporality::Cumulative,
468458
})
469459
} else {
470460
None
471461
};
472462
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
473463
h.temporality = Temporality::Cumulative;
474-
h.start_time = start_time;
475-
h.time = time;
464+
h.start_time = time.start;
465+
h.time = time.current;
476466

477467
self.value_map
478468
.collect_readonly(&mut h.data_points, |attributes, attr| {
@@ -512,7 +502,7 @@ impl<T: Number> ExpoHistogram<T> {
512502

513503
#[cfg(test)]
514504
mod tests {
515-
use std::ops::Neg;
505+
use std::{ops::Neg, time::SystemTime};
516506

517507
use crate::metrics::internal::{self, AggregateBuilder};
518508

opentelemetry-sdk/src/metrics/internal/histogram.rs

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use std::mem::replace;
22
use std::ops::DerefMut;
3-
use std::{sync::Mutex, time::SystemTime};
3+
use std::sync::Mutex;
44

55
use crate::metrics::data::HistogramDataPoint;
66
use crate::metrics::data::{self, Aggregation};
77
use crate::metrics::Temporality;
88
use opentelemetry::KeyValue;
99

10+
use super::aggregate::AggregateTimeInitiator;
1011
use super::ValueMap;
1112
use super::{Aggregator, Number};
1213

@@ -70,7 +71,7 @@ pub(crate) struct Histogram<T: Number> {
7071
bounds: Vec<f64>,
7172
record_min_max: bool,
7273
record_sum: bool,
73-
start: Mutex<SystemTime>,
74+
init_time: AggregateTimeInitiator,
7475
}
7576

7677
impl<T: Number> Histogram<T> {
@@ -89,7 +90,7 @@ impl<T: Number> Histogram<T> {
8990
bounds,
9091
record_min_max,
9192
record_sum,
92-
start: Mutex::new(SystemTime::now()),
93+
init_time: AggregateTimeInitiator::default(),
9394
}
9495
}
9596

@@ -109,27 +110,22 @@ impl<T: Number> Histogram<T> {
109110
&self,
110111
dest: Option<&mut dyn Aggregation>,
111112
) -> (usize, Option<Box<dyn Aggregation>>) {
112-
let time = SystemTime::now();
113-
let start_time = self
114-
.start
115-
.lock()
116-
.map(|mut start| replace(start.deref_mut(), time))
117-
.unwrap_or(time);
113+
let time = self.init_time.delta();
118114
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
119115
let mut new_agg = if h.is_none() {
120116
Some(data::Histogram {
121117
data_points: vec![],
122-
start_time,
123-
time,
118+
start_time: time.start,
119+
time: time.current,
124120
temporality: Temporality::Delta,
125121
})
126122
} else {
127123
None
128124
};
129125
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
130126
h.temporality = Temporality::Delta;
131-
h.start_time = start_time;
132-
h.time = time;
127+
h.start_time = time.start;
128+
h.time = time.current;
133129

134130
self.value_map
135131
.collect_and_reset(&mut h.data_points, |attributes, aggr| {
@@ -165,28 +161,22 @@ impl<T: Number> Histogram<T> {
165161
&self,
166162
dest: Option<&mut dyn Aggregation>,
167163
) -> (usize, Option<Box<dyn Aggregation>>) {
168-
let time = SystemTime::now();
169-
let start_time = self
170-
.start
171-
.lock()
172-
.map(|s| *s)
173-
.unwrap_or_else(|_| SystemTime::now());
174-
164+
let time = self.init_time.cumulative();
175165
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
176166
let mut new_agg = if h.is_none() {
177167
Some(data::Histogram {
178168
data_points: vec![],
179-
start_time,
180-
time,
169+
start_time: time.start,
170+
time: time.current,
181171
temporality: Temporality::Cumulative,
182172
})
183173
} else {
184174
None
185175
};
186176
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
187177
h.temporality = Temporality::Cumulative;
188-
h.start_time = start_time;
189-
h.time = time;
178+
h.start_time = time.start;
179+
h.time = time.current;
190180

191181
self.value_map
192182
.collect_readonly(&mut h.data_points, |attributes, aggr| {

opentelemetry-sdk/src/metrics/internal/last_value.rs

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime};
2-
31
use crate::metrics::data::{self, Aggregation, GaugeDataPoint};
42
use opentelemetry::KeyValue;
53

6-
use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap};
4+
use super::{
5+
aggregate::AggregateTimeInitiator, Aggregator, AtomicTracker, AtomicallyUpdate, Number,
6+
ValueMap,
7+
};
78

89
/// this is reused by PrecomputedSum
910
pub(crate) struct Assign<T>
@@ -40,14 +41,14 @@ where
4041
/// Summarizes a set of measurements as the last one made.
4142
pub(crate) struct LastValue<T: Number> {
4243
value_map: ValueMap<Assign<T>>,
43-
start: Mutex<SystemTime>,
44+
init_time: AggregateTimeInitiator,
4445
}
4546

4647
impl<T: Number> LastValue<T> {
4748
pub(crate) fn new() -> Self {
4849
LastValue {
4950
value_map: ValueMap::new(()),
50-
start: Mutex::new(SystemTime::now()),
51+
init_time: AggregateTimeInitiator::default(),
5152
}
5253
}
5354

@@ -60,26 +61,21 @@ impl<T: Number> LastValue<T> {
6061
&self,
6162
dest: Option<&mut dyn Aggregation>,
6263
) -> (usize, Option<Box<dyn Aggregation>>) {
63-
let time = SystemTime::now();
64-
let start_time = self
65-
.start
66-
.lock()
67-
.map(|mut start| replace(start.deref_mut(), time))
68-
.unwrap_or(time);
64+
let time = self.init_time.delta();
6965

7066
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Gauge<T>>());
7167
let mut new_agg = if s_data.is_none() {
7268
Some(data::Gauge {
7369
data_points: vec![],
74-
start_time: Some(start_time),
75-
time,
70+
start_time: Some(time.start),
71+
time: time.current,
7672
})
7773
} else {
7874
None
7975
};
8076
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
81-
s_data.start_time = Some(start_time);
82-
s_data.time = time;
77+
s_data.start_time = Some(time.start);
78+
s_data.time = time.current;
8379

8480
self.value_map
8581
.collect_and_reset(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
@@ -98,22 +94,21 @@ impl<T: Number> LastValue<T> {
9894
&self,
9995
dest: Option<&mut dyn Aggregation>,
10096
) -> (usize, Option<Box<dyn Aggregation>>) {
101-
let time = SystemTime::now();
102-
let start_time = self.start.lock().map(|start| *start).unwrap_or(time);
97+
let time = self.init_time.cumulative();
10398
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Gauge<T>>());
10499
let mut new_agg = if s_data.is_none() {
105100
Some(data::Gauge {
106101
data_points: vec![],
107-
start_time: Some(start_time),
108-
time,
102+
start_time: Some(time.start),
103+
time: time.current,
109104
})
110105
} else {
111106
None
112107
};
113108
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
114109

115-
s_data.start_time = Some(start_time);
116-
s_data.time = time;
110+
s_data.start_time = Some(time.start);
111+
s_data.time = time.current;
117112

118113
self.value_map
119114
.collect_readonly(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {

0 commit comments

Comments
 (0)