Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 118 additions & 94 deletions opentelemetry-otlp/tests/integration_test/tests/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

use anyhow::Result;
use ctor::dtor;
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
use integration_test_runner::test_utils;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::LogExporter;
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::{logs as sdklogs, Resource};
use std::fs::File;
use std::io::Read;
use std::os::unix::fs::MetadataExt;
use std::time::Duration;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use uuid::Uuid;

fn init_logs(is_simple: bool) -> Result<sdklogs::LoggerProvider> {
let exporter_builder = LogExporter::builder();
Expand Down Expand Up @@ -43,25 +46,85 @@ fn init_logs(is_simple: bool) -> Result<sdklogs::LoggerProvider> {
Ok(logger_provider)
}

async fn logs_tokio_helper(is_simple: bool) -> Result<()> {
use crate::{assert_logs_results_contains, init_logs};
test_utils::start_collector_container().await?;

let logger_provider = init_logs(is_simple).unwrap();
let layer = OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
// generate a random uuid and store it to expected guid
let expected_uuid = Uuid::new_v4().to_string();
{
let _guard = tracing::subscriber::set_default(subscriber);
info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99);
}

let _ = logger_provider.shutdown();
tokio::time::sleep(Duration::from_secs(5)).await;
assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
}

fn logs_non_tokio_helper(is_simple: bool, init_logs_inside_rt: bool) -> Result<()> {
let rt = tokio::runtime::Runtime::new()?;
let logger_provider = if init_logs_inside_rt {
// Initialize the logger provider inside the Tokio runtime
rt.block_on(async {
// Setup the collector container inside Tokio runtime
test_utils::start_collector_container().await?;
init_logs(is_simple)
})?
} else {
// Initialize the logger provider outside the Tokio runtime
rt.block_on(async {
let _ = test_utils::start_collector_container().await;
});
init_logs(is_simple)?
};

let layer = OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);

// Generate a random UUID and store it to expected guid
let expected_uuid = Uuid::new_v4().to_string();
{
let _guard = tracing::subscriber::set_default(subscriber);
info!(
target: "my-target",
uuid = expected_uuid,
"hello from {}. My price is {}.",
"banana",
2.99
);
}

let _ = logger_provider.shutdown();
std::thread::sleep(Duration::from_secs(5));
assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
}

fn assert_logs_results_contains(result: &str, expected_content: &str) -> Result<()> {
let file = File::open(result)?;
let mut contents = String::new();
let mut reader = std::io::BufReader::new(&file);
reader.read_to_string(&mut contents)?;
assert!(contents.contains(expected_content));
Ok(())
}

#[cfg(test)]
mod logtests {
// TODO: The tests in this mod works like below: Emit a log with a UUID,
// The tests in this mod works like below: Emit a log with a UUID,
// then read the logs from the file and check if the UUID is present in the
// logs. This makes it easy to validate with a single collector and its
// output. This is a very simple test but good enough to validate that OTLP
// Exporter did work! A more comprehensive test would be to validate the
// entire Payload. The infra for it already exists (logs_asserter.rs), the
// TODO here is to write a test that validates the entire payload.
// Exporter did work!

use super::*;
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
use integration_test_runner::test_utils;
use opentelemetry_appender_tracing::layer;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use std::{fs::File, time::Duration};
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use uuid::Uuid;
use std::fs::File;

#[test]
#[should_panic(expected = "assertion `left == right` failed: body does not match")]
Expand All @@ -87,6 +150,8 @@ mod logtests {
Ok(())
}

// Batch Processor

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub async fn logs_batch_tokio_multi_thread() -> Result<()> {
Expand All @@ -105,104 +170,63 @@ mod logtests {
logs_tokio_helper(false).await
}

async fn logs_tokio_helper(is_simple: bool) -> Result<()> {
use crate::{assert_logs_results_contains, init_logs};
test_utils::start_collector_container().await?;

let logger_provider = init_logs(is_simple).unwrap();
let layer = OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
// generate a random uuid and store it to expected guid
let expected_uuid = Uuid::new_v4().to_string();
{
let _guard = tracing::subscriber::set_default(subscriber);
info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99);
}

let _ = logger_provider.shutdown();
tokio::time::sleep(Duration::from_secs(5)).await;
assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn logs_batch_non_tokio_main_init_logs_inside_rt() -> Result<()> {
logs_non_tokio_helper(false, true)
}

#[test]
#[cfg(feature = "reqwest-blocking-client")]
pub fn logs_batch_non_tokio_main_with_init_logs_outside_rt() -> Result<()> {
logs_non_tokio_helper(false, false)
}

// Simple Processor

#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn logs_simple_non_tokio_main_with_init_logs_inside_rt() -> Result<()> {
logs_non_tokio_helper(true, true)
}

#[test]
#[cfg(any(feature = "reqwest-blocking-client"))]
pub fn logs_simple_non_tokio_main_with_init_logs_outsie_rt() -> Result<()> {
logs_non_tokio_helper(true, false)
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[cfg(any(feature = "tonic-client", feature = "reqwest-client"))]
#[cfg(any(
feature = "tonic-client",
feature = "reqwest-client",
feature = "hyper-client"
))]
pub async fn logs_simple_tokio_multi_thread() -> Result<()> {
logs_tokio_helper(true).await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[cfg(any(feature = "tonic-client", feature = "reqwest-client"))]
#[cfg(any(
feature = "tonic-client",
feature = "reqwest-client",
feature = "hyper-client"
))]
pub async fn logs_simple_tokio_multi_with_one_worker() -> Result<()> {
logs_tokio_helper(true).await
}

