diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index e5e4ca3295..7c439c830f 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -5,6 +5,9 @@ - Added `Resource::get_ref(&self, key: &Key) -> Option<&Value>` to allow retrieving a reference to a resource value without cloning. - **Breaking** Removed the following public hidden methods from the `SdkTracer` [#3227][3227]: - `id_generator`, `should_sample` +- **Breaking** Removed `Default` and `Clone` implementation from `InMemoryMetricExporter`. +- **Breaking** `InMemoryMetricExporterBuilder` requires mandatory `metrics` field to be set via + `.with_metrics` method. [3227]: https://github.com/open-telemetry/opentelemetry-rust/pull/3227 diff --git a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs index 903c2a46ec..ae83743942 100644 --- a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs @@ -4,19 +4,20 @@ use crate::metrics::data::{ }; use crate::metrics::exporter::PushMetricExporter; use crate::metrics::Temporality; -use crate::InMemoryExporterError; -use std::collections::VecDeque; use std::fmt; use std::sync::{Arc, Mutex}; use std::time::Duration; use super::data::{AggregatedMetrics, Metric, ScopeMetrics}; +// Not a user-facing type, just a type alias for clarity within this module. +type InMemoryMetrics = Vec; + /// An in-memory metrics exporter that stores metrics data in memory. /// /// This exporter is useful for testing and debugging purposes. It stores -/// metric data in a `VecDeque`. Metrics can be retrieved -/// using the `get_finished_metrics` method. +/// metric data in a user-provided `Vec`, from which the +/// exported data can be retrieved as well. /// /// # Panics /// @@ -27,6 +28,7 @@ use super::data::{AggregatedMetrics, Metric, ScopeMetrics}; /// # Example /// /// ``` +///# use std::sync::{Arc, Mutex}; ///# use opentelemetry_sdk::metrics; ///# use opentelemetry::{KeyValue}; ///# use opentelemetry::metrics::MeterProvider; @@ -35,12 +37,15 @@ use super::data::{AggregatedMetrics, Metric, ScopeMetrics}; /// ///# #[tokio::main] ///# async fn main() { -/// // Create an InMemoryMetricExporter -/// let exporter = InMemoryMetricExporter::default(); +/// // Create an InMemoryMetricExporter +/// let metrics = Arc::new(Mutex::new(Vec::new())); +/// let exporter = InMemoryMetricExporter::builder() +/// .with_metrics(metrics.clone()) +/// .build(); /// /// // Create a MeterProvider and register the exporter /// let meter_provider = metrics::SdkMeterProvider::builder() -/// .with_reader(PeriodicReader::builder(exporter.clone()).build()) +/// .with_reader(PeriodicReader::builder(exporter).build()) /// .build(); /// /// // Create and record metrics using the MeterProvider @@ -50,26 +55,21 @@ use super::data::{AggregatedMetrics, Metric, ScopeMetrics}; /// /// meter_provider.force_flush().unwrap(); /// -/// // Retrieve the finished metrics from the exporter -/// let finished_metrics = exporter.get_finished_metrics().unwrap(); -/// /// // Print the finished metrics -/// for resource_metrics in finished_metrics { +/// for resource_metrics in metrics.lock().unwrap().iter() { /// println!("{:?}", resource_metrics); /// } ///# } /// ``` pub struct InMemoryMetricExporter { - metrics: Arc>>, + metrics: Arc>, temporality: Temporality, } -impl Clone for InMemoryMetricExporter { - fn clone(&self) -> Self { - InMemoryMetricExporter { - metrics: self.metrics.clone(), - temporality: self.temporality, - } +impl InMemoryMetricExporter { + /// Creates a new instance of the [`InMemoryMetricExporterBuilder`]. + pub fn builder() -> InMemoryMetricExporterBuilder { + InMemoryMetricExporterBuilder::new() } } @@ -79,22 +79,20 @@ impl fmt::Debug for InMemoryMetricExporter { } } -impl Default for InMemoryMetricExporter { - fn default() -> Self { - InMemoryMetricExporterBuilder::new().build() - } -} - /// Builder for [`InMemoryMetricExporter`]. /// # Example /// /// ``` -/// # use opentelemetry_sdk::metrics::{InMemoryMetricExporter, InMemoryMetricExporterBuilder}; +///# use opentelemetry_sdk::metrics::{InMemoryMetricExporter, InMemoryMetricExporterBuilder}; +///# use std::sync::{Arc, Mutex}; /// -/// let exporter = InMemoryMetricExporterBuilder::new().build(); +/// let metrics = Arc::new(Mutex::new(Vec::new())); +/// let exporter = InMemoryMetricExporterBuilder::new() +/// .with_metrics(metrics.clone()).build(); /// ``` pub struct InMemoryMetricExporterBuilder { temporality: Option, + metrics: Option>>, } impl fmt::Debug for InMemoryMetricExporterBuilder { @@ -112,7 +110,10 @@ impl Default for InMemoryMetricExporterBuilder { impl InMemoryMetricExporterBuilder { /// Creates a new instance of the `InMemoryMetricExporterBuilder`. pub fn new() -> Self { - Self { temporality: None } + Self { + temporality: None, + metrics: None, + } } /// Set the [Temporality] of the exporter. @@ -121,48 +122,34 @@ impl InMemoryMetricExporterBuilder { self } + /// Set the internal collection to store the metrics. + pub fn with_metrics(mut self, metrics: Arc>) -> Self { + self.metrics = Some(metrics); + self + } + /// Creates a new instance of the `InMemoryMetricExporter`. /// pub fn build(self) -> InMemoryMetricExporter { InMemoryMetricExporter { - metrics: Arc::new(Mutex::new(VecDeque::new())), + metrics: self.metrics.expect("Metrics must be provided"), temporality: self.temporality.unwrap_or_default(), } } } impl InMemoryMetricExporter { - /// Returns the finished metrics as a vector of `ResourceMetrics`. - /// - /// # Errors - /// - /// Returns a `MetricError` if the internal lock cannot be acquired. - /// - /// # Example - /// - /// ``` - /// # use opentelemetry_sdk::metrics::InMemoryMetricExporter; - /// - /// let exporter = InMemoryMetricExporter::default(); - /// let finished_metrics = exporter.get_finished_metrics().unwrap(); - /// ``` - pub fn get_finished_metrics(&self) -> Result, InMemoryExporterError> { - let metrics = self - .metrics - .lock() - .map(|metrics_guard| metrics_guard.iter().map(Self::clone_metrics).collect()) - .map_err(InMemoryExporterError::from)?; - Ok(metrics) - } - /// Clears the internal storage of finished metrics. /// /// # Example /// /// ``` - /// # use opentelemetry_sdk::metrics::InMemoryMetricExporter; + /// use opentelemetry_sdk::metrics::InMemoryMetricExporter; + /// use std::sync::{Arc, Mutex}; /// - /// let exporter = InMemoryMetricExporter::default(); + /// let metrics = Arc::new(Mutex::new(Vec::new())); + /// let exporter = InMemoryMetricExporter::builder() + /// .with_metrics(metrics.clone()).build(); /// exporter.reset(); /// ``` pub fn reset(&self) { @@ -241,7 +228,7 @@ impl PushMetricExporter for InMemoryMetricExporter { self.metrics .lock() .map(|mut metrics_guard| { - metrics_guard.push_back(InMemoryMetricExporter::clone_metrics(metrics)) + metrics_guard.push(InMemoryMetricExporter::clone_metrics(metrics)) }) .map_err(|_| OTelSdkError::InternalFailure("Failed to lock metrics".to_string())) } diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index fa1375bd1f..aa22b8829d 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -271,11 +271,7 @@ mod tests { test_context.flush_metrics(); // As instrument name are valid because of the feature flag, metrics should be exported - let resource_metrics = test_context - .exporter - .get_finished_metrics() - .expect("metrics expected to be exported"); - + let resource_metrics = test_context.resource_metrics.lock().unwrap(); assert!(!resource_metrics.is_empty(), "metrics should be exported"); } @@ -327,21 +323,23 @@ mod tests { counter.add(50, &[]); test_context.flush_metrics(); - let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { - unreachable!() - }; + test_context.with_aggregation::("my_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); - assert!(sum.is_monotonic, "Should produce monotonic."); - assert_eq!( - sum.temporality, - Temporality::Cumulative, - "Should produce cumulative" - ); + assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); + assert!(sum.is_monotonic, "Should produce monotonic."); + assert_eq!( + sum.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); - let data_point = &sum.data_points[0]; - assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); - assert_eq!(data_point.value, 50, "Unexpected data point value"); + let data_point = &sum.data_points[0]; + assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); + assert_eq!(data_point.value, 50, "Unexpected data point value"); + }); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -352,17 +350,19 @@ mod tests { counter.add(50, &[]); test_context.flush_metrics(); - let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { - unreachable!() - }; + test_context.with_aggregation::("my_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); - assert!(sum.is_monotonic, "Should produce monotonic."); - assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); + assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); + assert!(sum.is_monotonic, "Should produce monotonic."); + assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); - let data_point = &sum.data_points[0]; - assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); - assert_eq!(data_point.value, 50, "Unexpected data point value"); + let data_point = &sum.data_points[0]; + assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); + assert_eq!(data_point.value, 50, "Unexpected data point value"); + }); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -564,43 +564,44 @@ mod tests { for (iter, v) in values_clone.iter().enumerate() { test_context.flush_metrics(); - let MetricData::Sum(sum) = - test_context.get_aggregation::("my_observable_counter", None) - else { - unreachable!() - }; - assert_eq!(sum.data_points.len(), 1); - assert!(sum.is_monotonic, "Counter should produce monotonic."); - if let Temporality::Cumulative = temporality { - assert_eq!( - sum.temporality, - Temporality::Cumulative, - "Should produce cumulative" - ); - } else { - assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); - } + test_context.with_aggregation::("my_observable_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; + + assert_eq!(sum.data_points.len(), 1); + assert!(sum.is_monotonic, "Counter should produce monotonic."); + if let Temporality::Cumulative = temporality { + assert_eq!( + sum.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); + } else { + assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); + } - // find and validate datapoint - let data_point = if is_empty_attributes { - &sum.data_points[0] - } else { - find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected") - }; + // find and validate datapoint + let data_point = if is_empty_attributes { + &sum.data_points[0] + } else { + find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected") + }; - if let Temporality::Cumulative = temporality { - // Cumulative counter should have the value as is. - assert_eq!(data_point.value, *v); - } else { - // Delta counter should have the increment value. - // Except for the first value which should be the start value. - if iter == 0 { - assert_eq!(data_point.value, start); + if let Temporality::Cumulative = temporality { + // Cumulative counter should have the value as is. + assert_eq!(data_point.value, *v); } else { - assert_eq!(data_point.value, increment); + // Delta counter should have the increment value. + // Except for the first value which should be the start value. + if iter == 0 { + assert_eq!(data_point.value, start); + } else { + assert_eq!(data_point.value, increment); + } } - } + }); test_context.reset_metrics(); } @@ -611,7 +612,7 @@ mod tests { async fn meter_name_retained_helper( meter: Meter, provider: SdkMeterProvider, - exporter: InMemoryMetricExporter, + metrics: Arc>>, ) { // Act let counter = meter.u64_counter("my_counter").build(); @@ -620,37 +621,43 @@ mod tests { provider.force_flush().unwrap(); // Assert - let resource_metrics = exporter - .get_finished_metrics() - .expect("metrics are expected to be exported."); - assert!( - resource_metrics[0].scope_metrics[0].metrics.len() == 1, + let resource_metrics = metrics.lock().unwrap(); + assert_eq!( + resource_metrics[0].scope_metrics[0].metrics.len(), + 1, "There should be a single metric" ); let meter_name = resource_metrics[0].scope_metrics[0].scope.name(); assert_eq!(meter_name, ""); } - let exporter = InMemoryMetricExporter::default(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + let meter_provider = SdkMeterProvider::builder() - .with_periodic_exporter(exporter.clone()) + .with_periodic_exporter(exporter) .build(); // Test Meter creation in 2 ways, both with empty string as meter name let meter1 = meter_provider.meter(""); - meter_name_retained_helper(meter1, meter_provider.clone(), exporter.clone()).await; + meter_name_retained_helper(meter1, meter_provider.clone(), metrics.clone()).await; let meter_scope = InstrumentationScope::builder("").build(); let meter2 = meter_provider.meter_with_scope(meter_scope); - meter_name_retained_helper(meter2, meter_provider, exporter).await; + meter_name_retained_helper(meter2, meter_provider, metrics.clone()).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn counter_duplicate_instrument_merge() { // Arrange - let exporter = InMemoryMetricExporter::default(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); let meter_provider = SdkMeterProvider::builder() - .with_periodic_exporter(exporter.clone()) + .with_periodic_exporter(exporter) .build(); // Act @@ -674,11 +681,10 @@ mod tests { meter_provider.force_flush().unwrap(); // Assert - let resource_metrics = exporter - .get_finished_metrics() - .expect("metrics are expected to be exported."); - assert!( - resource_metrics[0].scope_metrics[0].metrics.len() == 1, + let resource_metrics = metrics.lock().unwrap(); + assert_eq!( + resource_metrics[0].scope_metrics[0].metrics.len(), + 1, "There should be single metric merging duplicate instruments" ); let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; @@ -700,9 +706,13 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn counter_duplicate_instrument_different_meter_no_merge() { // Arrange - let exporter = InMemoryMetricExporter::default(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + let meter_provider = SdkMeterProvider::builder() - .with_periodic_exporter(exporter.clone()) + .with_periodic_exporter(exporter) .build(); // Act @@ -727,19 +737,20 @@ mod tests { meter_provider.force_flush().unwrap(); // Assert - let resource_metrics = exporter - .get_finished_metrics() - .expect("metrics are expected to be exported."); - assert!( - resource_metrics[0].scope_metrics.len() == 2, + let resource_metrics = metrics.lock().unwrap(); + assert_eq!( + resource_metrics[0].scope_metrics.len(), + 2, "There should be 2 separate scope" ); - assert!( - resource_metrics[0].scope_metrics[0].metrics.len() == 1, + assert_eq!( + resource_metrics[0].scope_metrics[0].metrics.len(), + 1, "There should be single metric for the scope" ); - assert!( - resource_metrics[0].scope_metrics[1].metrics.len() == 1, + assert_eq!( + resource_metrics[0].scope_metrics[1].metrics.len(), + 1, "There should be single metric for the scope" ); @@ -791,9 +802,13 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn instrumentation_scope_identity_test() { // Arrange - let exporter = InMemoryMetricExporter::default(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + let meter_provider = SdkMeterProvider::builder() - .with_periodic_exporter(exporter.clone()) + .with_periodic_exporter(exporter) .build(); // Act @@ -831,9 +846,7 @@ mod tests { meter_provider.force_flush().unwrap(); // Assert - let resource_metrics = exporter - .get_finished_metrics() - .expect("metrics are expected to be exported."); + let resource_metrics = metrics.lock().unwrap(); println!("resource_metrics: {resource_metrics:?}"); assert!( resource_metrics[0].scope_metrics.len() == 1, @@ -878,7 +891,11 @@ mod tests { // cargo test histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist --features=testing -- --nocapture // Arrange - let exporter = InMemoryMetricExporter::default(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + let view = |i: &Instrument| { if i.name == "test_histogram" { Stream::builder() @@ -895,7 +912,7 @@ mod tests { } }; let meter_provider = SdkMeterProvider::builder() - .with_periodic_exporter(exporter.clone()) + .with_periodic_exporter(exporter) .with_view(view) .build(); @@ -910,9 +927,7 @@ mod tests { meter_provider.force_flush().unwrap(); // Assert - let resource_metrics = exporter - .get_finished_metrics() - .expect("metrics are expected to be exported."); + let resource_metrics = metrics.lock().unwrap(); assert!(!resource_metrics.is_empty()); let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; assert_eq!( @@ -932,7 +947,11 @@ mod tests { // cargo test metrics::tests::spatial_aggregation_when_view_drops_attributes_observable_counter --features=testing // Arrange - let exporter = InMemoryMetricExporter::default(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + // View drops all attributes. let view = |i: &Instrument| { if i.name == "my_observable_counter" { @@ -945,7 +964,7 @@ mod tests { } }; let meter_provider = SdkMeterProvider::builder() - .with_periodic_exporter(exporter.clone()) + .with_periodic_exporter(exporter) .with_view(view) .build(); @@ -983,9 +1002,7 @@ mod tests { meter_provider.force_flush().unwrap(); // Assert - let resource_metrics = exporter - .get_finished_metrics() - .expect("metrics are expected to be exported."); + let resource_metrics = metrics.lock().unwrap(); assert!(!resource_metrics.is_empty()); let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; assert_eq!(metric.name, "my_observable_counter",); @@ -1012,7 +1029,11 @@ mod tests { // cargo test spatial_aggregation_when_view_drops_attributes_counter --features=testing // Arrange - let exporter = InMemoryMetricExporter::default(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + // View drops all attributes. let view = |i: &Instrument| { if i.name == "my_counter" { @@ -1027,7 +1048,7 @@ mod tests { } }; let meter_provider = SdkMeterProvider::builder() - .with_periodic_exporter(exporter.clone()) + .with_periodic_exporter(exporter) .with_view(view) .build(); @@ -1067,9 +1088,7 @@ mod tests { meter_provider.force_flush().unwrap(); // Assert - let resource_metrics = exporter - .get_finished_metrics() - .expect("metrics are expected to be exported."); + let resource_metrics = metrics.lock().unwrap(); assert!(!resource_metrics.is_empty()); let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; assert_eq!(metric.name, "my_counter",); @@ -1097,23 +1116,23 @@ mod tests { counter.add(50, &[]); test_context.flush_metrics(); - let MetricData::Sum(sum) = - test_context.get_aggregation::("my_counter", Some("my_unit")) - else { - unreachable!() - }; + test_context.with_aggregation::("my_counter", Some("my_unit"), |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); - assert!(!sum.is_monotonic, "Should not produce monotonic."); - assert_eq!( - sum.temporality, - Temporality::Cumulative, - "Should produce cumulative" - ); + assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); + assert!(!sum.is_monotonic, "Should not produce monotonic."); + assert_eq!( + sum.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); - let data_point = &sum.data_points[0]; - assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); - assert_eq!(data_point.value, 50, "Unexpected data point value"); + let data_point = &sum.data_points[0]; + assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); + assert_eq!(data_point.value, 50, "Unexpected data point value"); + }); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -1124,23 +1143,23 @@ mod tests { counter.add(50, &[]); test_context.flush_metrics(); - let MetricData::Sum(sum) = - test_context.get_aggregation::("my_counter", Some("my_unit")) - else { - unreachable!() - }; + test_context.with_aggregation::("my_counter", Some("my_unit"), |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); - assert!(!sum.is_monotonic, "Should not produce monotonic."); - assert_eq!( - sum.temporality, - Temporality::Cumulative, - "Should produce Cumulative due to UpDownCounter temporality_preference" - ); + assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); + assert!(!sum.is_monotonic, "Should not produce monotonic."); + assert_eq!( + sum.temporality, + Temporality::Cumulative, + "Should produce Cumulative due to UpDownCounter temporality_preference" + ); - let data_point = &sum.data_points[0]; - assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); - assert_eq!(data_point.value, 50, "Unexpected data point value"); + let data_point = &sum.data_points[0]; + assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); + assert_eq!(data_point.value, 50, "Unexpected data point value"); + }); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -1150,26 +1169,28 @@ mod tests { counter.add(50, &[]); test_context.flush_metrics(); - let _ = test_context.get_aggregation::("my_counter", None); + test_context.with_aggregation::("my_counter", None, |_| {}); test_context.reset_metrics(); counter.add(5, &[]); test_context.flush_metrics(); - let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { - unreachable!() - }; + test_context.with_aggregation::("my_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); - assert!(sum.is_monotonic, "Should produce monotonic."); - assert_eq!( - sum.temporality, - Temporality::Cumulative, - "Should produce cumulative" - ); + assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); + assert!(sum.is_monotonic, "Should produce monotonic."); + assert_eq!( + sum.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); - let data_point = &sum.data_points[0]; - assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); - assert_eq!(data_point.value, 55, "Unexpected data point value"); + let data_point = &sum.data_points[0]; + assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); + assert_eq!(data_point.value, 55, "Unexpected data point value"); + }); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -1179,26 +1200,28 @@ mod tests { counter.add(50, &[]); test_context.flush_metrics(); - let _ = test_context.get_aggregation::("my_counter", None); + test_context.with_aggregation::("my_counter", None, |_| {}); test_context.reset_metrics(); counter.add(5, &[]); test_context.flush_metrics(); - let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { - unreachable!() - }; + test_context.with_aggregation::("my_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); - assert!(sum.is_monotonic, "Should produce monotonic."); - assert_eq!( - sum.temporality, - Temporality::Delta, - "Should produce cumulative" - ); + assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); + assert!(sum.is_monotonic, "Should produce monotonic."); + assert_eq!( + sum.temporality, + Temporality::Delta, + "Should produce cumulative" + ); - let data_point = &sum.data_points[0]; - assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); - assert_eq!(data_point.value, 5, "Unexpected data point value"); + let data_point = &sum.data_points[0]; + assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); + assert_eq!(data_point.value, 5, "Unexpected data point value"); + }); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -1208,21 +1231,23 @@ mod tests { counter.add(50, &[]); test_context.flush_metrics(); - let _ = test_context.get_aggregation::("my_counter", None); + test_context.with_aggregation::("my_counter", None, |_| {}); test_context.reset_metrics(); counter.add(50, &[KeyValue::new("a", "b")]); test_context.flush_metrics(); - let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { - unreachable!() - }; + test_context.with_aggregation::("my_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - let no_attr_data_point = sum.data_points.iter().find(|x| x.attributes.is_empty()); + let no_attr_data_point = sum.data_points.iter().find(|x| x.attributes.is_empty()); - assert!( - no_attr_data_point.is_none(), - "Expected no data points with no attributes" - ); + assert!( + no_attr_data_point.is_none(), + "Expected no data points with no attributes" + ); + }); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -1246,32 +1271,31 @@ mod tests { counter.add(1, &[KeyValue::new("key1", "value2")]); test_context.flush_metrics(); - let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { - unreachable!() - }; + test_context.with_aggregation::("my_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - // Expecting 2 time-series. - assert_eq!(sum.data_points.len(), 2); + // Expecting 2 time-series. + assert_eq!(sum.data_points.len(), 2); - // find and validate key1=value1 datapoint - let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 5); + // find and validate key1=value1 datapoint + let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 5); - // find and validate key1=value2 datapoint - let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2") - .expect("datapoint with key1=value2 expected"); - assert_eq!(data_point1.value, 3); + // find and validate key1=value2 datapoint + let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2") + .expect("datapoint with key1=value2 expected"); + assert_eq!(data_point1.value, 3); + }); - test_context.exporter.reset(); + test_context.reset_metrics(); // flush again, and validate that nothing is flushed // as delta temporality. test_context.flush_metrics(); - let resource_metrics = test_context - .exporter - .get_finished_metrics() - .expect("metrics are expected to be exported."); + let resource_metrics = test_context.resource_metrics.lock().unwrap(); println!("resource_metrics: {resource_metrics:?}"); assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect."); } @@ -1372,80 +1396,85 @@ mod tests { fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) { match instrument_name { "counter" => { - let MetricData::Sum(sum) = - test_context.get_aggregation::("test_counter", None) - else { - unreachable!() - }; - assert_eq!(sum.data_points.len(), 2); - let zero_attribute_datapoint = - find_sum_datapoint_with_no_attributes(&sum.data_points) - .expect("datapoint with no attributes expected"); - assert_eq!(zero_attribute_datapoint.value, 5); - let data_point1 = - find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 10); + test_context.with_aggregation::("test_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; + + assert_eq!(sum.data_points.len(), 2); + let zero_attribute_datapoint = + find_sum_datapoint_with_no_attributes(&sum.data_points) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.value, 5); + let data_point1 = + find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 10); + }); } "updown_counter" => { - let MetricData::Sum(sum) = - test_context.get_aggregation::("test_updowncounter", None) - else { - unreachable!() - }; - assert_eq!(sum.data_points.len(), 2); - let zero_attribute_datapoint = - find_sum_datapoint_with_no_attributes(&sum.data_points) - .expect("datapoint with no attributes expected"); - assert_eq!(zero_attribute_datapoint.value, 15); - let data_point1 = - find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 20); + test_context.with_aggregation::("test_updowncounter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; + + assert_eq!(sum.data_points.len(), 2); + let zero_attribute_datapoint = + find_sum_datapoint_with_no_attributes(&sum.data_points) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.value, 15); + let data_point1 = + find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 20); + }); } "histogram" => { - let MetricData::Histogram(histogram_data) = - test_context.get_aggregation::("test_histogram", None) - else { - unreachable!() - }; - assert_eq!(histogram_data.data_points.len(), 2); - let zero_attribute_datapoint = - find_histogram_datapoint_with_no_attributes(&histogram_data.data_points) - .expect("datapoint with no attributes expected"); - assert_eq!(zero_attribute_datapoint.count, 1); - assert_eq!(zero_attribute_datapoint.sum, 25); - assert_eq!(zero_attribute_datapoint.min, Some(25)); - assert_eq!(zero_attribute_datapoint.max, Some(25)); - let data_point1 = find_histogram_datapoint_with_key_value( - &histogram_data.data_points, - "key1", - "value1", - ) - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.count, 1); - assert_eq!(data_point1.sum, 30); - assert_eq!(data_point1.min, Some(30)); - assert_eq!(data_point1.max, Some(30)); + test_context.with_aggregation::("test_histogram", None, |metric| { + let MetricData::Histogram(histogram_data) = metric else { + unreachable!() + }; + + assert_eq!(histogram_data.data_points.len(), 2); + let zero_attribute_datapoint = find_histogram_datapoint_with_no_attributes( + &histogram_data.data_points, + ) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.count, 1); + assert_eq!(zero_attribute_datapoint.sum, 25); + assert_eq!(zero_attribute_datapoint.min, Some(25)); + assert_eq!(zero_attribute_datapoint.max, Some(25)); + let data_point1 = find_histogram_datapoint_with_key_value( + &histogram_data.data_points, + "key1", + "value1", + ) + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.count, 1); + assert_eq!(data_point1.sum, 30); + assert_eq!(data_point1.min, Some(30)); + assert_eq!(data_point1.max, Some(30)); + }); } "gauge" => { - let MetricData::Gauge(gauge_data) = - test_context.get_aggregation::("test_gauge", None) - else { - unreachable!() - }; - assert_eq!(gauge_data.data_points.len(), 2); - let zero_attribute_datapoint = - find_gauge_datapoint_with_no_attributes(&gauge_data.data_points) - .expect("datapoint with no attributes expected"); - assert_eq!(zero_attribute_datapoint.value, 35); - let data_point1 = find_gauge_datapoint_with_key_value( - &gauge_data.data_points, - "key1", - "value1", - ) - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 40); + test_context.with_aggregation::("test_gauge", None, |metric| { + let MetricData::Gauge(gauge_data) = metric else { + unreachable!() + }; + + assert_eq!(gauge_data.data_points.len(), 2); + let zero_attribute_datapoint = + find_gauge_datapoint_with_no_attributes(&gauge_data.data_points) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.value, 35); + let data_point1 = find_gauge_datapoint_with_key_value( + &gauge_data.data_points, + "key1", + "value1", + ) + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 40); + }); } _ => panic!("Incorrect instrument kind provided"), } @@ -1640,9 +1669,12 @@ mod tests { // cargo test view_test_* --all-features -- --nocapture // Arrange - let exporter = InMemoryMetricExporter::default(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); let meter_provider = SdkMeterProvider::builder() - .with_periodic_exporter(exporter.clone()) + .with_periodic_exporter(exporter) .with_view(view_function) .build(); @@ -1658,9 +1690,7 @@ mod tests { meter_provider.force_flush().unwrap(); // Assert - let resource_metrics = exporter - .get_finished_metrics() - .expect("metrics are expected to be exported."); + let resource_metrics = metrics.lock().unwrap(); assert_eq!(resource_metrics.len(), 1); assert_eq!(resource_metrics[0].scope_metrics.len(), 1); let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; @@ -1710,9 +1740,12 @@ mod tests { }; // Arrange - let exporter = InMemoryMetricExporter::default(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); let meter_provider = SdkMeterProvider::builder() - .with_periodic_exporter(exporter.clone()) + .with_periodic_exporter(exporter) .with_view(view1) .with_view(view2) .build(); @@ -1725,9 +1758,7 @@ mod tests { meter_provider.force_flush().unwrap(); // Assert - let resource_metrics = exporter - .get_finished_metrics() - .expect("metrics are expected to be exported."); + let resource_metrics = metrics.lock().unwrap(); assert_eq!(resource_metrics.len(), 1); assert_eq!(resource_metrics[0].scope_metrics.len(), 1); let metrics = &resource_metrics[0].scope_metrics[0].metrics; @@ -1753,9 +1784,12 @@ mod tests { }; // Arrange - let exporter = InMemoryMetricExporter::default(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); let meter_provider = SdkMeterProvider::builder() - .with_periodic_exporter(exporter.clone()) + .with_periodic_exporter(exporter) .with_view(view) .build(); @@ -1769,9 +1803,7 @@ mod tests { meter_provider.force_flush().unwrap(); // Assert - let resource_metrics = exporter - .get_finished_metrics() - .expect("metrics are expected to be exported."); + let resource_metrics = metrics.lock().unwrap(); assert_eq!(resource_metrics.len(), 1); assert_eq!(resource_metrics[0].scope_metrics.len(), 1); let metrics = &resource_metrics[0].scope_metrics[0].metrics; @@ -1854,57 +1886,60 @@ mod tests { fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) { match instrument_name { "counter" => { - let MetricData::Sum(sum) = - test_context.get_aggregation::("test_counter", None) - else { - unreachable!() - }; - assert_eq!(sum.data_points.len(), 2); - assert!(sum.is_monotonic); - let zero_attribute_datapoint = - find_sum_datapoint_with_no_attributes(&sum.data_points) - .expect("datapoint with no attributes expected"); - assert_eq!(zero_attribute_datapoint.value, 5); - let data_point1 = - find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 10); + test_context.with_aggregation::("test_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; + + assert_eq!(sum.data_points.len(), 2); + assert!(sum.is_monotonic); + let zero_attribute_datapoint = + find_sum_datapoint_with_no_attributes(&sum.data_points) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.value, 5); + let data_point1 = + find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 10); + }); } "updown_counter" => { - let MetricData::Sum(sum) = - test_context.get_aggregation::("test_updowncounter", None) - else { - unreachable!() - }; - assert_eq!(sum.data_points.len(), 2); - assert!(!sum.is_monotonic); - let zero_attribute_datapoint = - find_sum_datapoint_with_no_attributes(&sum.data_points) - .expect("datapoint with no attributes expected"); - assert_eq!(zero_attribute_datapoint.value, 15); - let data_point1 = - find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 20); + test_context.with_aggregation::("test_updowncounter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; + + assert_eq!(sum.data_points.len(), 2); + assert!(!sum.is_monotonic); + let zero_attribute_datapoint = + find_sum_datapoint_with_no_attributes(&sum.data_points) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.value, 15); + let data_point1 = + find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 20); + }); } "gauge" => { - let MetricData::Gauge(gauge_data) = - test_context.get_aggregation::("test_gauge", None) - else { - unreachable!() - }; - assert_eq!(gauge_data.data_points.len(), 2); - let zero_attribute_datapoint = - find_gauge_datapoint_with_no_attributes(&gauge_data.data_points) - .expect("datapoint with no attributes expected"); - assert_eq!(zero_attribute_datapoint.value, 25); - let data_point1 = find_gauge_datapoint_with_key_value( - &gauge_data.data_points, - "key1", - "value1", - ) - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 30); + test_context.with_aggregation::("test_gauge", None, |metric| { + let MetricData::Gauge(gauge_data) = metric else { + unreachable!() + }; + + assert_eq!(gauge_data.data_points.len(), 2); + let zero_attribute_datapoint = + find_gauge_datapoint_with_no_attributes(&gauge_data.data_points) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.value, 25); + let data_point1 = find_gauge_datapoint_with_key_value( + &gauge_data.data_points, + "key1", + "value1", + ) + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 30); + }); } _ => panic!("Incorrect instrument kind provided"), } @@ -1913,7 +1948,8 @@ mod tests { fn counter_multithreaded_aggregation_helper(temporality: Temporality) { // Arrange - let mut test_context = TestContext::new(temporality); + let test_context = TestContext::new(temporality); + let counter = Arc::new(test_context.u64_counter("test", "my_counter", None)); for i in 0..10 { @@ -1941,41 +1977,42 @@ mod tests { // Assert // We invoke `test_context.flush_metrics()` six times. - let sums = test_context - .get_from_multiple_aggregations::("my_counter", None, 6) - .into_iter() - .map(|data| { - if let MetricData::Sum(sum) = data { - sum - } else { - unreachable!() - } - }) - .collect::>(); + test_context.with_multiple_aggregations::("my_counter", None, 6, |metrics| { + let sums = metrics + .into_iter() + .map(|data| { + if let MetricData::Sum(sum) = data { + sum + } else { + unreachable!() + } + }) + .collect::>(); - let mut sum_zero_attributes = 0; - let mut sum_key1_value1 = 0; - sums.iter().for_each(|sum| { - assert_eq!(sum.data_points.len(), 2); // Expecting 1 time-series. - assert!(sum.is_monotonic, "Counter should produce monotonic."); - assert_eq!(sum.temporality, temporality); + let mut sum_zero_attributes = 0; + let mut sum_key1_value1 = 0; + sums.iter().for_each(|sum| { + assert_eq!(sum.data_points.len(), 2); // Expecting 1 time-series. + assert!(sum.is_monotonic, "Counter should produce monotonic."); + assert_eq!(sum.temporality, temporality); - if temporality == Temporality::Delta { - sum_zero_attributes += sum.data_points[0].value; - sum_key1_value1 += sum.data_points[1].value; - } else { - sum_zero_attributes = sum.data_points[0].value; - sum_key1_value1 = sum.data_points[1].value; - }; - }); + if temporality == Temporality::Delta { + sum_zero_attributes += sum.data_points[0].value; + sum_key1_value1 += sum.data_points[1].value; + } else { + sum_zero_attributes = sum.data_points[0].value; + sum_key1_value1 = sum.data_points[1].value; + }; + }); - assert_eq!(sum_zero_attributes, 10); - assert_eq!(sum_key1_value1, 50); // Each of the 10 update threads record measurements summing up to 5. + assert_eq!(sum_zero_attributes, 10); + assert_eq!(sum_key1_value1, 50); // Each of the 10 update threads record measurements summing up to 5. + }); } fn counter_f64_multithreaded_aggregation_helper(temporality: Temporality) { // Arrange - let mut test_context = TestContext::new(temporality); + let test_context = TestContext::new(temporality); let counter = Arc::new(test_context.meter().f64_counter("test_counter").build()); for i in 0..10 { @@ -2003,41 +2040,42 @@ mod tests { // Assert // We invoke `test_context.flush_metrics()` six times. - let sums = test_context - .get_from_multiple_aggregations::("test_counter", None, 6) - .into_iter() - .map(|data| { - if let MetricData::Sum(sum) = data { - sum - } else { - unreachable!() - } - }) - .collect::>(); + test_context.with_multiple_aggregations::("test_counter", None, 6, |metrics| { + let sums = metrics + .into_iter() + .map(|data| { + if let MetricData::Sum(sum) = data { + sum + } else { + unreachable!() + } + }) + .collect::>(); - let mut sum_zero_attributes = 0.0; - let mut sum_key1_value1 = 0.0; - sums.iter().for_each(|sum| { - assert_eq!(sum.data_points.len(), 2); // Expecting 1 time-series. - assert!(sum.is_monotonic, "Counter should produce monotonic."); - assert_eq!(sum.temporality, temporality); + let mut sum_zero_attributes = 0.0; + let mut sum_key1_value1 = 0.0; + sums.iter().for_each(|sum| { + assert_eq!(sum.data_points.len(), 2); // Expecting 1 time-series. + assert!(sum.is_monotonic, "Counter should produce monotonic."); + assert_eq!(sum.temporality, temporality); - if temporality == Temporality::Delta { - sum_zero_attributes += sum.data_points[0].value; - sum_key1_value1 += sum.data_points[1].value; - } else { - sum_zero_attributes = sum.data_points[0].value; - sum_key1_value1 = sum.data_points[1].value; - }; - }); + if temporality == Temporality::Delta { + sum_zero_attributes += sum.data_points[0].value; + sum_key1_value1 += sum.data_points[1].value; + } else { + sum_zero_attributes = sum.data_points[0].value; + sum_key1_value1 = sum.data_points[1].value; + }; + }); - assert!(f64::abs(12.3 - sum_zero_attributes) < 0.0001); - assert!(f64::abs(61.5 - sum_key1_value1) < 0.0001); // Each of the 10 update threads record measurements 5 times = 10 * 5 * 1.23 = 61.5 + assert!(f64::abs(12.3 - sum_zero_attributes) < 0.0001); + assert!(f64::abs(61.5 - sum_key1_value1) < 0.0001); // Each of the 10 update threads record measurements 5 times = 10 * 5 * 1.23 = 61.5 + }); } fn histogram_multithreaded_aggregation_helper(temporality: Temporality) { // Arrange - let mut test_context = TestContext::new(temporality); + let test_context = TestContext::new(temporality); let histogram = Arc::new(test_context.meter().u64_histogram("test_histogram").build()); for i in 0..10 { @@ -2066,121 +2104,131 @@ mod tests { // Assert // We invoke `test_context.flush_metrics()` six times. - let histograms = test_context - .get_from_multiple_aggregations::("test_histogram", None, 6) - .into_iter() - .map(|data| { - if let MetricData::Histogram(hist) = data { - hist - } else { - unreachable!() - } - }) - .collect::>(); - - let ( - mut sum_zero_attributes, - mut count_zero_attributes, - mut min_zero_attributes, - mut max_zero_attributes, - ) = (0, 0, u64::MAX, u64::MIN); - let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) = - (0, 0, u64::MAX, u64::MIN); - - let mut bucket_counts_zero_attributes = vec![0; 16]; // There are 16 buckets for the default configuration - let mut bucket_counts_key1_value1 = vec![0; 16]; - - histograms.iter().for_each(|histogram| { - assert_eq!(histogram.data_points.len(), 2); // Expecting 1 time-series. - assert_eq!(histogram.temporality, temporality); - - let data_point_zero_attributes = - find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap(); - let data_point_key1_value1 = - find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1") - .unwrap(); + test_context.with_multiple_aggregations::("test_histogram", None, 6, |metrics| { + let histograms = metrics + .into_iter() + .map(|data| { + if let MetricData::Histogram(hist) = data { + hist + } else { + unreachable!() + } + }) + .collect::>(); - if temporality == Temporality::Delta { - sum_zero_attributes += data_point_zero_attributes.sum; - sum_key1_value1 += data_point_key1_value1.sum; + let ( + mut sum_zero_attributes, + mut count_zero_attributes, + mut min_zero_attributes, + mut max_zero_attributes, + ) = (0, 0, u64::MAX, u64::MIN); + let ( + mut sum_key1_value1, + mut count_key1_value1, + mut min_key1_value1, + mut max_key1_value1, + ) = (0, 0, u64::MAX, u64::MIN); + + let mut bucket_counts_zero_attributes = vec![0; 16]; // There are 16 buckets for the default configuration + let mut bucket_counts_key1_value1 = vec![0; 16]; + + histograms.iter().for_each(|histogram| { + assert_eq!(histogram.data_points.len(), 2); // Expecting 1 time-series. + assert_eq!(histogram.temporality, temporality); + + let data_point_zero_attributes = + find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap(); + let data_point_key1_value1 = find_histogram_datapoint_with_key_value( + &histogram.data_points, + "key1", + "value1", + ) + .unwrap(); - count_zero_attributes += data_point_zero_attributes.count; - count_key1_value1 += data_point_key1_value1.count; + if temporality == Temporality::Delta { + sum_zero_attributes += data_point_zero_attributes.sum; + sum_key1_value1 += data_point_key1_value1.sum; - min_zero_attributes = - min(min_zero_attributes, data_point_zero_attributes.min.unwrap()); - min_key1_value1 = min(min_key1_value1, data_point_key1_value1.min.unwrap()); + count_zero_attributes += data_point_zero_attributes.count; + count_key1_value1 += data_point_key1_value1.count; - max_zero_attributes = - max(max_zero_attributes, data_point_zero_attributes.max.unwrap()); - max_key1_value1 = max(max_key1_value1, data_point_key1_value1.max.unwrap()); + min_zero_attributes = + min(min_zero_attributes, data_point_zero_attributes.min.unwrap()); + min_key1_value1 = min(min_key1_value1, data_point_key1_value1.min.unwrap()); - assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16); - assert_eq!(data_point_key1_value1.bucket_counts.len(), 16); + max_zero_attributes = + max(max_zero_attributes, data_point_zero_attributes.max.unwrap()); + max_key1_value1 = max(max_key1_value1, data_point_key1_value1.max.unwrap()); - for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() { - bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i]; - } + assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16); + assert_eq!(data_point_key1_value1.bucket_counts.len(), 16); - for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() { - bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i]; - } - } else { - sum_zero_attributes = data_point_zero_attributes.sum; - sum_key1_value1 = data_point_key1_value1.sum; + for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() { + bucket_counts_zero_attributes[i] += + data_point_zero_attributes.bucket_counts[i]; + } - count_zero_attributes = data_point_zero_attributes.count; - count_key1_value1 = data_point_key1_value1.count; + for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() { + bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i]; + } + } else { + sum_zero_attributes = data_point_zero_attributes.sum; + sum_key1_value1 = data_point_key1_value1.sum; - min_zero_attributes = data_point_zero_attributes.min.unwrap(); - min_key1_value1 = data_point_key1_value1.min.unwrap(); + count_zero_attributes = data_point_zero_attributes.count; + count_key1_value1 = data_point_key1_value1.count; - max_zero_attributes = data_point_zero_attributes.max.unwrap(); - max_key1_value1 = data_point_key1_value1.max.unwrap(); + min_zero_attributes = data_point_zero_attributes.min.unwrap(); + min_key1_value1 = data_point_key1_value1.min.unwrap(); - assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16); - assert_eq!(data_point_key1_value1.bucket_counts.len(), 16); + max_zero_attributes = data_point_zero_attributes.max.unwrap(); + max_key1_value1 = data_point_key1_value1.max.unwrap(); - bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts); - bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts); - }; - }); + assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16); + assert_eq!(data_point_key1_value1.bucket_counts.len(), 16); - // Default buckets: - // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0], - // (500.0, 750.0], (750.0, 1000.0], (1000.0, 2500.0], (2500.0, 5000.0], (5000.0, 7500.0], (7500.0, 10000.0], (10000.0, +∞). + bucket_counts_zero_attributes + .clone_from(&data_point_zero_attributes.bucket_counts); + bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts); + }; + }); + + // Default buckets: + // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0], + // (500.0, 750.0], (750.0, 1000.0], (1000.0, 2500.0], (2500.0, 5000.0], (5000.0, 7500.0], (7500.0, 10000.0], (10000.0, +∞). - assert_eq!(count_zero_attributes, 20); // Each of the 10 update threads record two measurements. - assert_eq!(sum_zero_attributes, 50); // Each of the 10 update threads record measurements summing up to 5. - assert_eq!(min_zero_attributes, 1); - assert_eq!(max_zero_attributes, 4); + assert_eq!(count_zero_attributes, 20); // Each of the 10 update threads record two measurements. + assert_eq!(sum_zero_attributes, 50); // Each of the 10 update threads record measurements summing up to 5. + assert_eq!(min_zero_attributes, 1); + assert_eq!(max_zero_attributes, 4); - for (i, count) in bucket_counts_zero_attributes.iter().enumerate() { - match i { - 1 => assert_eq!(*count, 20), // For each of the 10 update threads, both the recorded values 1 and 4 fall under the bucket (0, 5]. - _ => assert_eq!(*count, 0), + for (i, count) in bucket_counts_zero_attributes.iter().enumerate() { + match i { + 1 => assert_eq!(*count, 20), // For each of the 10 update threads, both the recorded values 1 and 4 fall under the bucket (0, 5]. + _ => assert_eq!(*count, 0), + } } - } - assert_eq!(count_key1_value1, 50); // Each of the 10 update threads record 5 measurements. - assert_eq!(sum_key1_value1, 1000); // Each of the 10 update threads record measurements summing up to 100 (5 + 7 + 18 + 35 + 35). - assert_eq!(min_key1_value1, 5); - assert_eq!(max_key1_value1, 35); - - for (i, count) in bucket_counts_key1_value1.iter().enumerate() { - match i { - 1 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 5 falls under the bucket (0, 5]. - 2 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 7 falls under the bucket (5, 10]. - 3 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 18 falls under the bucket (10, 25]. - 4 => assert_eq!(*count, 20), // For each of the 10 update threads, the recorded value 35 (recorded twice) falls under the bucket (25, 50]. - _ => assert_eq!(*count, 0), + assert_eq!(count_key1_value1, 50); // Each of the 10 update threads record 5 measurements. + assert_eq!(sum_key1_value1, 1000); // Each of the 10 update threads record measurements summing up to 100 (5 + 7 + 18 + 35 + 35). + assert_eq!(min_key1_value1, 5); + assert_eq!(max_key1_value1, 35); + + for (i, count) in bucket_counts_key1_value1.iter().enumerate() { + match i { + 1 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 5 falls under the bucket (0, 5]. + 2 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 7 falls under the bucket (5, 10]. + 3 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 18 falls under the bucket (10, 25]. + 4 => assert_eq!(*count, 20), // For each of the 10 update threads, the recorded value 35 (recorded twice) falls under the bucket (25, 50]. + _ => assert_eq!(*count, 0), + } } - } + }); } fn histogram_f64_multithreaded_aggregation_helper(temporality: Temporality) { // Arrange - let mut test_context = TestContext::new(temporality); + let test_context = TestContext::new(temporality); let histogram = Arc::new(test_context.meter().f64_histogram("test_histogram").build()); for i in 0..10 { @@ -2209,116 +2257,126 @@ mod tests { // Assert // We invoke `test_context.flush_metrics()` six times. - let histograms = test_context - .get_from_multiple_aggregations::("test_histogram", None, 6) - .into_iter() - .map(|data| { - if let MetricData::Histogram(hist) = data { - hist - } else { - unreachable!() - } - }) - .collect::>(); - - let ( - mut sum_zero_attributes, - mut count_zero_attributes, - mut min_zero_attributes, - mut max_zero_attributes, - ) = (0.0, 0, f64::MAX, f64::MIN); - let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) = - (0.0, 0, f64::MAX, f64::MIN); - - let mut bucket_counts_zero_attributes = vec![0; 16]; // There are 16 buckets for the default configuration - let mut bucket_counts_key1_value1 = vec![0; 16]; - - histograms.iter().for_each(|histogram| { - assert_eq!(histogram.data_points.len(), 2); // Expecting 1 time-series. - assert_eq!(histogram.temporality, temporality); - - let data_point_zero_attributes = - find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap(); - let data_point_key1_value1 = - find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1") - .unwrap(); + test_context.with_multiple_aggregations::("test_histogram", None, 6, |metrics| { + let histograms = metrics + .into_iter() + .map(|data| { + if let MetricData::Histogram(hist) = data { + hist + } else { + unreachable!() + } + }) + .collect::>(); - if temporality == Temporality::Delta { - sum_zero_attributes += data_point_zero_attributes.sum; - sum_key1_value1 += data_point_key1_value1.sum; + let ( + mut sum_zero_attributes, + mut count_zero_attributes, + mut min_zero_attributes, + mut max_zero_attributes, + ) = (0.0, 0, f64::MAX, f64::MIN); + let ( + mut sum_key1_value1, + mut count_key1_value1, + mut min_key1_value1, + mut max_key1_value1, + ) = (0.0, 0, f64::MAX, f64::MIN); + + let mut bucket_counts_zero_attributes = vec![0; 16]; // There are 16 buckets for the default configuration + let mut bucket_counts_key1_value1 = vec![0; 16]; + + histograms.iter().for_each(|histogram| { + assert_eq!(histogram.data_points.len(), 2); // Expecting 1 time-series. + assert_eq!(histogram.temporality, temporality); + + let data_point_zero_attributes = + find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap(); + let data_point_key1_value1 = find_histogram_datapoint_with_key_value( + &histogram.data_points, + "key1", + "value1", + ) + .unwrap(); - count_zero_attributes += data_point_zero_attributes.count; - count_key1_value1 += data_point_key1_value1.count; + if temporality == Temporality::Delta { + sum_zero_attributes += data_point_zero_attributes.sum; + sum_key1_value1 += data_point_key1_value1.sum; - min_zero_attributes = - min_zero_attributes.min(data_point_zero_attributes.min.unwrap()); - min_key1_value1 = min_key1_value1.min(data_point_key1_value1.min.unwrap()); + count_zero_attributes += data_point_zero_attributes.count; + count_key1_value1 += data_point_key1_value1.count; - max_zero_attributes = - max_zero_attributes.max(data_point_zero_attributes.max.unwrap()); - max_key1_value1 = max_key1_value1.max(data_point_key1_value1.max.unwrap()); + min_zero_attributes = + min_zero_attributes.min(data_point_zero_attributes.min.unwrap()); + min_key1_value1 = min_key1_value1.min(data_point_key1_value1.min.unwrap()); - assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16); - assert_eq!(data_point_key1_value1.bucket_counts.len(), 16); + max_zero_attributes = + max_zero_attributes.max(data_point_zero_attributes.max.unwrap()); + max_key1_value1 = max_key1_value1.max(data_point_key1_value1.max.unwrap()); - for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() { - bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i]; - } + assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16); + assert_eq!(data_point_key1_value1.bucket_counts.len(), 16); - for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() { - bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i]; - } - } else { - sum_zero_attributes = data_point_zero_attributes.sum; - sum_key1_value1 = data_point_key1_value1.sum; + for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() { + bucket_counts_zero_attributes[i] += + data_point_zero_attributes.bucket_counts[i]; + } + + for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() { + bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i]; + } + } else { + sum_zero_attributes = data_point_zero_attributes.sum; + sum_key1_value1 = data_point_key1_value1.sum; - count_zero_attributes = data_point_zero_attributes.count; - count_key1_value1 = data_point_key1_value1.count; + count_zero_attributes = data_point_zero_attributes.count; + count_key1_value1 = data_point_key1_value1.count; - min_zero_attributes = data_point_zero_attributes.min.unwrap(); - min_key1_value1 = data_point_key1_value1.min.unwrap(); + min_zero_attributes = data_point_zero_attributes.min.unwrap(); + min_key1_value1 = data_point_key1_value1.min.unwrap(); - max_zero_attributes = data_point_zero_attributes.max.unwrap(); - max_key1_value1 = data_point_key1_value1.max.unwrap(); + max_zero_attributes = data_point_zero_attributes.max.unwrap(); + max_key1_value1 = data_point_key1_value1.max.unwrap(); - assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16); - assert_eq!(data_point_key1_value1.bucket_counts.len(), 16); + assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16); + assert_eq!(data_point_key1_value1.bucket_counts.len(), 16); - bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts); - bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts); - }; - }); + bucket_counts_zero_attributes + .clone_from(&data_point_zero_attributes.bucket_counts); + bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts); + }; + }); - // Default buckets: - // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0], - // (500.0, 750.0], (750.0, 1000.0], (1000.0, 2500.0], (2500.0, 5000.0], (5000.0, 7500.0], (7500.0, 10000.0], (10000.0, +∞). + // Default buckets: + // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0], + // (500.0, 750.0], (750.0, 1000.0], (1000.0, 2500.0], (2500.0, 5000.0], (5000.0, 7500.0], (7500.0, 10000.0], (10000.0, +∞). - assert_eq!(count_zero_attributes, 20); // Each of the 10 update threads record two measurements. - assert!(f64::abs(61.0 - sum_zero_attributes) < 0.0001); // Each of the 10 update threads record measurements summing up to 6.1 (1.5 + 4.6) - assert_eq!(min_zero_attributes, 1.5); - assert_eq!(max_zero_attributes, 4.6); + assert_eq!(count_zero_attributes, 20); // Each of the 10 update threads record two measurements. + assert!(f64::abs(61.0 - sum_zero_attributes) < 0.0001); // Each of the 10 update threads record measurements summing up to 6.1 (1.5 + 4.6) + assert_eq!(min_zero_attributes, 1.5); + assert_eq!(max_zero_attributes, 4.6); - for (i, count) in bucket_counts_zero_attributes.iter().enumerate() { - match i { - 1 => assert_eq!(*count, 20), // For each of the 10 update threads, both the recorded values 1.5 and 4.6 fall under the bucket (0, 5.0]. - _ => assert_eq!(*count, 0), + for (i, count) in bucket_counts_zero_attributes.iter().enumerate() { + match i { + 1 => assert_eq!(*count, 20), // For each of the 10 update threads, both the recorded values 1.5 and 4.6 fall under the bucket (0, 5.0]. + _ => assert_eq!(*count, 0), + } } - } - assert_eq!(count_key1_value1, 50); // Each of the 10 update threads record 5 measurements. - assert!(f64::abs(1006.0 - sum_key1_value1) < 0.0001); // Each of the 10 update threads record measurements summing up to 100.4 (5.0 + 7.3 + 18.1 + 35.1 + 35.1). - assert_eq!(min_key1_value1, 5.0); - assert_eq!(max_key1_value1, 35.1); - - for (i, count) in bucket_counts_key1_value1.iter().enumerate() { - match i { - 1 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 5.0 falls under the bucket (0, 5.0]. - 2 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 7.3 falls under the bucket (5.0, 10.0]. - 3 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 18.1 falls under the bucket (10.0, 25.0]. - 4 => assert_eq!(*count, 20), // For each of the 10 update threads, the recorded value 35.1 (recorded twice) falls under the bucket (25.0, 50.0]. - _ => assert_eq!(*count, 0), + assert_eq!(count_key1_value1, 50); // Each of the 10 update threads record 5 measurements. + assert!(f64::abs(1006.0 - sum_key1_value1) < 0.0001); // Each of the 10 update threads record measurements summing up to 100.4 (5.0 + 7.3 + 18.1 + 35.1 + 35.1). + assert_eq!(min_key1_value1, 5.0); + assert_eq!(max_key1_value1, 35.1); + + for (i, count) in bucket_counts_key1_value1.iter().enumerate() { + match i { + 1 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 5.0 falls under the bucket (0, 5.0]. + 2 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 7.3 falls under the bucket (5.0, 10.0]. + 3 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 18.1 falls under the bucket (10.0, 25.0]. + 4 => assert_eq!(*count, 20), // For each of the 10 update threads, the recorded value 35.1 (recorded twice) falls under the bucket (25.0, 50.0]. + _ => assert_eq!(*count, 0), + } } - } + }); } fn histogram_aggregation_helper(temporality: Temporality) { @@ -2345,43 +2403,50 @@ mod tests { test_context.flush_metrics(); // Assert - let MetricData::Histogram(histogram_data) = - test_context.get_aggregation::("my_histogram", None) - else { - unreachable!() - }; - // Expecting 2 time-series. - assert_eq!(histogram_data.data_points.len(), 2); - if let Temporality::Cumulative = temporality { - assert_eq!( - histogram_data.temporality, - Temporality::Cumulative, - "Should produce cumulative" - ); - } else { - assert_eq!( - histogram_data.temporality, - Temporality::Delta, - "Should produce delta" - ); - } + test_context.with_aggregation::("my_histogram", None, |metric| { + let MetricData::Histogram(histogram_data) = metric else { + unreachable!() + }; - // find and validate key1=value2 datapoint - let data_point1 = - find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.count, values_kv1.len() as u64); - assert_eq!(data_point1.sum, values_kv1.iter().sum::()); - assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap()); - assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap()); + // Expecting 2 time-series. + assert_eq!(histogram_data.data_points.len(), 2); + if let Temporality::Cumulative = temporality { + assert_eq!( + histogram_data.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); + } else { + assert_eq!( + histogram_data.temporality, + Temporality::Delta, + "Should produce delta" + ); + } - let data_point2 = - find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2") - .expect("datapoint with key1=value2 expected"); - assert_eq!(data_point2.count, values_kv2.len() as u64); - assert_eq!(data_point2.sum, values_kv2.iter().sum::()); - assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap()); - assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap()); + // find and validate key1=value2 datapoint + let data_point1 = find_histogram_datapoint_with_key_value( + &histogram_data.data_points, + "key1", + "value1", + ) + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.count, values_kv1.len() as u64); + assert_eq!(data_point1.sum, values_kv1.iter().sum::()); + assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap()); + assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap()); + + let data_point2 = find_histogram_datapoint_with_key_value( + &histogram_data.data_points, + "key1", + "value2", + ) + .expect("datapoint with key1=value2 expected"); + assert_eq!(data_point2.count, values_kv2.len() as u64); + assert_eq!(data_point2.sum, values_kv2.iter().sum::()); + assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap()); + assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap()); + }); // Reset and report more measurements test_context.reset_metrics(); @@ -2395,41 +2460,48 @@ mod tests { test_context.flush_metrics(); - let MetricData::Histogram(histogram_data) = - test_context.get_aggregation::("my_histogram", None) - else { - unreachable!() - }; - assert_eq!(histogram_data.data_points.len(), 2); - let data_point1 = - find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - if temporality == Temporality::Cumulative { - assert_eq!(data_point1.count, 2 * (values_kv1.len() as u64)); - assert_eq!(data_point1.sum, 2 * (values_kv1.iter().sum::())); - assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap()); - assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap()); - } else { - assert_eq!(data_point1.count, values_kv1.len() as u64); - assert_eq!(data_point1.sum, values_kv1.iter().sum::()); - assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap()); - assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap()); - } + test_context.with_aggregation::("my_histogram", None, |metric| { + let MetricData::Histogram(histogram_data) = metric else { + unreachable!() + }; - let data_point1 = - find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2") - .expect("datapoint with key1=value1 expected"); - if temporality == Temporality::Cumulative { - assert_eq!(data_point1.count, 2 * (values_kv2.len() as u64)); - assert_eq!(data_point1.sum, 2 * (values_kv2.iter().sum::())); - assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap()); - assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap()); - } else { - assert_eq!(data_point1.count, values_kv2.len() as u64); - assert_eq!(data_point1.sum, values_kv2.iter().sum::()); - assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap()); - assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap()); - } + assert_eq!(histogram_data.data_points.len(), 2); + let data_point1 = find_histogram_datapoint_with_key_value( + &histogram_data.data_points, + "key1", + "value1", + ) + .expect("datapoint with key1=value1 expected"); + if temporality == Temporality::Cumulative { + assert_eq!(data_point1.count, 2 * (values_kv1.len() as u64)); + assert_eq!(data_point1.sum, 2 * (values_kv1.iter().sum::())); + assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap()); + assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap()); + } else { + assert_eq!(data_point1.count, values_kv1.len() as u64); + assert_eq!(data_point1.sum, values_kv1.iter().sum::()); + assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap()); + assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap()); + } + + let data_point1 = find_histogram_datapoint_with_key_value( + &histogram_data.data_points, + "key1", + "value2", + ) + .expect("datapoint with key1=value1 expected"); + if temporality == Temporality::Cumulative { + assert_eq!(data_point1.count, 2 * (values_kv2.len() as u64)); + assert_eq!(data_point1.sum, 2 * (values_kv2.iter().sum::())); + assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap()); + assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap()); + } else { + assert_eq!(data_point1.count, values_kv2.len() as u64); + assert_eq!(data_point1.sum, values_kv2.iter().sum::()); + assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap()); + assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap()); + } + }); } fn histogram_aggregation_with_custom_bounds_helper(temporality: Temporality) { @@ -2448,43 +2520,48 @@ mod tests { test_context.flush_metrics(); // Assert - let MetricData::Histogram(histogram_data) = - test_context.get_aggregation::("test_histogram", None) - else { - unreachable!() - }; - // Expecting 2 time-series. - assert_eq!(histogram_data.data_points.len(), 1); - if let Temporality::Cumulative = temporality { - assert_eq!( - histogram_data.temporality, - Temporality::Cumulative, - "Should produce cumulative" - ); - } else { - assert_eq!( - histogram_data.temporality, - Temporality::Delta, - "Should produce delta" - ); - } - // find and validate key1=value1 datapoint - let data_point = - find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); + test_context.with_aggregation::("test_histogram", None, |metric| { + let MetricData::Histogram(histogram_data) = metric else { + unreachable!() + }; - assert_eq!(data_point.count, 5); - assert_eq!(data_point.sum, 15); + // Expecting 2 time-series. + assert_eq!(histogram_data.data_points.len(), 1); + if let Temporality::Cumulative = temporality { + assert_eq!( + histogram_data.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); + } else { + assert_eq!( + histogram_data.temporality, + Temporality::Delta, + "Should produce delta" + ); + } + + // find and validate key1=value1 datapoint + let data_point = find_histogram_datapoint_with_key_value( + &histogram_data.data_points, + "key1", + "value1", + ) + .expect("datapoint with key1=value1 expected"); - // Check the bucket counts - // -∞ to 1.0: 1 - // 1.0 to 2.5: 1 - // 2.5 to 5.5: 3 - // 5.5 to +∞: 0 + assert_eq!(data_point.count, 5); + assert_eq!(data_point.sum, 15); - assert_eq!(vec![1.0, 2.5, 5.5], data_point.bounds); - assert_eq!(vec![1, 1, 3, 0], data_point.bucket_counts); + // Check the bucket counts + // -∞ to 1.0: 1 + // 1.0 to 2.5: 1 + // 2.5 to 5.5: 3 + // 5.5 to +∞: 0 + + assert_eq!(vec![1.0, 2.5, 5.5], data_point.bounds); + assert_eq!(vec![1, 1, 3, 0], data_point.bucket_counts); + }); } fn histogram_aggregation_with_empty_bounds_helper(temporality: Temporality) { @@ -2503,36 +2580,41 @@ mod tests { test_context.flush_metrics(); // Assert - let MetricData::Histogram(histogram_data) = - test_context.get_aggregation::("test_histogram", None) - else { - unreachable!() - }; - // Expecting 1 time-series. - assert_eq!(histogram_data.data_points.len(), 1); - if let Temporality::Cumulative = temporality { - assert_eq!( - histogram_data.temporality, - Temporality::Cumulative, - "Should produce cumulative" - ); - } else { - assert_eq!( - histogram_data.temporality, - Temporality::Delta, - "Should produce delta" - ); - } - // find and validate key1=value1 datapoint - let data_point = - find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); + test_context.with_aggregation::("test_histogram", None, |metric| { + let MetricData::Histogram(histogram_data) = metric else { + unreachable!() + }; - assert_eq!(data_point.count, 5); - assert_eq!(data_point.sum, 15); - assert!(data_point.bounds.is_empty()); - assert!(data_point.bucket_counts.is_empty()); + // Expecting 1 time-series. + assert_eq!(histogram_data.data_points.len(), 1); + if let Temporality::Cumulative = temporality { + assert_eq!( + histogram_data.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); + } else { + assert_eq!( + histogram_data.temporality, + Temporality::Delta, + "Should produce delta" + ); + } + + // find and validate key1=value1 datapoint + let data_point = find_histogram_datapoint_with_key_value( + &histogram_data.data_points, + "key1", + "value1", + ) + .expect("datapoint with key1=value1 expected"); + + assert_eq!(data_point.count, 5); + assert_eq!(data_point.sum, 15); + assert!(data_point.bounds.is_empty()); + assert!(data_point.bucket_counts.is_empty()); + }); } fn gauge_aggregation_helper(temporality: Temporality) { @@ -2554,24 +2636,32 @@ mod tests { test_context.flush_metrics(); // Assert - let MetricData::Gauge(gauge_data_point) = - test_context.get_aggregation::("my_gauge", None) - else { - unreachable!() - }; - // Expecting 2 time-series. - assert_eq!(gauge_data_point.data_points.len(), 2); - // find and validate key1=value2 datapoint - let data_point1 = - find_gauge_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 4); + test_context.with_aggregation::("my_gauge", None, |metric| { + let MetricData::Gauge(gauge_data_point) = metric else { + unreachable!() + }; - let data_point1 = - find_gauge_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value2") - .expect("datapoint with key1=value2 expected"); - assert_eq!(data_point1.value, 6); + // Expecting 2 time-series. + assert_eq!(gauge_data_point.data_points.len(), 2); + + // find and validate key1=value2 datapoint + let data_point1 = find_gauge_datapoint_with_key_value( + &gauge_data_point.data_points, + "key1", + "value1", + ) + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 4); + + let data_point1 = find_gauge_datapoint_with_key_value( + &gauge_data_point.data_points, + "key1", + "value2", + ) + .expect("datapoint with key1=value2 expected"); + assert_eq!(data_point1.value, 6); + }); // Reset and report more measurements test_context.reset_metrics(); @@ -2587,17 +2677,22 @@ mod tests { test_context.flush_metrics(); - let MetricData::Gauge(gauge) = test_context.get_aggregation::("my_gauge", None) else { - unreachable!() - }; - assert_eq!(gauge.data_points.len(), 2); - let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 41); + test_context.with_aggregation::("my_gauge", None, |metric| { + let MetricData::Gauge(gauge) = metric else { + unreachable!() + }; - let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value2") - .expect("datapoint with key1=value2 expected"); - assert_eq!(data_point1.value, 54); + assert_eq!(gauge.data_points.len(), 2); + let data_point1 = + find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 41); + + let data_point1 = + find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value2") + .expect("datapoint with key1=value2 expected"); + assert_eq!(data_point1.value, 54); + }); } fn observable_gauge_aggregation_helper(temporality: Temporality, use_empty_attributes: bool) { @@ -2617,60 +2712,67 @@ mod tests { test_context.flush_metrics(); - // Assert - let MetricData::Gauge(gauge) = - test_context.get_aggregation::("test_observable_gauge", None) - else { - unreachable!() - }; - // Expecting 2 time-series. let expected_time_series_count = if use_empty_attributes { 3 } else { 2 }; - assert_eq!(gauge.data_points.len(), expected_time_series_count); - - if use_empty_attributes { - // find and validate zero attribute datapoint - let zero_attribute_datapoint = - find_gauge_datapoint_with_no_attributes(&gauge.data_points) - .expect("datapoint with no attributes expected"); - assert_eq!(zero_attribute_datapoint.value, 1); - } - // find and validate key1=value1 datapoint - let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 4); + // Assert + test_context.with_aggregation::("test_observable_gauge", None, |metric| { + let MetricData::Gauge(gauge) = metric else { + unreachable!() + }; + + // Expecting 2 time-series. + assert_eq!(gauge.data_points.len(), expected_time_series_count); - // find and validate key2=value2 datapoint - let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2") - .expect("datapoint with key2=value2 expected"); - assert_eq!(data_point2.value, 5); + if use_empty_attributes { + // find and validate zero attribute datapoint + let zero_attribute_datapoint = + find_gauge_datapoint_with_no_attributes(&gauge.data_points) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.value, 1); + } + + // find and validate key1=value1 datapoint + let data_point1 = + find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 4); + + // find and validate key2=value2 datapoint + let data_point2 = + find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2") + .expect("datapoint with key2=value2 expected"); + assert_eq!(data_point2.value, 5); + }); // Reset and report more measurements test_context.reset_metrics(); test_context.flush_metrics(); - let MetricData::Gauge(gauge) = - test_context.get_aggregation::("test_observable_gauge", None) - else { - unreachable!() - }; - assert_eq!(gauge.data_points.len(), expected_time_series_count); + test_context.with_aggregation::("test_observable_gauge", None, |metric| { + let MetricData::Gauge(gauge) = metric else { + unreachable!() + }; - if use_empty_attributes { - let zero_attribute_datapoint = - find_gauge_datapoint_with_no_attributes(&gauge.data_points) - .expect("datapoint with no attributes expected"); - assert_eq!(zero_attribute_datapoint.value, 1); - } + assert_eq!(gauge.data_points.len(), expected_time_series_count); - let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 4); + if use_empty_attributes { + let zero_attribute_datapoint = + find_gauge_datapoint_with_no_attributes(&gauge.data_points) + .expect("datapoint with no attributes expected"); + assert_eq!(zero_attribute_datapoint.value, 1); + } + + let data_point1 = + find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 4); - let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2") - .expect("datapoint with key2=value2 expected"); - assert_eq!(data_point2.value, 5); + let data_point2 = + find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2") + .expect("datapoint with key2=value2 expected"); + assert_eq!(data_point2.value, 5); + }); } fn counter_aggregation_helper(temporality: Temporality) { @@ -2692,30 +2794,33 @@ mod tests { test_context.flush_metrics(); // Assert - let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { - unreachable!() - }; - // Expecting 2 time-series. - assert_eq!(sum.data_points.len(), 2); - assert!(sum.is_monotonic, "Counter should produce monotonic."); - if let Temporality::Cumulative = temporality { - assert_eq!( - sum.temporality, - Temporality::Cumulative, - "Should produce cumulative" - ); - } else { - assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); - } + test_context.with_aggregation::("my_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - // find and validate key1=value2 datapoint - let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 5); + // Expecting 2 time-series. + assert_eq!(sum.data_points.len(), 2); + assert!(sum.is_monotonic, "Counter should produce monotonic."); + if let Temporality::Cumulative = temporality { + assert_eq!( + sum.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); + } else { + assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); + } - let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2") - .expect("datapoint with key1=value2 expected"); - assert_eq!(data_point1.value, 3); + // find and validate key1=value2 datapoint + let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 5); + + let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2") + .expect("datapoint with key1=value2 expected"); + assert_eq!(data_point1.value, 3); + }); // Reset and report more measurements test_context.reset_metrics(); @@ -2731,25 +2836,28 @@ mod tests { test_context.flush_metrics(); - let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { - unreachable!() - }; - assert_eq!(sum.data_points.len(), 2); - let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - if temporality == Temporality::Cumulative { - assert_eq!(data_point1.value, 10); - } else { - assert_eq!(data_point1.value, 5); - } + test_context.with_aggregation::("my_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2") - .expect("datapoint with key1=value2 expected"); - if temporality == Temporality::Cumulative { - assert_eq!(data_point1.value, 6); - } else { - assert_eq!(data_point1.value, 3); - } + assert_eq!(sum.data_points.len(), 2); + let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + if temporality == Temporality::Cumulative { + assert_eq!(data_point1.value, 10); + } else { + assert_eq!(data_point1.value, 5); + } + + let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2") + .expect("datapoint with key1=value2 expected"); + if temporality == Temporality::Cumulative { + assert_eq!(data_point1.value, 6); + } else { + assert_eq!(data_point1.value, 3); + } + }); } fn counter_aggregation_overflow_helper(temporality: Temporality) { @@ -2773,28 +2881,30 @@ mod tests { counter.add(100, &[KeyValue::new("A", "yet_another")]); test_context.flush_metrics(); - let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { - unreachable!() - }; + test_context.with_aggregation::("my_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - // Expecting 2002 metric points. (2000 + 1 overflow + Empty attributes) - assert_eq!(sum.data_points.len(), 2002); + // Expecting 2002 metric points. (2000 + 1 overflow + Empty attributes) + assert_eq!(sum.data_points.len(), 2002); - let data_point = - find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected"); - assert_eq!(data_point.value, 300); + let data_point = + find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected"); + assert_eq!(data_point.value, 300); - // let empty_attrs_data_point = &sum.data_points[0]; - let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points) - .expect("Empty attributes point expected"); - assert!( - empty_attrs_data_point.attributes.is_empty(), - "Non-empty attribute set" - ); - assert_eq!( - empty_attrs_data_point.value, 6, - "Empty attributes value should be 3+3=6" - ); + // let empty_attrs_data_point = &sum.data_points[0]; + let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points) + .expect("Empty attributes point expected"); + assert!( + empty_attrs_data_point.attributes.is_empty(), + "Non-empty attribute set" + ); + assert_eq!( + empty_attrs_data_point.value, 6, + "Empty attributes value should be 3+3=6" + ); + }); // Phase 2 - for delta temporality, after each collect, data points are cleared // but for cumulative, they are not cleared. @@ -2806,42 +2916,46 @@ mod tests { counter.add(100, &[KeyValue::new("A", "yet_another")]); test_context.flush_metrics(); - let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { - unreachable!() - }; - - if temporality == Temporality::Delta { - assert_eq!(sum.data_points.len(), 3); - - let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo") - .expect("point expected"); - assert_eq!(data_point.value, 100); + test_context.with_aggregation::("my_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another") - .expect("point expected"); - assert_eq!(data_point.value, 100); + if temporality == Temporality::Delta { + assert_eq!(sum.data_points.len(), 3); - let data_point = - find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another") + let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo") .expect("point expected"); - assert_eq!(data_point.value, 100); - } else { - // For cumulative, overflow should still be there, and new points should not be added. - assert_eq!(sum.data_points.len(), 2002); - let data_point = - find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected"); - assert_eq!(data_point.value, 600); - - let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo"); - assert!(data_point.is_none(), "point should not be present"); + assert_eq!(data_point.value, 100); - let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another"); - assert!(data_point.is_none(), "point should not be present"); + let data_point = + find_sum_datapoint_with_key_value(&sum.data_points, "A", "another") + .expect("point expected"); + assert_eq!(data_point.value, 100); - let data_point = - find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another"); - assert!(data_point.is_none(), "point should not be present"); - } + let data_point = + find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another") + .expect("point expected"); + assert_eq!(data_point.value, 100); + } else { + // For cumulative, overflow should still be there, and new points should not be added. + assert_eq!(sum.data_points.len(), 2002); + let data_point = + find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected"); + assert_eq!(data_point.value, 600); + + let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo"); + assert!(data_point.is_none(), "point should not be present"); + + let data_point = + find_sum_datapoint_with_key_value(&sum.data_points, "A", "another"); + assert!(data_point.is_none(), "point should not be present"); + + let data_point = + find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another"); + assert!(data_point.is_none(), "point should not be present"); + } + }); } fn counter_aggregation_overflow_helper_custom_limit(temporality: Temporality) { @@ -2879,28 +2993,30 @@ mod tests { counter.add(100, &[KeyValue::new("A", "yet_another")]); test_context.flush_metrics(); - let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { - unreachable!() - }; + test_context.with_aggregation::("my_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - // Expecting (cardinality_limit + 1 overflow + Empty attributes) data points. - assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1); + // Expecting (cardinality_limit + 1 overflow + Empty attributes) data points. + assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1); - let data_point = - find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected"); - assert_eq!(data_point.value, 300); + let data_point = + find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected"); + assert_eq!(data_point.value, 300); - // let empty_attrs_data_point = &sum.data_points[0]; - let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points) - .expect("Empty attributes point expected"); - assert!( - empty_attrs_data_point.attributes.is_empty(), - "Non-empty attribute set" - ); - assert_eq!( - empty_attrs_data_point.value, 6, - "Empty attributes value should be 3+3=6" - ); + // let empty_attrs_data_point = &sum.data_points[0]; + let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points) + .expect("Empty attributes point expected"); + assert!( + empty_attrs_data_point.attributes.is_empty(), + "Non-empty attribute set" + ); + assert_eq!( + empty_attrs_data_point.value, 6, + "Empty attributes value should be 3+3=6" + ); + }); // Phase 2 - for delta temporality, after each collect, data points are cleared // but for cumulative, they are not cleared. @@ -2912,42 +3028,46 @@ mod tests { counter.add(100, &[KeyValue::new("A", "yet_another")]); test_context.flush_metrics(); - let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { - unreachable!() - }; - - if temporality == Temporality::Delta { - assert_eq!(sum.data_points.len(), 3); - - let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo") - .expect("point expected"); - assert_eq!(data_point.value, 100); + test_context.with_aggregation::("my_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another") - .expect("point expected"); - assert_eq!(data_point.value, 100); + if temporality == Temporality::Delta { + assert_eq!(sum.data_points.len(), 3); - let data_point = - find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another") + let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo") .expect("point expected"); - assert_eq!(data_point.value, 100); - } else { - // For cumulative, overflow should still be there, and new points should not be added. - assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1); - let data_point = - find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected"); - assert_eq!(data_point.value, 600); - - let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo"); - assert!(data_point.is_none(), "point should not be present"); + assert_eq!(data_point.value, 100); - let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another"); - assert!(data_point.is_none(), "point should not be present"); + let data_point = + find_sum_datapoint_with_key_value(&sum.data_points, "A", "another") + .expect("point expected"); + assert_eq!(data_point.value, 100); - let data_point = - find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another"); - assert!(data_point.is_none(), "point should not be present"); - } + let data_point = + find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another") + .expect("point expected"); + assert_eq!(data_point.value, 100); + } else { + // For cumulative, overflow should still be there, and new points should not be added. + assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1); + let data_point = + find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected"); + assert_eq!(data_point.value, 600); + + let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo"); + assert!(data_point.is_none(), "point should not be present"); + + let data_point = + find_sum_datapoint_with_key_value(&sum.data_points, "A", "another"); + assert!(data_point.is_none(), "point should not be present"); + + let data_point = + find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another"); + assert!(data_point.is_none(), "point should not be present"); + } + }); } fn counter_aggregation_attribute_order_helper(temporality: Temporality, start_sorted: bool) { @@ -3021,16 +3141,18 @@ mod tests { ); test_context.flush_metrics(); - let MetricData::Sum(sum) = test_context.get_aggregation::("my_counter", None) else { - unreachable!() - }; + test_context.with_aggregation::("my_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - // Expecting 1 time-series. - assert_eq!(sum.data_points.len(), 1); + // Expecting 1 time-series. + assert_eq!(sum.data_points.len(), 1); - // validate the sole datapoint - let data_point1 = &sum.data_points[0]; - assert_eq!(data_point1.value, 6); + // validate the sole datapoint + let data_point1 = &sum.data_points[0]; + assert_eq!(data_point1.value, 6); + }); } fn updown_counter_aggregation_helper(temporality: Temporality) { @@ -3052,30 +3174,32 @@ mod tests { test_context.flush_metrics(); // Assert - let MetricData::Sum(sum) = test_context.get_aggregation::("my_updown_counter", None) - else { - unreachable!() - }; - // Expecting 2 time-series. - assert_eq!(sum.data_points.len(), 2); - assert!( - !sum.is_monotonic, - "UpDownCounter should produce non-monotonic." - ); - assert_eq!( - sum.temporality, - Temporality::Cumulative, - "Should produce Cumulative for UpDownCounter" - ); + test_context.with_aggregation::("my_updown_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - // find and validate key1=value2 datapoint - let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 5); + // Expecting 2 time-series. + assert_eq!(sum.data_points.len(), 2); + assert!( + !sum.is_monotonic, + "UpDownCounter should produce non-monotonic." + ); + assert_eq!( + sum.temporality, + Temporality::Cumulative, + "Should produce Cumulative for UpDownCounter" + ); - let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2") - .expect("datapoint with key1=value2 expected"); - assert_eq!(data_point1.value, 7); + // find and validate key1=value2 datapoint + let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 5); + + let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2") + .expect("datapoint with key1=value2 expected"); + assert_eq!(data_point1.value, 7); + }); // Reset and report more measurements test_context.reset_metrics(); @@ -3091,18 +3215,20 @@ mod tests { test_context.flush_metrics(); - let MetricData::Sum(sum) = test_context.get_aggregation::("my_updown_counter", None) - else { - unreachable!() - }; - assert_eq!(sum.data_points.len(), 2); - let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 10); + test_context.with_aggregation::("my_updown_counter", None, |metric| { + let MetricData::Sum(sum) = metric else { + unreachable!() + }; - let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2") - .expect("datapoint with key1=value2 expected"); - assert_eq!(data_point1.value, 14); + assert_eq!(sum.data_points.len(), 2); + let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + assert_eq!(data_point1.value, 10); + + let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2") + .expect("datapoint with key1=value2 expected"); + assert_eq!(data_point1.value, 14); + }); } fn find_sum_datapoint_with_key_value<'a, T>( @@ -3186,25 +3312,24 @@ mod tests { } struct TestContext { - exporter: InMemoryMetricExporter, + resource_metrics: Arc>>, meter_provider: SdkMeterProvider, - - // Saving this on the test context for lifetime simplicity - resource_metrics: Vec, } impl TestContext { fn new(temporality: Temporality) -> Self { - let exporter = InMemoryMetricExporterBuilder::new().with_temporality(temporality); - let exporter = exporter.build(); + let resource_metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_temporality(temporality) + .with_metrics(resource_metrics.clone()) + .build(); let meter_provider = SdkMeterProvider::builder() - .with_periodic_exporter(exporter.clone()) + .with_periodic_exporter(exporter) .build(); TestContext { - exporter, + resource_metrics, meter_provider, - resource_metrics: vec![], } } @@ -3212,17 +3337,19 @@ mod tests { where T: Fn(&Instrument) -> Option + Send + Sync + 'static, { - let exporter = InMemoryMetricExporterBuilder::new().with_temporality(temporality); - let exporter = exporter.build(); + let resource_metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporterBuilder::new() + .with_temporality(temporality) + .with_metrics(resource_metrics.clone()) + .build(); let meter_provider = SdkMeterProvider::builder() - .with_periodic_exporter(exporter.clone()) + .with_periodic_exporter(exporter) .with_view(view) .build(); TestContext { - exporter, + resource_metrics, meter_provider, - resource_metrics: vec![], } } @@ -3263,39 +3390,30 @@ mod tests { } fn reset_metrics(&self) { - self.exporter.reset(); + self.resource_metrics.lock().unwrap().clear(); } fn check_no_metrics(&self) { - let resource_metrics = self - .exporter - .get_finished_metrics() - .expect("metrics expected to be exported"); // TODO: Need to fix InMemoryMetricExporter to return None. - + let resource_metrics = self.resource_metrics.lock().unwrap(); assert!(resource_metrics.is_empty(), "no metrics should be exported"); } - fn get_aggregation( + fn with_aggregation( &mut self, counter_name: &str, unit_name: Option<&str>, - ) -> &MetricData { - self.resource_metrics = self - .exporter - .get_finished_metrics() - .expect("metrics expected to be exported"); + f: impl FnOnce(&MetricData), + ) { + let resource_metrics = self.resource_metrics.lock().unwrap(); - assert!( - !self.resource_metrics.is_empty(), - "no metrics were exported" - ); + assert!(!resource_metrics.is_empty(), "no metrics were exported"); - assert!( - self.resource_metrics.len() == 1, + assert_eq!( + resource_metrics.len(), + 1, "Expected single resource metrics." ); - let resource_metric = self - .resource_metrics + let resource_metric = resource_metrics .first() .expect("This should contain exactly one resource metric, as validated above."); @@ -3311,34 +3429,30 @@ mod tests { assert_eq!(metric.unit, expected_unit); } - T::extract_metrics_data_ref(&metric.data) - .expect("Failed to cast aggregation to expected type") + let metric_data = T::extract_metrics_data_ref(&metric.data) + .expect("Failed to cast aggregation to expected type"); + + f(metric_data); } - fn get_from_multiple_aggregations( - &mut self, + fn with_multiple_aggregations( + &self, counter_name: &str, unit_name: Option<&str>, invocation_count: usize, - ) -> Vec<&MetricData> { - self.resource_metrics = self - .exporter - .get_finished_metrics() - .expect("metrics expected to be exported"); + f: impl FnOnce(Vec<&MetricData>), + ) { + let resource_metrics = self.resource_metrics.lock().unwrap(); - assert!( - !self.resource_metrics.is_empty(), - "no metrics were exported" - ); + assert!(!resource_metrics.is_empty(), "no metrics were exported"); assert_eq!( - self.resource_metrics.len(), + resource_metrics.len(), invocation_count, "Expected collect to be called {invocation_count} times" ); - let result = self - .resource_metrics + let result = resource_metrics .iter() .map(|resource_metric| { assert!( @@ -3361,7 +3475,7 @@ mod tests { }) .collect::>(); - result + f(result); } } } diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 70e78f9253..62c45dd9f9 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -510,6 +510,9 @@ impl MetricReader for PeriodicReader { #[cfg(all(test, feature = "testing"))] mod tests { + //! Use below command to run all tests: + //! `cargo test metrics::periodic_reader::tests --features=testing,spec_unstable_metrics_views -- --nocapture` + use super::PeriodicReader; use crate::{ error::{OTelSdkError, OTelSdkResult}, @@ -520,6 +523,7 @@ mod tests { Resource, }; use opentelemetry::metrics::MeterProvider; + use std::sync::Mutex; use std::{ sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, @@ -528,9 +532,6 @@ mod tests { time::Duration, }; - // use below command to run all tests - // cargo test metrics::periodic_reader::tests --features=testing,spec_unstable_metrics_views -- --nocapture - #[derive(Debug, Clone)] struct MetricExporterThatFailsOnlyOnFirst { count: Arc, @@ -608,8 +609,13 @@ mod tests { fn collection_triggered_by_interval_multiple() { // Arrange let interval = std::time::Duration::from_millis(1); - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone()) + + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + + let reader = PeriodicReader::builder(exporter) .with_interval(interval) .build(); let i = Arc::new(AtomicUsize::new(0)); @@ -639,8 +645,12 @@ mod tests { #[test] fn shutdown_repeat() { // Arrange - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone()).build(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + + let reader = PeriodicReader::builder(exporter).build(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); let result = meter_provider.shutdown(); @@ -660,8 +670,12 @@ mod tests { #[test] fn flush_after_shutdown() { // Arrange - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone()).build(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + + let reader = PeriodicReader::builder(exporter).build(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); let result = meter_provider.force_flush(); @@ -678,8 +692,12 @@ mod tests { #[test] fn flush_repeat() { // Arrange - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone()).build(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + + let reader = PeriodicReader::builder(exporter).build(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); let result = meter_provider.force_flush(); @@ -693,8 +711,12 @@ mod tests { #[test] fn periodic_reader_without_pipeline() { // Arrange - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone()).build(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + + let reader = PeriodicReader::builder(exporter).build(); let rm = &mut ResourceMetrics { resource: Resource::empty(), @@ -836,8 +858,12 @@ mod tests { fn collection_helper(trigger: fn(SdkMeterProvider)) { // Arrange - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone()).build(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + + let reader = PeriodicReader::builder(exporter).build(); let (sender, receiver) = mpsc::channel(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); @@ -858,11 +884,8 @@ mod tests { .recv_timeout(Duration::ZERO) .expect("message should be available in channel, indicating a collection occurred, which should trigger observable callback"); - let exported_metrics = exporter - .get_finished_metrics() - .expect("this should not fail"); assert!( - !exported_metrics.is_empty(), + !metrics.lock().unwrap().is_empty(), "Metrics should be available in exporter." ); } @@ -894,9 +917,13 @@ mod tests { } fn async_inside_observable_callback_helper() { - let interval = std::time::Duration::from_millis(10); - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone()) + let interval = Duration::from_millis(10); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + + let reader = PeriodicReader::builder(exporter) .with_interval(interval) .build(); @@ -913,11 +940,8 @@ mod tests { .build(); meter_provider.force_flush().expect("flush should succeed"); - let exported_metrics = exporter - .get_finished_metrics() - .expect("this should not fail"); assert!( - !exported_metrics.is_empty(), + !metrics.lock().unwrap().is_empty(), "Metrics should be available in exporter." ); } @@ -951,8 +975,12 @@ mod tests { } fn tokio_async_inside_observable_callback_helper(use_current_tokio_runtime: bool) { - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone()).build(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + + let reader = PeriodicReader::builder(exporter).build(); let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); let meter = meter_provider.meter("test"); @@ -985,11 +1013,8 @@ mod tests { }; meter_provider.force_flush().expect("flush should succeed"); - let exported_metrics = exporter - .get_finished_metrics() - .expect("this should not fail"); assert!( - !exported_metrics.is_empty(), + !metrics.lock().unwrap().is_empty(), "Metrics should be available in exporter." ); } diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs index 066db5b340..5a6bfa424d 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -449,7 +449,7 @@ mod tests { runtime, Resource, }; use opentelemetry::metrics::MeterProvider; - use std::sync::mpsc; + use std::sync::{mpsc, Arc, Mutex}; #[test] fn collection_triggered_by_interval_tokio_current() { @@ -492,8 +492,12 @@ mod tests { #[test] fn unregistered_collect() { // Arrange - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + + let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); let mut rm = ResourceMetrics { resource: Resource::empty(), scope_metrics: Vec::new(), @@ -513,8 +517,13 @@ mod tests { RT: crate::runtime::Runtime, { let interval = std::time::Duration::from_millis(1); - let exporter = InMemoryMetricExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime) + + let metrics = Arc::new(Mutex::new(Vec::new())); + let exporter = InMemoryMetricExporter::builder() + .with_metrics(metrics.clone()) + .build(); + + let reader = PeriodicReader::builder(exporter, runtime) .with_interval(interval) .build(); let (sender, receiver) = mpsc::channel();