Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::{marker, sync::Arc};
use std::{
marker,
mem::replace,
ops::DerefMut,
sync::{Arc, Mutex},
time::SystemTime,
};

use opentelemetry::KeyValue;

Expand Down Expand Up @@ -53,6 +59,44 @@ where
}
}

pub(crate) struct AggregateTime {
pub start: SystemTime,
pub current: SystemTime,
}

/// Initialized [`AggregateTime`] for specific [`Temporality`]
pub(crate) struct AggregateTimeInitiator(Mutex<SystemTime>);

impl AggregateTimeInitiator {
pub(crate) fn delta(&self) -> AggregateTime {
let current_time = SystemTime::now();
let start_time = self
.0
.lock()
.map(|mut start| replace(start.deref_mut(), current_time))
.unwrap_or(current_time);
AggregateTime {
start: start_time,
current: current_time,
}
}

pub(crate) fn cumulative(&self) -> AggregateTime {
let current_time = SystemTime::now();
let start_time = self.0.lock().map(|start| *start).unwrap_or(current_time);
AggregateTime {
start: start_time,
current: current_time,
}
}
}

impl Default for AggregateTimeInitiator {
fn default() -> Self {
Self(Mutex::new(SystemTime::now()))
}
}

/// Builds aggregate functions
pub(crate) struct AggregateBuilder<T> {
/// The temporality used for the returned aggregate functions.
Expand Down
40 changes: 15 additions & 25 deletions opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime};
use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex};

use opentelemetry::{otel_debug, KeyValue};
use std::sync::OnceLock;
Expand All @@ -8,7 +8,7 @@ use crate::metrics::{
Temporality,
};

use super::{Aggregator, Number, ValueMap};
use super::{aggregate::AggregateTimeInitiator, Aggregator, Number, ValueMap};

pub(crate) const EXPO_MAX_SCALE: i8 = 20;
pub(crate) const EXPO_MIN_SCALE: i8 = -10;
Expand Down Expand Up @@ -350,7 +350,7 @@ struct BucketConfig {
/// measurements were made in.
pub(crate) struct ExpoHistogram<T: Number> {
value_map: ValueMap<Mutex<ExpoHistogramDataPoint<T>>>,
start: Mutex<SystemTime>,
init_time: AggregateTimeInitiator,
record_sum: bool,
record_min_max: bool,
}
Expand All @@ -370,7 +370,7 @@ impl<T: Number> ExpoHistogram<T> {
}),
record_sum,
record_min_max,
start: Mutex::new(SystemTime::now()),
init_time: AggregateTimeInitiator::default(),
}
}

