diff --git a/opentelemetry-otlp/tests/integration_test/src/lib.rs b/opentelemetry-otlp/tests/integration_test/src/lib.rs
index 65faf81bf4..edcc653e5c 100644
--- a/opentelemetry-otlp/tests/integration_test/src/lib.rs
+++ b/opentelemetry-otlp/tests/integration_test/src/lib.rs
@@ -1,4 +1,4 @@
 pub mod logs_asserter;
-pub mod metrics_asserter;
+pub mod metric_helpers;
 pub mod test_utils;
 pub mod trace_asserter;
diff --git a/opentelemetry-otlp/tests/integration_test/src/metric_helpers.rs b/opentelemetry-otlp/tests/integration_test/src/metric_helpers.rs
new file mode 100644
index 0000000000..cd0e7a60be
--- /dev/null
+++ b/opentelemetry-otlp/tests/integration_test/src/metric_helpers.rs
@@ -0,0 +1,261 @@
+#![cfg(unix)]
+use crate::test_utils;
+use anyhow::Result;
+use anyhow::{Context, Ok};
+use opentelemetry_otlp::MetricExporter;
+use opentelemetry_sdk::metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider};
+use opentelemetry_sdk::Resource;
+use serde_json::Value;
+use std::fs;
+use std::fs::File;
+use std::io::BufReader;
+use std::io::Read;
+use std::time::Duration;
+use tracing::info;
+
+static RESULT_PATH: &str = "actual/metrics.json";
+pub const SLEEP_DURATION: Duration = Duration::from_secs(5);
+
+///
+/// Creates an exporter using the appropriate HTTP or gRPC client based on
+/// the configured features.
+///
+fn create_exporter() -> MetricExporter {
+    let exporter_builder = MetricExporter::builder();
+
+    #[cfg(feature = "tonic-client")]
+    let exporter_builder = exporter_builder.with_tonic();
+    #[cfg(not(feature = "tonic-client"))]
+    #[cfg(any(
+        feature = "hyper-client",
+        feature = "reqwest-client",
+        feature = "reqwest-blocking-client"
+    ))]
+    let exporter_builder = exporter_builder.with_http();
+
+    exporter_builder
+        .build()
+        .expect("Failed to build MetricExporter")
+}
+
+/// Initializes the OpenTelemetry metrics pipeline
+fn init_meter_provider() -> SdkMeterProvider {
+    let exporter = create_exporter();
+    let reader = PeriodicReader::builder(exporter).build();
+    let resource = Resource::builder_empty()
+        .with_service_name("metrics-integration-test")
+        .build();
+    let meter_provider = MeterProviderBuilder::default()
+        .with_resource(resource)
+        .with_reader(reader)
+        .build();
+    opentelemetry::global::set_meter_provider(meter_provider.clone());
+    meter_provider
+}
+
+///
+/// Performs setup for metrics tests using the Tokio runtime.
+///
+pub async fn setup_metrics_tokio() -> SdkMeterProvider {
+    let _ = test_utils::start_collector_container().await;
+    // Truncate results
+    _ = File::create(RESULT_PATH).expect("Failed to truncate metrics results file");
+    info!("Truncated metrics file");
+
+    init_meter_provider()
+}
+
+///
+/// Performs setup for metrics tests that do not themselves run inside a
+/// Tokio runtime.
+///
+pub fn setup_metrics_non_tokio(
+    initialize_metric_in_tokio: bool,
+) -> (SdkMeterProvider, tokio::runtime::Runtime) {
+    let rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
+    let meter_provider: SdkMeterProvider = if initialize_metric_in_tokio {
+        // Initialize the meter provider inside the Tokio runtime
+        rt.block_on(async {
+            // Setup the collector container inside Tokio runtime
+            let _ = test_utils::start_collector_container().await;
+            init_meter_provider()
+        })
+    } else {
+        rt.block_on(async {
+            let _ = test_utils::start_collector_container().await;
+        });
+
+        // Initialize the meter provider outside the Tokio runtime
+        init_meter_provider()
+    };
+
+    (meter_provider, rt)
+}
+
+///
+/// Check that the results contain the given string.
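+///
+/// A hypothetical call from a test, assuming the collector has already
+/// flushed the exported metrics to `actual/metrics.json` (not a runnable
+/// doctest, just a sketch):
+///
+/// ```ignore
+/// assert_metrics_results_contains("some-expected-uuid")?;
+/// ```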
+///
+pub fn assert_metrics_results_contains(expected_content: &str) -> Result<()> {
+    let file = File::open(test_utils::METRICS_FILE)?;
+    let mut contents = String::new();
+    let mut reader = BufReader::new(&file);
+    reader.read_to_string(&mut contents)?;
+    assert!(
+        contents.contains(expected_content),
+        "Expected content {} not found in actual content {}",
+        expected_content,
+        contents
+    );
+    Ok(())
+}
+
+///
+/// Retrieves the latest metrics for the given scope. Each test should use
+/// its own scope, so that its data can easily be pulled out from the rest
+/// of the data.
+///
+/// This will also retrieve the resource attached to the scope.
+///
+pub fn fetch_latest_metrics_for_scope(scope_name: &str) -> Result<Value> {
+    // Open the file and fetch the contents
+    let contents = fs::read_to_string(test_utils::METRICS_FILE)?;
+
+    // Find the last parseable metrics line that contains the desired scope
+    let json_line = contents
+        .lines()
+        .rev()
+        .find_map(|line| {
+            // Attempt to parse the line as JSON
+            serde_json::from_str::<Value>(line)
+                .ok()
+                .and_then(|mut json_line| {
+                    // Check if it contains the specified scope
+                    if let Some(resource_metrics) = json_line
+                        .get_mut("resourceMetrics")
+                        .and_then(|v| v.as_array_mut())
+                    {
+                        resource_metrics.retain_mut(|resource| {
+                            if let Some(scope_metrics) = resource
+                                .get_mut("scopeMetrics")
+                                .and_then(|v| v.as_array_mut())
+                            {
+                                scope_metrics.retain(|scope| {
+                                    scope
+                                        .get("scope")
+                                        .and_then(|s| s.get("name"))
+                                        .and_then(|name| name.as_str())
+                                        .map_or(false, |n| n == scope_name)
+                                });
+
+                                // Keep the resource only if it has any matching `ScopeMetrics`
+                                !scope_metrics.is_empty()
+                            } else {
+                                false
+                            }
+                        });
+
+                        // If any resource metrics remain, return this line
+                        if !resource_metrics.is_empty() {
+                            return Some(json_line);
+                        }
+                    }
+
+                    None
+                })
+        })
+        .with_context(|| {
+            format!(
+                "No valid JSON line containing scope `{}` found.",
+                scope_name
+            )
+        })?;
+
+    Ok(json_line)
+}
+
+///
+/// Check that the metrics for the given scope match what we expect. This
+/// includes zeroing out timestamps, which we reasonably expect not to match.
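+///
+/// A hypothetical call, assuming `./expected/metrics/test_histogram_meter.json`
+/// exists and the `test_histogram_meter` scope has already been exported
+/// (a sketch, not a runnable doctest):
+///
+/// ```ignore
+/// validate_metrics_against_results("test_histogram_meter")?;
+/// ```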
+///
+pub fn validate_metrics_against_results(scope_name: &str) -> Result<()> {
+    // Define the results file path
+    let results_file_path = format!("./expected/metrics/{}.json", scope_name);
+
+    // Fetch the actual metrics for the given scope
+    let actual_metrics = fetch_latest_metrics_for_scope(scope_name)
+        .context(format!("Failed to fetch metrics for scope: {}", scope_name))?;
+
+    // Read the expected metrics from the results file
+    let expected_metrics = {
+        let file = File::open(&results_file_path).context(format!(
+            "Failed to open results file: {}",
+            results_file_path
+        ))?;
+        read_metrics_from_json(file)
+    }?;
+
+    // Compare the actual metrics with the expected metrics
+    MetricsAsserter::new(actual_metrics, expected_metrics).assert();
+
+    Ok(())
+}
+
+pub fn read_metrics_from_json(file: File) -> Result<Value> {
+    // Create a buffered reader for the file
+    let mut reader = BufReader::new(file);
+    let mut contents = String::new();
+
+    // Read the file contents into a string
+    reader
+        .read_to_string(&mut contents)
+        .expect("Failed to read json file");
+
+    // Parse the contents into a JSON Value
+    let metrics_data: Value = serde_json::from_str(&contents)?;
+    Ok(metrics_data)
+}
+
+pub struct MetricsAsserter {
+    results: Value,
+    expected: Value,
+}
+
+impl MetricsAsserter {
+    pub fn new(results: Value, expected: Value) -> Self {
+        MetricsAsserter { results, expected }
+    }
+
+    pub fn assert(mut self) {
+        // Normalize JSON by cleaning out timestamps
+        Self::zero_out_timestamps(&mut self.results);
+        Self::zero_out_timestamps(&mut self.expected);
+
+        // Perform the assertion
+        assert_eq!(
+            self.results, self.expected,
+            "Metrics did not match. Results: {:#?}, Expected: {:#?}",
+            self.results, self.expected
+        );
+    }
+
+    /// Recursively removes or zeros out timestamp fields in the JSON
+    fn zero_out_timestamps(value: &mut Value) {
+        match value {
+            Value::Object(map) => {
+                for (key, val) in map.iter_mut() {
+                    if key == "startTimeUnixNano" || key == "timeUnixNano" {
+                        *val = Value::String("0".to_string());
+                    } else {
+                        Self::zero_out_timestamps(val);
+                    }
+                }
+            }
+            Value::Array(array) => {
+                for item in array.iter_mut() {
+                    Self::zero_out_timestamps(item);
+                }
+            }
+            _ => {}
+        }
+    }
+}
diff --git a/opentelemetry-otlp/tests/integration_test/src/metrics_asserter.rs b/opentelemetry-otlp/tests/integration_test/src/metrics_asserter.rs
deleted file mode 100644
index f370df8a62..0000000000
--- a/opentelemetry-otlp/tests/integration_test/src/metrics_asserter.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-use anyhow::Result;
-use serde_json::Value;
-use std::fs::File;
-use std::io::{BufReader, Read};
-
-pub fn read_metrics_from_json(file: File) -> Result<Value> {
-    // Create a buffered reader for the file
-    let mut reader = BufReader::new(file);
-    let mut contents = String::new();
-
-    // Read the file contents into a string
-    reader
-        .read_to_string(&mut contents)
-        .expect("Failed to read json file");
-
-    // Parse the contents into a JSON Value
-    let metrics_data: Value = serde_json::from_str(&contents)?;
-    Ok(metrics_data)
-}
-
-pub struct MetricsAsserter {
-    results: Value,
-    expected: Value,
-}
-
-impl MetricsAsserter {
-    pub fn new(results: Value, expected: Value) -> Self {
-        MetricsAsserter { results, expected }
-    }
-
-    pub fn assert(mut self) {
-        // Normalize JSON by cleaning out timestamps
-        Self::zero_out_timestamps(&mut self.results);
-        Self::zero_out_timestamps(&mut self.expected);
-
-        // Perform the assertion
-        assert_eq!(
-            self.results, self.expected,
-            "Metrics did not match. Results: {:#?}, Expected: {:#?}",
-            self.results, self.expected
-        );
-    }
-
-    /// Recursively removes or zeros out timestamp fields in the JSON
-    fn zero_out_timestamps(value: &mut Value) {
-        match value {
-            Value::Object(map) => {
-                for (key, val) in map.iter_mut() {
-                    if key == "startTimeUnixNano" || key == "timeUnixNano" {
-                        *val = Value::String("0".to_string());
-                    } else {
-                        Self::zero_out_timestamps(val);
-                    }
-                }
-            }
-            Value::Array(array) => {
-                for item in array.iter_mut() {
-                    Self::zero_out_timestamps(item);
-                }
-            }
-            _ => {}
-        }
-    }
-}
diff --git a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs
index d5662407f9..7a32569743 100644
--- a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs
+++ b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs
@@ -99,7 +99,7 @@ pub async fn start_collector_container() -> Result<()> {
             .await?;
         let container = Arc::new(container_instance);
 
-        otel_debug!(
+        otel_info!(
            name: "Container started",
            ports = format!("{:?}", container.ports().await));
 
@@ -108,6 +108,8 @@ pub async fn start_collector_container() -> Result<()> {
 
        // Store the container in COLLECTOR_ARC
        *arc_guard = Some(Arc::clone(&container));
+    } else {
+        otel_info!(name: "OTel Collector already running");
     }
 
     Ok(())
diff --git a/opentelemetry-otlp/tests/integration_test/tests/metrics.rs b/opentelemetry-otlp/tests/integration_test/tests/metrics.rs
index 311ddbfae7..74dac103cd 100644
--- a/opentelemetry-otlp/tests/integration_test/tests/metrics.rs
+++ b/opentelemetry-otlp/tests/integration_test/tests/metrics.rs
@@ -1,318 +1,140 @@
-//! OTLP integration tests for metrics
-//! Note: these are all expressed using Serde types for the deserialized metrics records.
-//! We might consider changing this once we have fixed the issue identified in the #[ignore]d test
-//! `test_roundtrip_example_data` - as the roundtripping is currently broken for metrics.
+//! OTLP integration tests for metrics. These tests cover various OTel Metric
+//! SDK/OTLP Exporter scenarios, in particular focusing on ensuring that
+//! various async runtimes work well. They also validate that shutdown and
+//! force_flush behave as expected. Validation is simple in the sense that it
+//! merely checks for the presence of a UUID in the exported metrics, which is
+//! good enough to confirm that metrics have been accepted by the OTel
+//! Collector.
 //!
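+//!
+//! Roughly, each test follows the pattern below (a sketch assembled from the
+//! helpers in `metric_helpers`, not a runnable doctest):
+//!
+//! ```ignore
+//! let meter_provider = setup_metrics_tokio().await;
+//! let meter = opentelemetry::global::meter_provider().meter("test_meter");
+//! let expected_uuid = Uuid::new_v4().to_string();
+//! let counter = meter.u64_counter("test_counter").build();
+//! counter.add(10, &[KeyValue::new("mykey1", expected_uuid.clone())]);
+//! meter_provider.shutdown()?;
+//! assert_metrics_results_contains(&expected_uuid)?;
+//! ```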
 #![cfg(unix)]
 
-use anyhow::{Context, Result};
+use anyhow::{Ok, Result};
 use ctor::dtor;
-use integration_test_runner::metrics_asserter::{read_metrics_from_json, MetricsAsserter};
 use integration_test_runner::test_utils;
-use integration_test_runner::test_utils::start_collector_container;
 use opentelemetry::KeyValue;
-use opentelemetry_otlp::MetricExporter;
-use opentelemetry_proto::tonic::metrics::v1::MetricsData;
-use opentelemetry_sdk::metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider};
-use opentelemetry_sdk::Resource;
-use serde_json::Value;
-use std::fs;
-use std::fs::File;
-use std::sync::Mutex;
+use opentelemetry_sdk::metrics::SdkMeterProvider;
 use std::time::Duration;
 
-static SETUP_DONE: Mutex<bool> = Mutex::new(false);
+const SLEEP_DURATION: Duration = Duration::from_secs(5);
 
-static RESULT_PATH: &str = "actual/metrics.json";
-
-/// Initializes the OpenTelemetry metrics pipeline
-async fn init_metrics() -> SdkMeterProvider {
-    let exporter = create_exporter();
-
-    let reader = PeriodicReader::builder(exporter)
-        .with_interval(Duration::from_millis(500))
-        .with_timeout(Duration::from_secs(1))
-        .build();
-
-    let resource = Resource::builder_empty()
-        .with_service_name("metrics-integration-test")
-        .build();
-
-    let meter_provider = MeterProviderBuilder::default()
-        .with_resource(resource)
-        .with_reader(reader)
-        .build();
-
-    opentelemetry::global::set_meter_provider(meter_provider.clone());
-
-    meter_provider
-}
-
-///
-/// Creates an exporter using the appropriate HTTP or gRPC client based on
-/// the configured features.
-///
-fn create_exporter() -> MetricExporter {
-    let exporter_builder = MetricExporter::builder();
-
-    #[cfg(feature = "tonic-client")]
-    let exporter_builder = exporter_builder.with_tonic();
-    #[cfg(not(feature = "tonic-client"))]
-    #[cfg(any(
-        feature = "hyper-client",
-        feature = "reqwest-client",
-        feature = "reqwest-blocking-client"
-    ))]
-    let exporter_builder = exporter_builder.with_http();
-
-    exporter_builder
-        .build()
-        .expect("Failed to build MetricExporter")
-}
-
-///
-/// Retrieves the latest metrics for the given scope. Each test should use
-/// its own scope, so that we can easily pull the data for it out from the rest
-/// of the data.
-///
-/// This will also retrieve the resource attached to the scope.
-///
-pub fn fetch_latest_metrics_for_scope(scope_name: &str) -> Result<Value> {
-    // Open the file and fetch the contents
-    let contents = fs::read_to_string(test_utils::METRICS_FILE)?;
-
-    // Find the last parseable metrics line that contains the desired scope
-    let json_line = contents
-        .lines()
-        .rev()
-        .find_map(|line| {
-            // Attempt to parse the line as JSON
-            serde_json::from_str::<Value>(line)
-                .ok()
-                .and_then(|mut json_line| {
-                    // Check if it contains the specified scope
-                    if let Some(resource_metrics) = json_line
-                        .get_mut("resourceMetrics")
-                        .and_then(|v| v.as_array_mut())
-                    {
-                        resource_metrics.retain_mut(|resource| {
-                            if let Some(scope_metrics) = resource
-                                .get_mut("scopeMetrics")
-                                .and_then(|v| v.as_array_mut())
-                            {
-                                scope_metrics.retain(|scope| {
-                                    scope
-                                        .get("scope")
-                                        .and_then(|s| s.get("name"))
-                                        .and_then(|name| name.as_str())
-                                        .map_or(false, |n| n == scope_name)
-                                });
-
-                                // Keep the resource only if it has any matching `ScopeMetrics`
-                                !scope_metrics.is_empty()
-                            } else {
-                                false
-                            }
-                        });
-
-                        // If any resource metrics remain, return this line
-                        if !resource_metrics.is_empty() {
-                            return Some(json_line);
-                        }
-                    }
-
-                    None
-                })
-        })
-        .with_context(|| {
-            format!(
-                "No valid JSON line containing scope `{}` found.",
-                scope_name
-            )
-        })?;
-
-    Ok(json_line)
-}
-
-///
-/// Performs setup for metrics tests
-///
-async fn setup_metrics_test() -> Result<()> {
-    // Make sure the collector container is running
-    start_collector_container().await?;
-
-    let mut done = SETUP_DONE.lock().unwrap();
-    if !*done {
-        println!("Running setup before any tests...");
-        *done = true; // Mark setup as done
-
-        // Initialize the metrics subsystem
-        _ = init_metrics().await;
-    }
-
-    // Truncate results
-    _ = File::create(RESULT_PATH).expect("it's good");
-
-    Ok(())
-}
-
-///
-/// Check that the metrics for the given scope match what we expect. This
-/// includes zeroing out timestamps, which we reasonably expect not to match.
-///
-pub fn validate_metrics_against_results(scope_name: &str) -> Result<()> {
-    // Define the results file path
-    let results_file_path = format!("./expected/metrics/{}.json", scope_name);
-
-    // Fetch the actual metrics for the given scope
-    let actual_metrics = fetch_latest_metrics_for_scope(scope_name)
-        .context(format!("Failed to fetch metrics for scope: {}", scope_name))?;
-
-    // Read the expected metrics from the results file
-    let expected_metrics = {
-        let file = File::open(&results_file_path).context(format!(
-            "Failed to open results file: {}",
-            results_file_path
-        ))?;
-        read_metrics_from_json(file)
-    }?;
-
-    // Compare the actual metrics with the expected metrics
-    MetricsAsserter::new(actual_metrics, expected_metrics).assert();
-
-    Ok(())
-}
-
-///
-/// TODO - the HTTP metrics exporters except reqwest-blocking-client do not seem
-/// to work at the moment.
-/// TODO - fix this asynchronously.
-///
 #[cfg(test)]
 #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
 mod metrictests {
-    use super::*;
-    use opentelemetry::metrics::MeterProvider;
+    use integration_test_runner::metric_helpers::{
+        assert_metrics_results_contains, setup_metrics_non_tokio, setup_metrics_tokio,
+    };
+    use tokio::runtime::Handle;
+    use uuid::Uuid;
 
-    ///
-    /// Validate JSON/Protobuf models roundtrip correctly.
-    ///
-    /// TODO - this test fails currently. Fields disappear, such as the actual value of a given metric.
-    /// This appears to be on the _deserialization_ side.
-    /// Issue: https://github.com/open-telemetry/opentelemetry-rust/issues/2434
-    ///
-    #[tokio::test]
-    #[ignore]
-    async fn test_roundtrip_example_data() -> Result<()> {
-        let metrics_in = include_str!("../expected/metrics/test_u64_counter_meter.json");
-        let metrics: MetricsData = serde_json::from_str(metrics_in)?;
-        let metrics_out = serde_json::to_string(&metrics)?;
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    async fn counter_tokio_multi_thread() -> Result<()> {
+        metric_helper_tokio().await
+    }
 
-        println!("{:}", metrics_out);
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn counter_tokio_multi_thread_one_worker() -> Result<()> {
+        metric_helper_tokio().await
+    }
 
-        let metrics_in_json: Value = serde_json::from_str(metrics_in)?;
-        let metrics_out_json: Value = serde_json::from_str(&metrics_out)?;
+    #[tokio::test(flavor = "current_thread")]
+    async fn counter_tokio_current() -> Result<()> {
+        metric_helper_tokio_current().await
+    }
 
-        assert_eq!(metrics_in_json, metrics_out_json);
+    #[test]
+    fn counter_non_tokio() -> Result<()> {
+        metric_helper_non_tokio()
+    }
 
-        Ok(())
+    async fn metric_helper_tokio() -> Result<()> {
+        let meter_provider = setup_metrics_tokio().await;
+        emit_and_validate_metrics(meter_provider)
     }
 
-    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
-    async fn test_u64_counter() -> Result<()> {
-        let _result_path = setup_metrics_test().await;
-        const METER_NAME: &str = "test_u64_counter_meter";
+    async fn metric_helper_tokio_current() -> Result<()> {
+        let meter_provider = setup_metrics_tokio().await;
 
-        // Add data to u64_counter
-        let meter = opentelemetry::global::meter_provider().meter(METER_NAME);
+        const METER_NAME: &str = "test_meter";
+        const INSTRUMENT_NAME: &str = "test_counter";
 
-        let counter = meter.u64_counter("counter_u64").build();
+        let meter = opentelemetry::global::meter_provider().meter(METER_NAME);
+        let expected_uuid = Uuid::new_v4().to_string();
+        let counter = meter.u64_counter(INSTRUMENT_NAME).build();
         counter.add(
             10,
             &[
-                KeyValue::new("mykey1", "myvalue1"),
+                KeyValue::new("mykey1", expected_uuid.clone()),
                 KeyValue::new("mykey2", "myvalue2"),
             ],
         );
 
-        tokio::time::sleep(Duration::from_secs(2)).await;
+        // In tokio::current_thread flavor, shutdown must be done in a separate thread
+        let _res = Handle::current()
+            .spawn_blocking(move || meter_provider.shutdown())
+            .await
+            .unwrap();
+        // We still need to sleep, to give otel-collector a chance to flush to disk
+        std::thread::sleep(SLEEP_DURATION);
 
-        // Validate metrics against results file
-        validate_metrics_against_results(METER_NAME)?;
+        // Validate metrics against the results file. This is not an extensive
+        // validation of the output, but it is good enough to confirm that the
+        // metrics have been accepted by the OTel Collector.
+        assert_metrics_results_contains(&expected_uuid)?;
 
         Ok(())
     }
 
-    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
-    async fn test_histogram() -> Result<()> {
-        _ = setup_metrics_test().await;
-        const METER_NAME: &str = "test_histogram_meter";
+    fn metric_helper_non_tokio() -> Result<()> {
+        let (meter_provider, _rt) = setup_metrics_non_tokio(true);
+        const METER_NAME: &str = "test_meter";
+        const INSTRUMENT_NAME: &str = "test_counter";
 
-        // Add data to histogram
+        // Add data to u64_counter
         let meter = opentelemetry::global::meter_provider().meter(METER_NAME);
-        let histogram = meter.u64_histogram("example_histogram").build();
-        histogram.record(42, &[KeyValue::new("mykey3", "myvalue4")]);
-        tokio::time::sleep(Duration::from_secs(5)).await;
+        let expected_uuid = Uuid::new_v4().to_string();
+        let counter = meter.u64_counter(INSTRUMENT_NAME).build();
+        counter.add(
+            10,
+            &[
+                KeyValue::new("mykey1", expected_uuid.clone()),
+                KeyValue::new("mykey2", "myvalue2"),
+            ],
+        );
 
-        validate_metrics_against_results(METER_NAME)?;
+        meter_provider.shutdown()?;
+        // We still need to sleep, to give otel-collector a chance to flush to disk
+        std::thread::sleep(SLEEP_DURATION);
 
-        Ok(())
+        // Validate metrics against the results file. This is not an extensive
+        // validation of the output, but it is good enough to confirm that the
+        // metrics have been accepted by the OTel Collector.
+        assert_metrics_results_contains(&expected_uuid)
     }
 
-    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
-    async fn test_up_down_counter() -> Result<()> {
-        _ = setup_metrics_test().await;
-        const METER_NAME: &str = "test_up_down_meter";
+    fn emit_and_validate_metrics(meter_provider: SdkMeterProvider) -> Result<()> {
+        const METER_NAME: &str = "test_meter";
+        const INSTRUMENT_NAME: &str = "test_counter";
 
-        // Add data to up_down_counter
+        // Add data to u64_counter
         let meter = opentelemetry::global::meter_provider().meter(METER_NAME);
-        let up_down_counter = meter.i64_up_down_counter("example_up_down_counter").build();
-        up_down_counter.add(-1, &[KeyValue::new("mykey5", "myvalue5")]);
-        tokio::time::sleep(Duration::from_secs(5)).await;
-
-        validate_metrics_against_results(METER_NAME)?;
-
-        Ok(())
-    }
-
-    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
-    #[ignore]
-    async fn test_flush_on_shutdown() -> Result<()> {
-        const METER_NAME: &str = "test_flush_on_shutdown";
-
-        // Set everything up by hand, so that we can shutdown() the exporter
-        // and make sure our data is flushed through.
-
-        // Make sure the collector is running
-        start_collector_container().await?;
-
-        // Set up the exporter
-        let exporter = create_exporter();
-        let reader = PeriodicReader::builder(exporter)
-            .with_interval(Duration::from_secs(30))
-            .with_timeout(Duration::from_secs(1))
-            .build();
-        let resource = Resource::builder_empty()
-            .with_service_name("metrics-integration-test")
-            .build();
-        let meter_provider = MeterProviderBuilder::default()
-            .with_resource(resource)
-            .with_reader(reader)
-            .build();
-
-        // Send something
-        let meter = meter_provider.meter(METER_NAME);
-        let counter = meter.u64_counter("counter_").build();
-        counter.add(123, &[]);
+        let expected_uuid = Uuid::new_v4().to_string();
+        let counter = meter.u64_counter(INSTRUMENT_NAME).build();
+        counter.add(
+            10,
+            &[
+                KeyValue::new("mykey1", expected_uuid.clone()),
+                KeyValue::new("mykey2", "myvalue2"),
+            ],
+        );
 
-        // Shutdown
         meter_provider.shutdown()?;
-        // We still need to sleep, to give otel-collector a chance to flush to disk
-        tokio::time::sleep(Duration::from_secs(2)).await;
+        std::thread::sleep(SLEEP_DURATION);
 
-        validate_metrics_against_results(METER_NAME)?;
+        // Validate metrics against the results file. This is not an extensive
+        // validation of the output, but it is good enough to confirm that the
+        // metrics have been accepted by the OTel Collector.
+        assert_metrics_results_contains(&expected_uuid)?;
 
         Ok(())
     }
diff --git a/opentelemetry-otlp/tests/integration_test/tests/metrics_roundtrip.rs b/opentelemetry-otlp/tests/integration_test/tests/metrics_roundtrip.rs
new file mode 100644
index 0000000000..c2e1670a30
--- /dev/null
+++ b/opentelemetry-otlp/tests/integration_test/tests/metrics_roundtrip.rs
@@ -0,0 +1,122 @@
+//! OTLP integration tests for metrics. These tests cover the breadth of the
+//! Metric API by testing all instrument types and ensuring that the data is
+//! correctly exported to the collector, validating the exported data against
+//! the expected results.
+//! Note: these are all expressed using Serde types for the deserialized metrics records.
+//! We might consider changing this once we have fixed the issue identified in the #[ignore]d test
+//! `test_roundtrip_example_data` - as the roundtripping is currently broken for metrics.
+//!
+#![cfg(unix)]
+
+use anyhow::{Ok, Result};
+use ctor::dtor;
+use integration_test_runner::test_utils;
+use opentelemetry::KeyValue;
+use opentelemetry_proto::tonic::metrics::v1::MetricsData;
+use serde_json::Value;
+
+#[cfg(test)]
+#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
+mod metrictests_roundtrip {
+    use integration_test_runner::metric_helpers::{
+        self, validate_metrics_against_results, SLEEP_DURATION,
+    };
+
+    use super::*;
+
+    ///
+    /// Validate JSON/Protobuf models roundtrip correctly.
+    ///
+    /// TODO - this test fails currently. Fields disappear, such as the actual value of a given metric.
+    /// This appears to be on the _deserialization_ side.
+    /// Issue: https://github.com/open-telemetry/opentelemetry-rust/issues/2434
+    ///
+    #[tokio::test]
+    #[ignore]
+    async fn test_roundtrip_example_data() -> Result<()> {
+        let metrics_in = include_str!("../expected/metrics/test_u64_counter_meter.json");
+        let metrics: MetricsData = serde_json::from_str(metrics_in)?;
+        let metrics_out = serde_json::to_string(&metrics)?;
+
+        println!("{}", metrics_out);
+
+        let metrics_in_json: Value = serde_json::from_str(metrics_in)?;
+        let metrics_out_json: Value = serde_json::from_str(&metrics_out)?;
+
+        assert_eq!(metrics_in_json, metrics_out_json);
+
+        Ok(())
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    async fn test_u64_counter() -> Result<()> {
+        let meter_provider = metric_helpers::setup_metrics_tokio().await;
+        const METER_NAME: &str = "test_u64_counter_meter";
+
+        // Add data to u64_counter
+        let meter = opentelemetry::global::meter_provider().meter(METER_NAME);
+
+        let counter = meter.u64_counter("counter_u64").build();
+        counter.add(
+            10,
+            &[
+                KeyValue::new("mykey1", "myvalue1"),
+                KeyValue::new("mykey2", "myvalue2"),
+            ],
+        );
+
+        meter_provider.shutdown()?;
+        tokio::time::sleep(SLEEP_DURATION).await;
+
+        // Validate metrics against the results file
+        validate_metrics_against_results(METER_NAME)?;
+
+        Ok(())
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    async fn test_histogram() -> Result<()> {
+        let meter_provider = metric_helpers::setup_metrics_tokio().await;
+        const METER_NAME: &str = "test_histogram_meter";
+
+        // Add data to histogram
+        let meter = opentelemetry::global::meter_provider().meter(METER_NAME);
+        let histogram = meter.u64_histogram("example_histogram").build();
+        histogram.record(42, &[KeyValue::new("mykey3", "myvalue4")]);
+
+        meter_provider.shutdown()?;
+        tokio::time::sleep(SLEEP_DURATION).await;
+
+        validate_metrics_against_results(METER_NAME)?;
+
+        Ok(())
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    async fn test_up_down_counter() -> Result<()> {
+        let meter_provider = metric_helpers::setup_metrics_tokio().await;
+        const METER_NAME: &str = "test_up_down_meter";
+
+        // Add data to up_down_counter
+        let meter = opentelemetry::global::meter_provider().meter(METER_NAME);
+        let up_down_counter = meter.i64_up_down_counter("example_up_down_counter").build();
+        up_down_counter.add(-1, &[KeyValue::new("mykey5", "myvalue5")]);
+
+        meter_provider.shutdown()?;
+        tokio::time::sleep(SLEEP_DURATION).await;
+
+        validate_metrics_against_results(METER_NAME)?;
+
+        Ok(())
+    }
+}
+
+///
+/// Make sure we stop the collector container, otherwise it will sit around hogging our
+/// ports and subsequent test runs will fail.
+///
+#[dtor]
+fn shutdown() {
+    println!("metrics::shutdown");
+    test_utils::stop_collector_container();
+}