From daa4529b11523eba0d7dfaedde0796f7612cf878 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 11 Mar 2025 13:39:43 -0700 Subject: [PATCH 1/2] test: BatchProcessor stress testing --- opentelemetry-sdk/src/logs/export.rs | 8 ++ opentelemetry-sdk/src/logs/mod.rs | 2 +- stress/Cargo.toml | 9 +- stress/src/logs_batch.rs | 121 +++++++++++++++++++++++++++ 4 files changed, 137 insertions(+), 3 deletions(-) create mode 100644 stress/src/logs_batch.rs diff --git a/opentelemetry-sdk/src/logs/export.rs b/opentelemetry-sdk/src/logs/export.rs index 5ef91c77c8..f833300412 100644 --- a/opentelemetry-sdk/src/logs/export.rs +++ b/opentelemetry-sdk/src/logs/export.rs @@ -77,6 +77,14 @@ impl LogBatch<'_> { index: 0, } } + + /// Returns the number of log records in the batch. + pub fn len(&self) -> usize { + match &self.data { + LogBatchData::SliceOfOwnedData(data) => data.len(), + LogBatchData::SliceOfBorrowedData(data) => data.len(), + } + } } struct LogBatchDataIter<'a> { diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 0da96bb730..a252a8f125 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -37,7 +37,7 @@ mod tests { use crate::Resource; use opentelemetry::baggage::BaggageExt; use opentelemetry::logs::LogRecord; - use opentelemetry::logs::{Logger, LoggerProvider, Severity}; + use opentelemetry::logs::{Logger, LoggerProvider, Severity, }; use opentelemetry::{logs::AnyValue, Key, KeyValue}; use opentelemetry::{Context, InstrumentationScope}; use std::borrow::Borrow; diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 8cc4e7e70a..94d1a27e5b 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -29,6 +29,11 @@ name = "logs" path = "src/logs.rs" doc = false +[[bin]] # Bin to run the logs stress tests with BatchLogProcessor. This is different from existing stress tests. +name = "logs_batch" +path = "src/logs_batch.rs" +doc = false + [[bin]] # Bin to run the traces stress tests name = "traces" path = "src/traces.rs" @@ -44,11 +49,11 @@ ctrlc = { workspace = true } lazy_static = { workspace = true } num_cpus = { workspace = true } opentelemetry = { path = "../opentelemetry", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled"] } -opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled"] } +opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled", "testing"] } opentelemetry-appender-tracing = { workspace = true, features = ["spec_unstable_logs_enabled"] } rand = { workspace = true, features = ["small_rng", "os_rng"] } tracing = { workspace = true, features = ["std"]} -tracing-subscriber = { workspace = true, features = ["registry", "std"] } +tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] } num-format = { workspace = true } sysinfo = { workspace = true, optional = true } futures-executor = { workspace = true } diff --git a/stress/src/logs_batch.rs b/stress/src/logs_batch.rs new file mode 100644 index 0000000000..d786407354 --- /dev/null +++ b/stress/src/logs_batch.rs @@ -0,0 +1,121 @@ +use opentelemetry::logs::{LogRecord, Logger, LoggerProvider}; +use opentelemetry::Key; +use opentelemetry_appender_tracing::layer; +use opentelemetry_sdk::error::OTelSdkResult; +use opentelemetry_sdk::logs::{InMemoryLogExporter, LogBatch, LogExporter, SdkLoggerProvider}; +use std::sync::{Arc, Mutex}; +use tracing_subscriber::{prelude::*, EnvFilter}; + +#[derive(Debug)] +struct NoOpExporter { + count: Arc>, +} + +impl NoOpExporter { + fn new() -> Self { + NoOpExporter { + count: Arc::new(Mutex::new(0)), + } + } + + fn get_count(&self) -> Arc> { + Arc::clone(&self.count) + } +} + +impl LogExporter for NoOpExporter { + async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { + let mut count = self.count.lock().unwrap(); + *count += batch.len(); + // Simulate some processing time + // Varying this will affect the number of logs dropped + // and can be used to test the batch log processor + // in a various scenarios. + std::thread::sleep(std::time::Duration::from_millis(10)); + Ok(()) + } +} + +fn main() { + let total_logs_to_emit = 400_000_000; + let num_threads = 4; + // Setup ability to collect internal logs + let inmemory_exporter = InMemoryLogExporter::default(); + let logger_provider_for_internal_logs: SdkLoggerProvider = SdkLoggerProvider::builder() + .with_simple_exporter(inmemory_exporter.clone()) + .build(); + let filter_otel = EnvFilter::new("warn"); + let otel_layer = layer::OpenTelemetryTracingBridge::new(&logger_provider_for_internal_logs) + .with_filter(filter_otel); + + tracing_subscriber::registry().with(otel_layer).init(); + + let exporter = NoOpExporter::new(); + let count = exporter.get_count(); + { + let logger_provider: SdkLoggerProvider = SdkLoggerProvider::builder() + .with_batch_exporter(exporter) + .build(); + let mut handles = vec![]; + + let logs_per_thread = total_logs_to_emit / num_threads; + + for _ in 0..num_threads { + let logger = logger_provider.logger("test_logger"); + let handle = std::thread::spawn(move || { + for _ in 0..logs_per_thread { + let mut log_record = logger.create_log_record(); + log_record.set_body("test log".into()); + logger.emit(log_record); + } + }); + handles.push(handle); + } + + for handle in handles { + handle.join().unwrap(); + } + + logger_provider.shutdown().unwrap(); + } + + let internal_logs = inmemory_exporter + .get_emitted_logs() + .expect("Logs are expected to be exported."); + + let dropped_logs_record = internal_logs + .iter() + .find(|log| log.record.event_name() == Some("BatchLogProcessor.LogsDropped")) + .expect("Logs are expected to be exported."); + + let dropped_logs_count = dropped_logs_record + .record + .attributes_iter() + .find(|attr| attr.0 == Key::new("dropped_logs_count")) + .expect("Logs are expected to be exported."); + let logs_dropped_by_sdk = + if let opentelemetry::logs::AnyValue::String(value) = &dropped_logs_count.1 { + // parse the string to an integer + value.as_str().parse::().unwrap() + } else { + 0 + }; + + let count_logs_received_by_exporter = *count.lock().unwrap() as u64; + + println!("Total logs emitted: {}", total_logs_to_emit); + println!( + "Logs received by exporter: {}", + count_logs_received_by_exporter + ); + println!("Dropped logs count: {}", logs_dropped_by_sdk); + + if logs_dropped_by_sdk + count_logs_received_by_exporter == total_logs_to_emit { + println!("Success! All logs are accounted for!"); + } else { + println!( + "Fail! {} logs are unaccounted for!", + total_logs_to_emit - (logs_dropped_by_sdk + count_logs_received_by_exporter) + ); + } +} From 56c62fb2b232f5fb71b09438a93309a6b119c77c Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 11 Mar 2025 13:40:34 -0700 Subject: [PATCH 2/2] nit --- opentelemetry-sdk/src/logs/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index a252a8f125..0da96bb730 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -37,7 +37,7 @@ mod tests { use crate::Resource; use opentelemetry::baggage::BaggageExt; use opentelemetry::logs::LogRecord; - use opentelemetry::logs::{Logger, LoggerProvider, Severity, }; + use opentelemetry::logs::{Logger, LoggerProvider, Severity}; use opentelemetry::{logs::AnyValue, Key, KeyValue}; use opentelemetry::{Context, InstrumentationScope}; use std::borrow::Borrow;