Expand All @@ -389,28 +389,23 @@ impl<T: Number> ExpoHistogram<T> {
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), time))
.unwrap_or(time);
let time = self.init_time.delta();

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::ExponentialHistogram {
data_points: vec![],
start_time,
time,
start_time: time.start,
time: time.current,
temporality: Temporality::Delta,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Delta;
h.start_time = start_time;
h.time = time;
h.start_time = time.start;
h.time = time.current;

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, attr| {
Expand Down Expand Up @@ -451,28 +446,23 @@ impl<T: Number> ExpoHistogram<T> {
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());
let time = self.init_time.cumulative();

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::ExponentialHistogram {
data_points: vec![],
start_time,
time,
start_time: time.start,
time: time.current,
temporality: Temporality::Cumulative,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Cumulative;
h.start_time = start_time;
h.time = time;
h.start_time = time.start;
h.time = time.current;

self.value_map
.collect_readonly(&mut h.data_points, |attributes, attr| {
Expand Down Expand Up @@ -512,7 +502,7 @@ impl<T: Number> ExpoHistogram<T> {

#[cfg(test)]
mod tests {
use std::ops::Neg;
use std::{ops::Neg, time::SystemTime};

use crate::metrics::internal::{self, AggregateBuilder};

Expand Down
38 changes: 14 additions & 24 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::mem::replace;
use std::ops::DerefMut;
use std::{sync::Mutex, time::SystemTime};
use std::sync::Mutex;

use crate::metrics::data::HistogramDataPoint;
use crate::metrics::data::{self, Aggregation};
use crate::metrics::Temporality;
use opentelemetry::KeyValue;

use super::aggregate::AggregateTimeInitiator;
use super::ValueMap;
use super::{Aggregator, Number};

Expand Down Expand Up @@ -70,7 +71,7 @@ pub(crate) struct Histogram<T: Number> {
bounds: Vec<f64>,
record_min_max: bool,
record_sum: bool,
start: Mutex<SystemTime>,
init_time: AggregateTimeInitiator,
}

impl<T: Number> Histogram<T> {
Expand All @@ -89,7 +90,7 @@ impl<T: Number> Histogram<T> {
bounds,
record_min_max,
record_sum,
start: Mutex::new(SystemTime::now()),
init_time: AggregateTimeInitiator::default(),
}
}

Expand All @@ -109,27 +110,22 @@ impl<T: Number> Histogram<T> {
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), time))
.unwrap_or(time);
let time = self.init_time.delta();
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
data_points: vec![],
start_time,
time,
start_time: time.start,
time: time.current,
temporality: Temporality::Delta,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Delta;
h.start_time = start_time;
h.time = time;
h.start_time = time.start;
h.time = time.current;

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, aggr| {
Expand Down Expand Up @@ -165,28 +161,22 @@ impl<T: Number> Histogram<T> {
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());

let time = self.init_time.cumulative();
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
data_points: vec![],
start_time,
time,
start_time: time.start,
time: time.current,
temporality: Temporality::Cumulative,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Cumulative;
h.start_time = start_time;
h.time = time;
h.start_time = time.start;
h.time = time.current;

self.value_map
.collect_readonly(&mut h.data_points, |attributes, aggr| {
Expand Down
37 changes: 16 additions & 21 deletions opentelemetry-sdk/src/metrics/internal/last_value.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime};

use crate::metrics::data::{self, Aggregation, GaugeDataPoint};
use opentelemetry::KeyValue;

use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap};
use super::{
aggregate::AggregateTimeInitiator, Aggregator, AtomicTracker, AtomicallyUpdate, Number,
ValueMap,
};

/// this is reused by PrecomputedSum
pub(crate) struct Assign<T>
Expand Down Expand Up @@ -40,14 +41,14 @@ where
/// Summarizes a set of measurements as the last one made.
pub(crate) struct LastValue<T: Number> {
value_map: ValueMap<Assign<T>>,
start: Mutex<SystemTime>,
init_time: AggregateTimeInitiator,
}

impl<T: Number> LastValue<T> {
pub(crate) fn new() -> Self {
LastValue {
value_map: ValueMap::new(()),
start: Mutex::new(SystemTime::now()),
init_time: AggregateTimeInitiator::default(),
}
}

Expand All @@ -60,26 +61,21 @@ impl<T: Number> LastValue<T> {
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), time))
.unwrap_or(time);
let time = self.init_time.delta();

let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Gauge<T>>());
let mut new_agg = if s_data.is_none() {
Some(data::Gauge {
data_points: vec![],
start_time: Some(start_time),
time,
start_time: Some(time.start),
time: time.current,
})
} else {
None
};
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
s_data.start_time = Some(start_time);
s_data.time = time;
s_data.start_time = Some(time.start);
s_data.time = time.current;

self.value_map
.collect_and_reset(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
Expand All @@ -98,22 +94,21 @@ impl<T: Number> LastValue<T> {
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let time = SystemTime::now();
let start_time = self.start.lock().map(|start| *start).unwrap_or(time);
let time = self.init_time.cumulative();
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Gauge<T>>());
let mut new_agg = if s_data.is_none() {
Some(data::Gauge {
data_points: vec![],
start_time: Some(start_time),
time,
start_time: Some(time.start),
time: time.current,
})
} else {
None
};
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));

s_data.start_time = Some(start_time);
s_data.time = time;
s_data.start_time = Some(time.start);
s_data.time = time.current;

self.value_map
.collect_readonly(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
Expand Down
Loading
Loading