// Ignored, to be investigated
#[ignore]
#[ignore] // https://github.com/open-telemetry/opentelemetry-rust/issues/2539
#[tokio::test(flavor = "current_thread")]
#[cfg(any(feature = "tonic-client", feature = "reqwest-client"))]
#[cfg(any(
feature = "tonic-client",
feature = "reqwest-client",
feature = "hyper-client"
))]
pub async fn logs_simple_tokio_current() -> Result<()> {
logs_tokio_helper(true).await
}

#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn logs_batch_non_tokio_main() -> Result<()> {
logs_non_tokio_helper(false)
}

fn logs_non_tokio_helper(is_simple: bool) -> Result<()> {
// Initialize the logger provider inside a tokio runtime
// as this allows tonic client to capture the runtime,
// but actual export occurs from the dedicated std::thread
// created by BatchLogProcessor.
let rt = tokio::runtime::Runtime::new()?;
let logger_provider = rt.block_on(async {
// While we're here setup our collector container too, as this needs tokio to run
test_utils::start_collector_container().await?;
init_logs(is_simple)
})?;
let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
// generate a random uuid and store it to expected guid
let expected_uuid = Uuid::new_v4().to_string();
{
let _guard = tracing::subscriber::set_default(subscriber);
info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99);
}

let _ = logger_provider.shutdown();
std::thread::sleep(Duration::from_secs(5));
assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
}

#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn logs_simple_non_tokio_main() -> Result<()> {
logs_non_tokio_helper(true)
}
}

pub fn assert_logs_results_contains(result: &str, expected_content: &str) -> Result<()> {
let file = File::open(result)?;
let mut contents = String::new();
let mut reader = std::io::BufReader::new(&file);
reader.read_to_string(&mut contents)?;
assert!(contents.contains(expected_content));
Ok(())
}

pub fn assert_logs_results(result: &str, expected: &str) -> Result<()> {
let left = read_logs_from_json(File::open(expected)?)?;
let right = read_logs_from_json(File::open(result)?)?;

LogsAsserter::new(left, right).assert();

assert!(File::open(result).unwrap().metadata().unwrap().size() > 0);
Ok(())
}

///
/// Make sure we stop the collector container, otherwise it will sit around hogging our
/// ports and subsequent test runs will fail.
Expand Down
3 changes: 1 addition & 2 deletions scripts/integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ if [ -d "$TEST_DIR" ]; then
echo "Integration Tests: Hyper Client (Disabled now)"
echo ####
echo
# TODO: hyper client is not supported with thread based processor and reader. Enable this test once it is supported.
#cargo test --no-default-features --features "hyper-client","internal-logs"
cargo test --no-default-features --features "hyper-client","internal-logs" --test logs
else
echo "Directory $TEST_DIR does not exist. Skipping tests."
exit 1
Expand Down
Loading