From 7531799216ff6692cca74ea1c32e496b267300e4 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 22 Jan 2025 18:02:07 -0800 Subject: [PATCH 1/4] initil commit --- .../tests/integration_test/tests/logs.rs | 213 +++++++++++------- scripts/integration_tests.sh | 3 +- 2 files changed, 131 insertions(+), 85 deletions(-) diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index 62992a162f..5c0eb37f1f 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -2,14 +2,17 @@ use anyhow::Result; use ctor::dtor; -use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter}; use integration_test_runner::test_utils; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_otlp::LogExporter; use opentelemetry_sdk::logs::LoggerProvider; use opentelemetry_sdk::{logs as sdklogs, Resource}; use std::fs::File; use std::io::Read; -use std::os::unix::fs::MetadataExt; +use std::time::Duration; +use tracing::info; +use tracing_subscriber::layer::SubscriberExt; +use uuid::Uuid; fn init_logs(is_simple: bool) -> Result { let exporter_builder = LogExporter::builder(); @@ -43,25 +46,99 @@ fn init_logs(is_simple: bool) -> Result { Ok(logger_provider) } +async fn logs_tokio_helper(is_simple: bool) -> Result<()> { + use crate::{assert_logs_results_contains, init_logs}; + test_utils::start_collector_container().await?; + + let logger_provider = init_logs(is_simple).unwrap(); + let layer = OpenTelemetryTracingBridge::new(&logger_provider); + let subscriber = tracing_subscriber::registry().with(layer); + // generate a random uuid and store it to expected guid + let expected_uuid = Uuid::new_v4().to_string(); + { + let _guard = tracing::subscriber::set_default(subscriber); + info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99); + } + + let _ = logger_provider.shutdown(); + tokio::time::sleep(Duration::from_secs(5)).await; + assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?; + Ok(()) +} + +fn logs_non_tokio_helper_with_init_logs_inside_rt(is_simple: bool) -> Result<()> { + // Initialize the logger provider inside a tokio runtime + // as this allows tonic client to capture the runtime, + // but actual export occurs from the dedicated std::thread + // created by BatchLogProcessor. + let rt = tokio::runtime::Runtime::new()?; + let logger_provider = rt.block_on(async { + // While we're here setup our collector container too, as this needs tokio to run + test_utils::start_collector_container().await?; + init_logs(is_simple) + })?; + let layer = OpenTelemetryTracingBridge::new(&logger_provider); + let subscriber = tracing_subscriber::registry().with(layer); + // generate a random uuid and store it to expected guid + let expected_uuid = Uuid::new_v4().to_string(); + { + let _guard = tracing::subscriber::set_default(subscriber); + info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99); + } + + let _ = logger_provider.shutdown(); + std::thread::sleep(Duration::from_secs(5)); + assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?; + Ok(()) +} + +#[cfg(feature = "reqwest-blocking-client")] +fn logs_non_tokio_helper_with_init_logs_outside_rt(is_simple: bool) -> Result<()> { + // Initialize the logger provider outside a tokio runtime + // to validate non-tonic clients (reqwest) + // and the actual export occurs from the dedicated std::thread + // created by BatchLogProcessor. + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(async { + // While we're here setup our collector container too, as this needs tokio to run + let _ = test_utils::start_collector_container().await; + }); + let logger_provider = init_logs(is_simple).unwrap(); + let layer = OpenTelemetryTracingBridge::new(&logger_provider); + let subscriber = tracing_subscriber::registry().with(layer); + // generate a random uuid and store it to expected guid + let expected_uuid = Uuid::new_v4().to_string(); + { + let _guard = tracing::subscriber::set_default(subscriber); + info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99); + } + + let _ = logger_provider.shutdown(); + std::thread::sleep(Duration::from_secs(5)); + assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?; + Ok(()) +} + +fn assert_logs_results_contains(result: &str, expected_content: &str) -> Result<()> { + let file = File::open(result)?; + let mut contents = String::new(); + let mut reader = std::io::BufReader::new(&file); + reader.read_to_string(&mut contents)?; + assert!(contents.contains(expected_content)); + Ok(()) +} + #[cfg(test)] mod logtests { - // TODO: The tests in this mod works like below: Emit a log with a UUID, + // The tests in this mod works like below: Emit a log with a UUID, // then read the logs from the file and check if the UUID is present in the // logs. This makes it easy to validate with a single collector and its // output. This is a very simple test but good enough to validate that OTLP - // Exporter did work! A more comprehensive test would be to validate the - // entire Payload. The infra for it already exists (logs_asserter.rs), the - // TODO here is to write a test that validates the entire payload. + // Exporter did work! use super::*; use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter}; - use integration_test_runner::test_utils; - use opentelemetry_appender_tracing::layer; - use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; - use std::{fs::File, time::Duration}; - use tracing::info; - use tracing_subscriber::layer::SubscriberExt; - use uuid::Uuid; + use std::fs::File; #[test] #[should_panic(expected = "assertion `left == right` failed: body does not match")] @@ -87,6 +164,8 @@ mod logtests { Ok(()) } + // Batch Processor + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] pub async fn logs_batch_tokio_multi_thread() -> Result<()> { @@ -105,24 +184,30 @@ mod logtests { logs_tokio_helper(false).await } - async fn logs_tokio_helper(is_simple: bool) -> Result<()> { - use crate::{assert_logs_results_contains, init_logs}; - test_utils::start_collector_container().await?; + #[test] + #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] + pub fn logs_batch_non_tokio_main_init_logs_inside_rt() -> Result<()> { + logs_non_tokio_helper_with_init_logs_inside_rt(false) + } - let logger_provider = init_logs(is_simple).unwrap(); - let layer = OpenTelemetryTracingBridge::new(&logger_provider); - let subscriber = tracing_subscriber::registry().with(layer); - // generate a random uuid and store it to expected guid - let expected_uuid = Uuid::new_v4().to_string(); - { - let _guard = tracing::subscriber::set_default(subscriber); - info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99); - } - - let _ = logger_provider.shutdown(); - tokio::time::sleep(Duration::from_secs(5)).await; - assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?; - Ok(()) + #[test] + #[cfg(feature = "reqwest-blocking-client")] + pub fn logs_batch_non_tokio_main_with_init_logs_outside_rt() -> Result<()> { + logs_non_tokio_helper_with_init_logs_outside_rt(false) + } + + // Simple Processor + + #[test] + #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] + pub fn logs_simple_non_tokio_main_with_init_logs_inside_rt() -> Result<()> { + logs_non_tokio_helper_with_init_logs_inside_rt(true) + } + + #[test] + #[cfg(any(feature = "reqwest-blocking-client"))] + pub fn logs_simple_non_tokio_main_with_init_logs_outsie_rt() -> Result<()> { + logs_non_tokio_helper_with_init_logs_outside_rt(true) } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -145,64 +230,26 @@ mod logtests { logs_tokio_helper(true).await } - #[test] - #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] - pub fn logs_batch_non_tokio_main() -> Result<()> { - logs_non_tokio_helper(false) - } - - fn logs_non_tokio_helper(is_simple: bool) -> Result<()> { - // Initialize the logger provider inside a tokio runtime - // as this allows tonic client to capture the runtime, - // but actual export occurs from the dedicated std::thread - // created by BatchLogProcessor. - let rt = tokio::runtime::Runtime::new()?; - let logger_provider = rt.block_on(async { - // While we're here setup our collector container too, as this needs tokio to run - test_utils::start_collector_container().await?; - init_logs(is_simple) - })?; - let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider); - let subscriber = tracing_subscriber::registry().with(layer); - // generate a random uuid and store it to expected guid - let expected_uuid = Uuid::new_v4().to_string(); - { - let _guard = tracing::subscriber::set_default(subscriber); - info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99); - } - - let _ = logger_provider.shutdown(); - std::thread::sleep(Duration::from_secs(5)); - assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?; - Ok(()) + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[cfg(feature = "hyper-client")] + pub async fn logs_simple_tokio_multi_thread() -> Result<()> { + logs_tokio_helper(true).await } - #[test] - #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] - pub fn logs_simple_non_tokio_main() -> Result<()> { - logs_non_tokio_helper(true) + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + #[cfg(feature = "hyper-client")] + pub async fn logs_simple_tokio_multi_with_one_worker() -> Result<()> { + logs_tokio_helper(true).await } -} -pub fn assert_logs_results_contains(result: &str, expected_content: &str) -> Result<()> { - let file = File::open(result)?; - let mut contents = String::new(); - let mut reader = std::io::BufReader::new(&file); - reader.read_to_string(&mut contents)?; - assert!(contents.contains(expected_content)); - Ok(()) -} - -pub fn assert_logs_results(result: &str, expected: &str) -> Result<()> { - let left = read_logs_from_json(File::open(expected)?)?; - let right = read_logs_from_json(File::open(result)?)?; - - LogsAsserter::new(left, right).assert(); - - assert!(File::open(result).unwrap().metadata().unwrap().size() > 0); - Ok(()) + // Ignored, to be investigated + #[ignore] + #[tokio::test(flavor = "current_thread")] + #[cfg(feature = "hyper-client")] + pub async fn logs_simple_tokio_current() -> Result<()> { + logs_tokio_helper(true).await + } } - /// /// Make sure we stop the collector container, otherwise it will sit around hogging our /// ports and subsequent test runs will fail. diff --git a/scripts/integration_tests.sh b/scripts/integration_tests.sh index 44dd6afa74..377ec7ba62 100755 --- a/scripts/integration_tests.sh +++ b/scripts/integration_tests.sh @@ -35,8 +35,7 @@ if [ -d "$TEST_DIR" ]; then echo "Integration Tests: Hyper Client (Disabled now)" echo #### echo - # TODO: hyper client is not supported with thread based processor and reader. Enable this test once it is supported. - #cargo test --no-default-features --features "hyper-client","internal-logs" + cargo test --no-default-features --features "hyper-client","internal-logs" --test logs else echo "Directory $TEST_DIR does not exist. Skipping tests." exit 1 From 97b819b074fbab2cf80adaca69a21c6225fcbff8 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 22 Jan 2025 18:12:37 -0800 Subject: [PATCH 2/4] add issue# with ignore --- opentelemetry-otlp/tests/integration_test/tests/logs.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index 5c0eb37f1f..7e4319949d 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -222,8 +222,7 @@ mod logtests { logs_tokio_helper(true).await } - // Ignored, to be investigated - #[ignore] + #[ignore] // https://github.com/open-telemetry/opentelemetry-rust/issues/2539 #[tokio::test(flavor = "current_thread")] #[cfg(any(feature = "tonic-client", feature = "reqwest-client"))] pub async fn logs_simple_tokio_current() -> Result<()> { @@ -242,8 +241,7 @@ mod logtests { logs_tokio_helper(true).await } - // Ignored, to be investigated - #[ignore] + #[ignore] // https://github.com/open-telemetry/opentelemetry-rust/issues/2539 #[tokio::test(flavor = "current_thread")] #[cfg(feature = "hyper-client")] pub async fn logs_simple_tokio_current() -> Result<()> { From 54f3ce9e3e3424082b9da350b2968be7c5bab7c6 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 22 Jan 2025 18:35:27 -0800 Subject: [PATCH 3/4] fix --- .../tests/integration_test/tests/logs.rs | 95 ++++++------------- 1 file changed, 31 insertions(+), 64 deletions(-) diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index 7e4319949d..d7b07c05f0 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -66,51 +66,37 @@ async fn logs_tokio_helper(is_simple: bool) -> Result<()> { Ok(()) } -fn logs_non_tokio_helper_with_init_logs_inside_rt(is_simple: bool) -> Result<()> { - // Initialize the logger provider inside a tokio runtime - // as this allows tonic client to capture the runtime, - // but actual export occurs from the dedicated std::thread - // created by BatchLogProcessor. +fn logs_non_tokio_helper(is_simple: bool, init_logs_inside_rt: bool) -> Result<()> { let rt = tokio::runtime::Runtime::new()?; - let logger_provider = rt.block_on(async { - // While we're here setup our collector container too, as this needs tokio to run - test_utils::start_collector_container().await?; - init_logs(is_simple) - })?; - let layer = OpenTelemetryTracingBridge::new(&logger_provider); - let subscriber = tracing_subscriber::registry().with(layer); - // generate a random uuid and store it to expected guid - let expected_uuid = Uuid::new_v4().to_string(); - { - let _guard = tracing::subscriber::set_default(subscriber); - info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99); - } - - let _ = logger_provider.shutdown(); - std::thread::sleep(Duration::from_secs(5)); - assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?; - Ok(()) -} + let logger_provider = if init_logs_inside_rt { + // Initialize the logger provider inside the Tokio runtime + rt.block_on(async { + // Setup the collector container inside Tokio runtime + test_utils::start_collector_container().await?; + init_logs(is_simple) + })? + } else { + // Initialize the logger provider outside the Tokio runtime + rt.block_on(async { + let _ = test_utils::start_collector_container().await; + }); + init_logs(is_simple)? + }; -#[cfg(feature = "reqwest-blocking-client")] -fn logs_non_tokio_helper_with_init_logs_outside_rt(is_simple: bool) -> Result<()> { - // Initialize the logger provider outside a tokio runtime - // to validate non-tonic clients (reqwest) - // and the actual export occurs from the dedicated std::thread - // created by BatchLogProcessor. - let rt = tokio::runtime::Runtime::new()?; - rt.block_on(async { - // While we're here setup our collector container too, as this needs tokio to run - let _ = test_utils::start_collector_container().await; - }); - let logger_provider = init_logs(is_simple).unwrap(); let layer = OpenTelemetryTracingBridge::new(&logger_provider); let subscriber = tracing_subscriber::registry().with(layer); - // generate a random uuid and store it to expected guid + + // Generate a random UUID and store it to expected guid let expected_uuid = Uuid::new_v4().to_string(); { let _guard = tracing::subscriber::set_default(subscriber); - info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99); + info!( + target: "my-target", + uuid = expected_uuid, + "hello from {}. My price is {}.", + "banana", + 2.99 + ); } let _ = logger_provider.shutdown(); @@ -187,13 +173,13 @@ mod logtests { #[test] #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] pub fn logs_batch_non_tokio_main_init_logs_inside_rt() -> Result<()> { - logs_non_tokio_helper_with_init_logs_inside_rt(false) + logs_non_tokio_helper(false, true) } #[test] #[cfg(feature = "reqwest-blocking-client")] pub fn logs_batch_non_tokio_main_with_init_logs_outside_rt() -> Result<()> { - logs_non_tokio_helper_with_init_logs_outside_rt(false) + logs_non_tokio_helper(false, false) } // Simple Processor @@ -201,49 +187,30 @@ mod logtests { #[test] #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] pub fn logs_simple_non_tokio_main_with_init_logs_inside_rt() -> Result<()> { - logs_non_tokio_helper_with_init_logs_inside_rt(true) + logs_non_tokio_helper(true, true) } #[test] #[cfg(any(feature = "reqwest-blocking-client"))] pub fn logs_simple_non_tokio_main_with_init_logs_outsie_rt() -> Result<()> { - logs_non_tokio_helper_with_init_logs_outside_rt(true) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - #[cfg(any(feature = "tonic-client", feature = "reqwest-client"))] - pub async fn logs_simple_tokio_multi_thread() -> Result<()> { - logs_tokio_helper(true).await - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - #[cfg(any(feature = "tonic-client", feature = "reqwest-client"))] - pub async fn logs_simple_tokio_multi_with_one_worker() -> Result<()> { - logs_tokio_helper(true).await - } - - #[ignore] // https://github.com/open-telemetry/opentelemetry-rust/issues/2539 - #[tokio::test(flavor = "current_thread")] - #[cfg(any(feature = "tonic-client", feature = "reqwest-client"))] - pub async fn logs_simple_tokio_current() -> Result<()> { - logs_tokio_helper(true).await + logs_non_tokio_helper(true, false) } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - #[cfg(feature = "hyper-client")] + #[cfg(any(feature = "tonic-client", feature = "reqwest-client", feature = "hyper-client"))] pub async fn logs_simple_tokio_multi_thread() -> Result<()> { logs_tokio_helper(true).await } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - #[cfg(feature = "hyper-client")] + #[cfg(any(feature = "tonic-client", feature = "reqwest-client", feature = "hyper-client"))] pub async fn logs_simple_tokio_multi_with_one_worker() -> Result<()> { logs_tokio_helper(true).await } #[ignore] // https://github.com/open-telemetry/opentelemetry-rust/issues/2539 #[tokio::test(flavor = "current_thread")] - #[cfg(feature = "hyper-client")] + #[cfg(any(feature = "tonic-client", feature = "reqwest-client", feature = "hyper-client"))] pub async fn logs_simple_tokio_current() -> Result<()> { logs_tokio_helper(true).await } From ac1b8e8423b6faec8a4bcf0d35f371ff8498442a Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 22 Jan 2025 18:40:45 -0800 Subject: [PATCH 4/4] fix lint --- .../tests/integration_test/tests/logs.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index d7b07c05f0..169df71601 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -197,20 +197,32 @@ mod logtests { } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - #[cfg(any(feature = "tonic-client", feature = "reqwest-client", feature = "hyper-client"))] + #[cfg(any( + feature = "tonic-client", + feature = "reqwest-client", + feature = "hyper-client" + ))] pub async fn logs_simple_tokio_multi_thread() -> Result<()> { logs_tokio_helper(true).await } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - #[cfg(any(feature = "tonic-client", feature = "reqwest-client", feature = "hyper-client"))] + #[cfg(any( + feature = "tonic-client", + feature = "reqwest-client", + feature = "hyper-client" + ))] pub async fn logs_simple_tokio_multi_with_one_worker() -> Result<()> { logs_tokio_helper(true).await } #[ignore] // https://github.com/open-telemetry/opentelemetry-rust/issues/2539 #[tokio::test(flavor = "current_thread")] - #[cfg(any(feature = "tonic-client", feature = "reqwest-client", feature = "hyper-client"))] + #[cfg(any( + feature = "tonic-client", + feature = "reqwest-client", + feature = "hyper-client" + ))] pub async fn logs_simple_tokio_current() -> Result<()> { logs_tokio_helper(true).await }