From c105ed65607aed9a28b175b4e4ecf5981d810190 Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Wed, 11 Dec 2024 07:42:52 +0200 Subject: [PATCH] Move time from DataPoint to Histogram/ExpoHistogram --- opentelemetry-proto/src/transform/metrics.rs | 8 +-- opentelemetry-sdk/src/metrics/data/mod.rs | 26 +++------ .../src/metrics/internal/aggregate.rs | 8 +-- .../metrics/internal/exponential_histogram.rs | 56 ++++++++++--------- .../src/metrics/internal/histogram.rs | 39 +++++++------ .../src/testing/metrics/in_memory_exporter.rs | 6 ++ opentelemetry-stdout/src/metrics/exporter.rs | 24 ++++---- 7 files changed, 85 insertions(+), 82 deletions(-) diff --git a/opentelemetry-proto/src/transform/metrics.rs b/opentelemetry-proto/src/transform/metrics.rs index 04a1fe18f5..cb135ebf83 100644 --- a/opentelemetry-proto/src/transform/metrics.rs +++ b/opentelemetry-proto/src/transform/metrics.rs @@ -230,8 +230,8 @@ pub mod tonic { .iter() .map(|dp| TonicHistogramDataPoint { attributes: dp.attributes.iter().map(Into::into).collect(), - start_time_unix_nano: to_nanos(dp.start_time), - time_unix_nano: to_nanos(dp.time), + start_time_unix_nano: to_nanos(hist.start_time), + time_unix_nano: to_nanos(hist.time), count: dp.count, sum: Some(dp.sum.into_f64()), bucket_counts: dp.bucket_counts.clone(), @@ -258,8 +258,8 @@ pub mod tonic { .iter() .map(|dp| TonicExponentialHistogramDataPoint { attributes: dp.attributes.iter().map(Into::into).collect(), - start_time_unix_nano: to_nanos(dp.start_time), - time_unix_nano: to_nanos(dp.time), + start_time_unix_nano: to_nanos(hist.start_time), + time_unix_nano: to_nanos(hist.time), count: dp.count as u64, sum: Some(dp.sum.into_f64()), scale: dp.scale.into(), diff --git a/opentelemetry-sdk/src/metrics/data/mod.rs b/opentelemetry-sdk/src/metrics/data/mod.rs index 9c1516d8b0..b1b198d73c 100644 --- a/opentelemetry-sdk/src/metrics/data/mod.rs +++ b/opentelemetry-sdk/src/metrics/data/mod.rs @@ -147,6 +147,10 @@ impl Aggregation for Sum { pub struct Histogram { /// Individual aggregated measurements with unique attributes. pub data_points: Vec>, + /// The time when the time series was started. + pub start_time: SystemTime, + /// The time when the time series was recorded. + pub time: SystemTime, /// Describes if the aggregation is reported as the change from the last report /// time, or the cumulative changes since a fixed start time. pub temporality: Temporality, @@ -166,11 +170,6 @@ impl Aggregation for Histogram { pub struct HistogramDataPoint { /// The set of key value pairs that uniquely identify the time series. pub attributes: Vec, - /// The time when the time series was started. - pub start_time: SystemTime, - /// The time when the time series was recorded. - pub time: SystemTime, - /// The number of updates this histogram has been calculated with. pub count: u64, /// The upper bounds of the buckets of the histogram. @@ -195,8 +194,6 @@ impl Clone for HistogramDataPoint { fn clone(&self) -> Self { Self { attributes: self.attributes.clone(), - start_time: self.start_time, - time: self.time, count: self.count, bounds: self.bounds.clone(), bucket_counts: self.bucket_counts.clone(), @@ -213,7 +210,10 @@ impl Clone for HistogramDataPoint { pub struct ExponentialHistogram { /// The individual aggregated measurements with unique attributes. pub data_points: Vec>, - + /// When the time series was started. + pub start_time: SystemTime, + /// The time when the time series was recorded. + pub time: SystemTime, /// Describes if the aggregation is reported as the change from the last report /// time, or the cumulative changes since a fixed start time. pub temporality: Temporality, @@ -233,10 +233,6 @@ impl Aggregation for ExponentialHistogram pub struct ExponentialHistogramDataPoint { /// The set of key value pairs that uniquely identify the time series. pub attributes: Vec, - /// When the time series was started. - pub start_time: SystemTime, - /// The time when the time series was recorded. - pub time: SystemTime, /// The number of updates this histogram has been calculated with. pub count: usize, @@ -281,8 +277,6 @@ impl Clone for ExponentialHistogramDataPoint { fn clone(&self) -> Self { Self { attributes: self.attributes.clone(), - start_time: self.start_time, - time: self.time, count: self.count, min: self.min, max: self.max, @@ -375,8 +369,6 @@ mod tests { let histogram_data_point = HistogramDataPoint { attributes: vec![KeyValue::new("key", "value")], - start_time: std::time::SystemTime::now(), - time: std::time::SystemTime::now(), count: 0, bounds: vec![], bucket_counts: vec![], @@ -395,8 +387,6 @@ mod tests { let exponential_histogram_data_point = ExponentialHistogramDataPoint { attributes: vec![KeyValue::new("key", "value")], - start_time: std::time::SystemTime::now(), - time: std::time::SystemTime::now(), count: 0, min: None, max: None, diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index 71befc8d6e..a4ae26204b 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -314,8 +314,6 @@ mod tests { let mut a = Histogram { data_points: vec![HistogramDataPoint { attributes: vec![KeyValue::new("a1", 1)], - start_time: SystemTime::now(), - time: SystemTime::now(), count: 2, bounds: vec![1.0, 2.0], bucket_counts: vec![0, 1, 1], @@ -324,6 +322,8 @@ mod tests { sum: 3u64, exemplars: vec![], }], + start_time: SystemTime::now(), + time: SystemTime::now(), temporality: if temporality == Temporality::Delta { Temporality::Cumulative } else { @@ -357,8 +357,6 @@ mod tests { let mut a = ExponentialHistogram { data_points: vec![ExponentialHistogramDataPoint { attributes: vec![KeyValue::new("a1", 1)], - start_time: SystemTime::now(), - time: SystemTime::now(), count: 2, min: None, max: None, @@ -376,6 +374,8 @@ mod tests { zero_threshold: 1.0, exemplars: vec![], }], + start_time: SystemTime::now(), + time: SystemTime::now(), temporality: if temporality == Temporality::Delta { Temporality::Cumulative } else { diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs index af89dc227a..8e6901310d 100644 --- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs @@ -389,12 +389,19 @@ impl ExpoHistogram { &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let t = SystemTime::now(); + let time = SystemTime::now(); + let start_time = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), time)) + .unwrap_or(time); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::ExponentialHistogram { data_points: vec![], + start_time, + time, temporality: Temporality::Delta, }) } else { @@ -402,20 +409,14 @@ impl ExpoHistogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Delta; - - let prev_start = self - .start - .lock() - .map(|mut start| replace(start.deref_mut(), t)) - .unwrap_or(t); + h.start_time = start_time; + h.time = time; self.value_map .collect_and_reset(&mut h.data_points, |attributes, attr| { let b = attr.into_inner().unwrap_or_else(|err| err.into_inner()); data::ExponentialHistogramDataPoint { attributes, - start_time: prev_start, - time: t, count: b.count, min: if self.record_min_max { Some(b.min) @@ -450,12 +451,19 @@ impl ExpoHistogram { &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let t = SystemTime::now(); + let time = SystemTime::now(); + let start_time = self + .start + .lock() + .map(|s| *s) + .unwrap_or_else(|_| SystemTime::now()); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::ExponentialHistogram { data_points: vec![], + start_time, + time, temporality: Temporality::Cumulative, }) } else { @@ -463,20 +471,14 @@ impl ExpoHistogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Cumulative; - - let prev_start = self - .start - .lock() - .map(|s| *s) - .unwrap_or_else(|_| SystemTime::now()); + h.start_time = start_time; + h.time = time; self.value_map .collect_readonly(&mut h.data_points, |attributes, attr| { let b = attr.lock().unwrap_or_else(|err| err.into_inner()); data::ExponentialHistogramDataPoint { attributes, - start_time: prev_start, - time: t, count: b.count, min: if self.record_min_max { Some(b.min) @@ -1270,8 +1272,6 @@ mod tests { min: Some(1.into()), max: Some(16.into()), sum: 31.into(), - start_time: SystemTime::now(), - time: SystemTime::now(), scale: -1, positive_bucket: data::ExponentialBucket { offset: -1, @@ -1285,6 +1285,8 @@ mod tests { zero_threshold: 0.0, zero_count: 0, }], + start_time: SystemTime::now(), + time: SystemTime::now(), }, want_count: 1, }, @@ -1318,8 +1320,6 @@ mod tests { offset: -1, counts: vec![1, 4, 1], }, - start_time: SystemTime::now(), - time: SystemTime::now(), negative_bucket: data::ExponentialBucket { offset: 0, counts: vec![], @@ -1328,6 +1328,8 @@ mod tests { zero_threshold: 0.0, zero_count: 0, }], + start_time: SystemTime::now(), + time: SystemTime::now(), }, want_count: 1, }, @@ -1364,8 +1366,6 @@ mod tests { offset: -1, counts: vec![1, 4, 1], }, - start_time: SystemTime::now(), - time: SystemTime::now(), negative_bucket: data::ExponentialBucket { offset: 0, counts: vec![], @@ -1374,6 +1374,8 @@ mod tests { zero_threshold: 0.0, zero_count: 0, }], + start_time: SystemTime::now(), + time: SystemTime::now(), }, want_count: 1, }, @@ -1410,8 +1412,6 @@ mod tests { counts: vec![1, 6, 2], }, attributes: vec![], - start_time: SystemTime::now(), - time: SystemTime::now(), negative_bucket: data::ExponentialBucket { offset: 0, counts: vec![], @@ -1420,6 +1420,8 @@ mod tests { zero_threshold: 0.0, zero_count: 0, }], + start_time: SystemTime::now(), + time: SystemTime::now(), }, want_count: 1, }, @@ -1430,6 +1432,8 @@ mod tests { let mut got: Box = Box::new(data::ExponentialHistogram:: { data_points: vec![], + start_time: SystemTime::now(), + time: SystemTime::now(), temporality: Temporality::Delta, }); let mut count = 0; diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index ac4a6806b7..fc673297a9 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -109,11 +109,18 @@ impl Histogram { &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let t = SystemTime::now(); + let time = SystemTime::now(); + let start_time = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), time)) + .unwrap_or(time); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::Histogram { data_points: vec![], + start_time, + time, temporality: Temporality::Delta, }) } else { @@ -121,20 +128,14 @@ impl Histogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Delta; - - let prev_start = self - .start - .lock() - .map(|mut start| replace(start.deref_mut(), t)) - .unwrap_or(t); + h.start_time = start_time; + h.time = time; self.value_map .collect_and_reset(&mut h.data_points, |attributes, aggr| { let b = aggr.into_inner().unwrap_or_else(|err| err.into_inner()); HistogramDataPoint { attributes, - start_time: prev_start, - time: t, count: b.count, bounds: self.bounds.clone(), bucket_counts: b.counts, @@ -164,11 +165,19 @@ impl Histogram { &self, dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { - let t = SystemTime::now(); + let time = SystemTime::now(); + let start_time = self + .start + .lock() + .map(|s| *s) + .unwrap_or_else(|_| SystemTime::now()); + let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::Histogram { data_points: vec![], + start_time, + time, temporality: Temporality::Cumulative, }) } else { @@ -176,20 +185,14 @@ impl Histogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Cumulative; - - let prev_start = self - .start - .lock() - .map(|s| *s) - .unwrap_or_else(|_| SystemTime::now()); + h.start_time = start_time; + h.time = time; self.value_map .collect_readonly(&mut h.data_points, |attributes, aggr| { let b = aggr.lock().unwrap_or_else(|err| err.into_inner()); HistogramDataPoint { attributes, - start_time: prev_start, - time: t, count: b.count, bounds: self.bounds.clone(), bucket_counts: b.counts.clone(), diff --git a/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs index c8972ef9be..29fb3e59fc 100644 --- a/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs @@ -195,16 +195,22 @@ impl InMemoryMetricExporter { if let Some(hist) = data.as_any().downcast_ref::>() { Some(Box::new(Histogram { data_points: hist.data_points.clone(), + start_time: hist.start_time, + time: hist.time, temporality: hist.temporality, })) } else if let Some(hist) = data.as_any().downcast_ref::>() { Some(Box::new(Histogram { data_points: hist.data_points.clone(), + start_time: hist.start_time, + time: hist.time, temporality: hist.temporality, })) } else if let Some(hist) = data.as_any().downcast_ref::>() { Some(Box::new(Histogram { data_points: hist.data_points.clone(), + start_time: hist.start_time, + time: hist.time, temporality: hist.temporality, })) } else if let Some(sum) = data.as_any().downcast_ref::>() { diff --git a/opentelemetry-stdout/src/metrics/exporter.rs b/opentelemetry-stdout/src/metrics/exporter.rs index 43fbda0166..54feb33c41 100644 --- a/opentelemetry-stdout/src/metrics/exporter.rs +++ b/opentelemetry-stdout/src/metrics/exporter.rs @@ -160,13 +160,13 @@ fn print_gauge(gauge: &data::Gauge) { if let Some(start_time) = gauge.start_time { let datetime: DateTime = start_time.into(); println!( - "\t\t\tStartTime : {}", + "\t\tStartTime : {}", datetime.format("%Y-%m-%d %H:%M:%S%.6f") ); } let datetime: DateTime = gauge.time.into(); println!( - "\t\t\tEndTime : {}", + "\t\tEndTime : {}", datetime.format("%Y-%m-%d %H:%M:%S%.6f") ); print_gauge_data_points(&gauge.data_points); @@ -178,6 +178,16 @@ fn print_histogram(histogram: &data::Histogram) { } else { println!("\t\tTemporality : Delta"); } + let datetime: DateTime = histogram.start_time.into(); + println!( + "\t\tStartTime : {}", + datetime.format("%Y-%m-%d %H:%M:%S%.6f") + ); + let datetime: DateTime = histogram.time.into(); + println!( + "\t\tEndTime : {}", + datetime.format("%Y-%m-%d %H:%M:%S%.6f") + ); println!("\t\tHistogram DataPoints"); print_hist_data_points(&histogram.data_points); } @@ -207,16 +217,6 @@ fn print_gauge_data_points(data_points: &[data::GaugeDataPoint]) { fn print_hist_data_points(data_points: &[data::HistogramDataPoint]) { for (i, data_point) in data_points.iter().enumerate() { println!("\t\tDataPoint #{}", i); - let datetime: DateTime = data_point.start_time.into(); - println!( - "\t\t\tStartTime : {}", - datetime.format("%Y-%m-%d %H:%M:%S%.6f") - ); - let datetime: DateTime = data_point.time.into(); - println!( - "\t\t\tEndTime : {}", - datetime.format("%Y-%m-%d %H:%M:%S%.6f") - ); println!("\t\t\tCount : {}", data_point.count); println!("\t\t\tSum : {:?}", data_point.sum); if let Some(min) = &data_point.min {