diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index a4ae26204b..edfc50d9d1 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -1,4 +1,10 @@ -use std::{marker, sync::Arc}; +use std::{ + marker, + mem::replace, + ops::DerefMut, + sync::{Arc, Mutex}, + time::SystemTime, +}; use opentelemetry::KeyValue; @@ -53,6 +59,44 @@ where } } +pub(crate) struct AggregateTime { + pub start: SystemTime, + pub current: SystemTime, +} + +/// Initialized [`AggregateTime`] for specific [`Temporality`] +pub(crate) struct AggregateTimeInitiator(Mutex); + +impl AggregateTimeInitiator { + pub(crate) fn delta(&self) -> AggregateTime { + let current_time = SystemTime::now(); + let start_time = self + .0 + .lock() + .map(|mut start| replace(start.deref_mut(), current_time)) + .unwrap_or(current_time); + AggregateTime { + start: start_time, + current: current_time, + } + } + + pub(crate) fn cumulative(&self) -> AggregateTime { + let current_time = SystemTime::now(); + let start_time = self.0.lock().map(|start| *start).unwrap_or(current_time); + AggregateTime { + start: start_time, + current: current_time, + } + } +} + +impl Default for AggregateTimeInitiator { + fn default() -> Self { + Self(Mutex::new(SystemTime::now())) + } +} + /// Builds aggregate functions pub(crate) struct AggregateBuilder { /// The temporality used for the returned aggregate functions. diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs index 8e6901310d..42f3794ad1 100644 --- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs @@ -1,4 +1,4 @@ -use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime}; +use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex}; use opentelemetry::{otel_debug, KeyValue}; use std::sync::OnceLock; @@ -8,7 +8,7 @@ use crate::metrics::{ Temporality, }; -use super::{Aggregator, Number, ValueMap}; +use super::{aggregate::AggregateTimeInitiator, Aggregator, Number, ValueMap}; pub(crate) const EXPO_MAX_SCALE: i8 = 20; pub(crate) const EXPO_MIN_SCALE: i8 = -10; @@ -350,7 +350,7 @@ struct BucketConfig { /// measurements were made in. pub(crate) struct ExpoHistogram { value_map: ValueMap>>, - start: Mutex, + init_time: AggregateTimeInitiator, record_sum: bool, record_min_max: bool, } @@ -370,7 +370,7 @@ impl ExpoHistogram { }), record_sum, record_min_max, - start: Mutex::new(SystemTime::now()), + init_time: AggregateTimeInitiator::default(), } } @@ -389,19 +389,14 @@ impl ExpoHistogram { &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = SystemTime::now(); - let start_time = self - .start - .lock() - .map(|mut start| replace(start.deref_mut(), time)) - .unwrap_or(time); + let time = self.init_time.delta(); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::ExponentialHistogram { data_points: vec![], - start_time, - time, + start_time: time.start, + time: time.current, temporality: Temporality::Delta, }) } else { @@ -409,8 +404,8 @@ impl ExpoHistogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Delta; - h.start_time = start_time; - h.time = time; + h.start_time = time.start; + h.time = time.current; self.value_map .collect_and_reset(&mut h.data_points, |attributes, attr| { @@ -451,19 +446,14 @@ impl ExpoHistogram { &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = SystemTime::now(); - let start_time = self - .start - .lock() - .map(|s| *s) - .unwrap_or_else(|_| SystemTime::now()); + let time = self.init_time.cumulative(); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::ExponentialHistogram { data_points: vec![], - start_time, - time, + start_time: time.start, + time: time.current, temporality: Temporality::Cumulative, }) } else { @@ -471,8 +461,8 @@ impl ExpoHistogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Cumulative; - h.start_time = start_time; - h.time = time; + h.start_time = time.start; + h.time = time.current; self.value_map .collect_readonly(&mut h.data_points, |attributes, attr| { @@ -512,7 +502,7 @@ impl ExpoHistogram { #[cfg(test)] mod tests { - use std::ops::Neg; + use std::{ops::Neg, time::SystemTime}; use crate::metrics::internal::{self, AggregateBuilder}; diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index fc673297a9..f535566ecf 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -1,12 +1,13 @@ use std::mem::replace; use std::ops::DerefMut; -use std::{sync::Mutex, time::SystemTime}; +use std::sync::Mutex; use crate::metrics::data::HistogramDataPoint; use crate::metrics::data::{self, Aggregation}; use crate::metrics::Temporality; use opentelemetry::KeyValue; +use super::aggregate::AggregateTimeInitiator; use super::ValueMap; use super::{Aggregator, Number}; @@ -70,7 +71,7 @@ pub(crate) struct Histogram { bounds: Vec, record_min_max: bool, record_sum: bool, - start: Mutex, + init_time: AggregateTimeInitiator, } impl Histogram { @@ -89,7 +90,7 @@ impl Histogram { bounds, record_min_max, record_sum, - start: Mutex::new(SystemTime::now()), + init_time: AggregateTimeInitiator::default(), } } @@ -109,18 +110,13 @@ impl Histogram { &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = SystemTime::now(); - let start_time = self - .start - .lock() - .map(|mut start| replace(start.deref_mut(), time)) - .unwrap_or(time); + let time = self.init_time.delta(); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::Histogram { data_points: vec![], - start_time, - time, + start_time: time.start, + time: time.current, temporality: Temporality::Delta, }) } else { @@ -128,8 +124,8 @@ impl Histogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Delta; - h.start_time = start_time; - h.time = time; + h.start_time = time.start; + h.time = time.current; self.value_map .collect_and_reset(&mut h.data_points, |attributes, aggr| { @@ -165,19 +161,13 @@ impl Histogram { &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = SystemTime::now(); - let start_time = self - .start - .lock() - .map(|s| *s) - .unwrap_or_else(|_| SystemTime::now()); - + let time = self.init_time.cumulative(); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::Histogram { data_points: vec![], - start_time, - time, + start_time: time.start, + time: time.current, temporality: Temporality::Cumulative, }) } else { @@ -185,8 +175,8 @@ impl Histogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Cumulative; - h.start_time = start_time; - h.time = time; + h.start_time = time.start; + h.time = time.current; self.value_map .collect_readonly(&mut h.data_points, |attributes, aggr| { diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index 44b7b23396..b140fdbc40 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -1,9 +1,10 @@ -use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime}; - use crate::metrics::data::{self, Aggregation, GaugeDataPoint}; use opentelemetry::KeyValue; -use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap}; +use super::{ + aggregate::AggregateTimeInitiator, Aggregator, AtomicTracker, AtomicallyUpdate, Number, + ValueMap, +}; /// this is reused by PrecomputedSum pub(crate) struct Assign @@ -40,14 +41,14 @@ where /// Summarizes a set of measurements as the last one made. pub(crate) struct LastValue { value_map: ValueMap>, - start: Mutex, + init_time: AggregateTimeInitiator, } impl LastValue { pub(crate) fn new() -> Self { LastValue { value_map: ValueMap::new(()), - start: Mutex::new(SystemTime::now()), + init_time: AggregateTimeInitiator::default(), } } @@ -60,26 +61,21 @@ impl LastValue { &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = SystemTime::now(); - let start_time = self - .start - .lock() - .map(|mut start| replace(start.deref_mut(), time)) - .unwrap_or(time); + let time = self.init_time.delta(); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if s_data.is_none() { Some(data::Gauge { data_points: vec![], - start_time: Some(start_time), - time, + start_time: Some(time.start), + time: time.current, }) } else { None }; let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.start_time = Some(start_time); - s_data.time = time; + s_data.start_time = Some(time.start); + s_data.time = time.current; self.value_map .collect_and_reset(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint { @@ -98,22 +94,21 @@ impl LastValue { &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = SystemTime::now(); - let start_time = self.start.lock().map(|start| *start).unwrap_or(time); + let time = self.init_time.cumulative(); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if s_data.is_none() { Some(data::Gauge { data_points: vec![], - start_time: Some(start_time), - time, + start_time: Some(time.start), + time: time.current, }) } else { None }; let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.start_time = Some(start_time); - s_data.time = time; + s_data.start_time = Some(time.start); + s_data.time = time.current; self.value_map .collect_readonly(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint { diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs index a37bfdd25e..7b67147011 100644 --- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -3,14 +3,15 @@ use opentelemetry::KeyValue; use crate::metrics::data::{self, Aggregation, SumDataPoint}; use crate::metrics::Temporality; +use super::aggregate::AggregateTimeInitiator; use super::{last_value::Assign, AtomicTracker, Number, ValueMap}; -use std::{collections::HashMap, mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime}; +use std::{collections::HashMap, sync::Mutex}; /// Summarizes a set of pre-computed sums as their arithmetic sum. pub(crate) struct PrecomputedSum { value_map: ValueMap>, monotonic: bool, - start: Mutex, + init_time: AggregateTimeInitiator, reported: Mutex, T>>, } @@ -19,7 +20,7 @@ impl PrecomputedSum { PrecomputedSum { value_map: ValueMap::new(()), monotonic, - start: Mutex::new(SystemTime::now()), + init_time: AggregateTimeInitiator::default(), reported: Mutex::new(Default::default()), } } @@ -33,19 +34,14 @@ impl PrecomputedSum { &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = SystemTime::now(); - let start_time = self - .start - .lock() - .map(|mut start| replace(start.deref_mut(), time)) - .unwrap_or(time); + let time = self.init_time.delta(); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if s_data.is_none() { Some(data::Sum { data_points: vec![], - start_time, - time, + start_time: time.start, + time: time.current, temporality: Temporality::Delta, is_monotonic: self.monotonic, }) @@ -53,8 +49,8 @@ impl PrecomputedSum { None }; let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.start_time = start_time; - s_data.time = time; + s_data.start_time = time.start; + s_data.time = time.current; s_data.temporality = Temporality::Delta; s_data.is_monotonic = self.monotonic; @@ -89,15 +85,14 @@ impl PrecomputedSum { &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = SystemTime::now(); - let start_time = self.start.lock().map(|start| *start).unwrap_or(time); + let time = self.init_time.cumulative(); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if s_data.is_none() { Some(data::Sum { data_points: vec![], - start_time, - time, + start_time: time.start, + time: time.current, temporality: Temporality::Cumulative, is_monotonic: self.monotonic, }) @@ -105,8 +100,8 @@ impl PrecomputedSum { None }; let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.start_time = start_time; - s_data.time = time; + s_data.start_time = time.start; + s_data.time = time.current; s_data.temporality = Temporality::Cumulative; s_data.is_monotonic = self.monotonic; diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 7666b6e448..5f51be79c2 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,12 +1,10 @@ -use std::mem::replace; -use std::ops::DerefMut; use std::vec; -use std::{sync::Mutex, time::SystemTime}; use crate::metrics::data::{self, Aggregation, SumDataPoint}; use crate::metrics::Temporality; use opentelemetry::KeyValue; +use super::aggregate::AggregateTimeInitiator; use super::{Aggregator, AtomicTracker, Number}; use super::{AtomicallyUpdate, ValueMap}; @@ -45,7 +43,7 @@ where pub(crate) struct Sum { value_map: ValueMap>, monotonic: bool, - start: Mutex, + init_time: AggregateTimeInitiator, } impl Sum { @@ -58,7 +56,7 @@ impl Sum { Sum { value_map: ValueMap::new(()), monotonic, - start: Mutex::new(SystemTime::now()), + init_time: AggregateTimeInitiator::default(), } } @@ -71,19 +69,13 @@ impl Sum { &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = SystemTime::now(); - let start_time = self - .start - .lock() - .map(|mut start| replace(start.deref_mut(), time)) - .unwrap_or(time); - + let time = self.init_time.delta(); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if s_data.is_none() { Some(data::Sum { data_points: vec![], - start_time, - time, + start_time: time.start, + time: time.current, temporality: Temporality::Delta, is_monotonic: self.monotonic, }) @@ -91,8 +83,8 @@ impl Sum { None }; let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.start_time = start_time; - s_data.time = time; + s_data.start_time = time.start; + s_data.time = time.current; s_data.temporality = Temporality::Delta; s_data.is_monotonic = self.monotonic; @@ -113,14 +105,13 @@ impl Sum { &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let time = SystemTime::now(); - let start_time = self.start.lock().map(|start| *start).unwrap_or(time); + let time = self.init_time.cumulative(); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if s_data.is_none() { Some(data::Sum { data_points: vec![], - start_time, - time, + start_time: time.start, + time: time.current, temporality: Temporality::Cumulative, is_monotonic: self.monotonic, }) @@ -129,8 +120,8 @@ impl Sum { }; let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.start_time = start_time; - s_data.time = time; + s_data.start_time = time.start; + s_data.time = time.current; s_data.temporality = Temporality::Cumulative; s_data.is_monotonic = self.monotonic;