diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md
index 6ba2f705d4..11fec2126a 100644
--- a/opentelemetry-sdk/CHANGELOG.md
+++ b/opentelemetry-sdk/CHANGELOG.md
@@ -4,6 +4,7 @@
 
 - TODO: Placeholder for Span processor related things
 - *Fix* SpanProcessor::on_start is no longer called on non recording spans
+- **Fix**: Restore true parallel exports in the async-native `BatchSpanProcessor` by honoring `OTEL_BSP_MAX_CONCURRENT_EXPORTS` ([#3028](https://github.com/open-telemetry/opentelemetry-rust/pull/3028)). A regression in [#2685](https://github.com/open-telemetry/opentelemetry-rust/pull/2685) inadvertently awaited the `export()` future directly in `opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs` instead of spawning it on the runtime, forcing all exports to run sequentially.
 
 ## 0.30.0
 
diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml
index aa92787ea7..3a5ebf0e5e 100644
--- a/opentelemetry-sdk/Cargo.toml
+++ b/opentelemetry-sdk/Cargo.toml
@@ -22,7 +22,7 @@ serde = { workspace = true, features = ["derive", "rc"], optional = true }
 serde_json = { workspace = true, optional = true }
 thiserror = { workspace = true }
 url = { workspace = true, optional = true }
-tokio = { workspace = true, features = ["rt", "time"], optional = true }
+tokio = { workspace = true, default-features = false, optional = true }
 tokio-stream = { workspace = true, optional = true }
 http = { workspace = true, optional = true }
 
@@ -47,15 +47,15 @@ spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"]
 metrics = ["opentelemetry/metrics"]
 testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
 experimental_async_runtime = []
-rt-tokio = ["tokio", "tokio-stream", "experimental_async_runtime"]
-rt-tokio-current-thread = ["tokio", "tokio-stream", "experimental_async_runtime"]
+rt-tokio = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
+rt-tokio-current-thread = ["tokio/rt", "tokio/time", "tokio-stream", "experimental_async_runtime"]
 internal-logs = ["opentelemetry/internal-logs"]
 experimental_metrics_periodicreader_with_async_runtime = ["metrics", "experimental_async_runtime"]
 spec_unstable_metrics_views = ["metrics"]
 experimental_metrics_custom_reader = ["metrics"]
 experimental_logs_batch_log_processor_with_async_runtime = ["logs", "experimental_async_runtime"]
 experimental_logs_concurrent_log_processor = ["logs"]
-experimental_trace_batch_span_processor_with_async_runtime = ["trace", "experimental_async_runtime"]
+experimental_trace_batch_span_processor_with_async_runtime = ["tokio/sync", "trace", "experimental_async_runtime"]
 experimental_metrics_disable_name_validation = ["metrics"]
 
 [[bench]]
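To make the changelog entry above concrete, here is a standalone sketch (not the processor's actual code) of the difference between awaiting each export inline and spawning exports with a cap on how many run at once. It assumes only the `tokio` and `futures-util` crates; `slow_export` is a made-up stand-in for an exporter call.

```rust
use std::time::Duration;

use futures_util::{stream::FuturesUnordered, StreamExt};

// Hypothetical stand-in for `SpanExporter::export`; it just sleeps.
async fn slow_export(batch_id: usize) {
    tokio::time::sleep(Duration::from_millis(50)).await;
    println!("exported batch {batch_id}");
}

#[tokio::main]
async fn main() {
    // Regressed behavior: each batch waits for the previous one (~4 x 50 ms).
    for id in 0..4 {
        slow_export(id).await;
    }

    // Restored behavior: spawn exports on the runtime and cap the number in
    // flight, in the spirit of OTEL_BSP_MAX_CONCURRENT_EXPORTS (~2 x 50 ms).
    let max_concurrent_exports = 2;
    let mut in_flight = FuturesUnordered::new();
    for id in 0..4 {
        if in_flight.len() == max_concurrent_exports {
            // At capacity: wait for one export to finish before starting another.
            in_flight.next().await;
        }
        in_flight.push(tokio::spawn(slow_export(id)));
    }
    // Drain whatever is still in flight.
    while in_flight.next().await.is_some() {}
}
```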
diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs
index 95e5d2397a..b294f74043 100644
--- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs
+++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs
@@ -6,19 +6,21 @@
 use crate::trace::Span;
 use crate::trace::SpanProcessor;
 use crate::trace::{SpanData, SpanExporter};
 use futures_channel::oneshot;
-use futures_util::pin_mut;
 use futures_util::{
     future::{self, BoxFuture, Either},
-    select,
+    pin_mut, select,
     stream::{self, FusedStream, FuturesUnordered},
     StreamExt as _,
 };
 use opentelemetry::Context;
 use opentelemetry::{otel_debug, otel_error, otel_warn};
 use std::fmt;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
+use std::sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc,
+};
 use std::time::Duration;
+use tokio::sync::RwLock;
 
 /// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
 /// them at a preconfigured interval.
@@ -188,13 +190,22 @@ struct BatchSpanProcessorInternal {
     spans: Vec<SpanData>,
     export_tasks: FuturesUnordered<BoxFuture<'static, OTelSdkResult>>,
     runtime: R,
-    exporter: E,
     config: BatchConfig,
+    // TODO: Redesign the `SpanExporter` trait to use immutable references (`&self`)
+    // for all methods. This would allow us to remove the `RwLock` and just use `Arc`,
+    // similar to how `crate::logs::LogExporter` is implemented.
+    exporter: Arc<RwLock<E>>,
 }
 
-impl BatchSpanProcessorInternal {
+impl BatchSpanProcessorInternal {
     async fn flush(&mut self, res_channel: Option<oneshot::Sender<OTelSdkResult>>) {
-        let export_result = self.export().await;
+        let export_result = Self::export(
+            self.spans.split_off(0),
+            self.exporter.clone(),
+            self.runtime.clone(),
+            self.config.max_export_timeout,
+        )
+        .await;
         let task = Box::pin(async move {
             if let Some(channel) = res_channel {
                 // If a response channel is provided, attempt to send the export result through it.
@@ -243,9 +254,15 @@ impl BatchSpanProcessorInternal {
                 self.export_tasks.next().await;
             }
 
-            let export_result = self.export().await;
+            let batch = self.spans.split_off(0);
+            let exporter = self.exporter.clone();
+            let runtime = self.runtime.clone();
+            let max_export_timeout = self.config.max_export_timeout;
+
             let task = async move {
-                if let Err(err) = export_result {
+                if let Err(err) =
+                    Self::export(batch, exporter, runtime, max_export_timeout).await
+                {
                     otel_error!(
                         name: "BatchSpanProcessor.Export.Error",
                         reason = format!("{}", err)
@@ -254,6 +271,7 @@ impl BatchSpanProcessorInternal {
                 Ok(())
             };
 
+            // Special case when not using concurrent exports
             if self.config.max_concurrent_exports == 1 {
                 let _ = task.await;
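The `Arc<RwLock<_>>` wrapper introduced above exists because `shutdown` and `set_resource` still take `&mut self`, while the concurrently spawned export tasks only need shared access. Below is a minimal sketch of that sharing pattern, assuming tokio's `sync` and `rt-multi-thread` features and a hypothetical `FakeExporter` rather than the real `SpanExporter` trait.

```rust
use std::sync::Arc;

use tokio::sync::RwLock;

// Illustrative exporter; only the `&self` vs `&mut self` split matters here.
#[derive(Debug, Default)]
struct FakeExporter {
    shut_down: bool,
}

impl FakeExporter {
    // Exporting only needs `&self`, so concurrent tasks can share a read lock.
    async fn export(&self, batch: Vec<u64>) {
        println!("exporting {} spans", batch.len());
    }

    // Shutdown (and set_resource) needs `&mut self`, hence the write lock.
    fn shutdown(&mut self) {
        self.shut_down = true;
    }
}

#[tokio::main]
async fn main() {
    let exporter = Arc::new(RwLock::new(FakeExporter::default()));

    // Each spawned export clones the Arc and holds only a read guard.
    let mut handles = Vec::new();
    for batch in [vec![1, 2], vec![3], vec![4, 5, 6]] {
        let exporter = Arc::clone(&exporter);
        handles.push(tokio::spawn(async move {
            exporter.read().await.export(batch).await;
        }));
    }
    for handle in handles {
        handle.await.expect("export task panicked");
    }

    // The exclusive write lock waits for all read guards to be dropped.
    exporter.write().await.shutdown();
}
```

The read/write split is what lets exports overlap with each other while shutdown still gets exclusive access once they have drained.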
@@ -288,34 +306,39 @@ impl BatchSpanProcessorInternal {
             // Stream has terminated or processor is shutdown, return to finish execution.
             BatchMessage::Shutdown(ch) => {
                 self.flush(Some(ch)).await;
-                let _ = self.exporter.shutdown();
+                let _ = self.exporter.write().await.shutdown();
                 return false;
             }
 
             // propagate the resource
             BatchMessage::SetResource(resource) => {
-                self.exporter.set_resource(&resource);
+                self.exporter.write().await.set_resource(&resource);
             }
         }
         true
     }
 
-    async fn export(&mut self) -> OTelSdkResult {
+    async fn export(
+        batch: Vec<SpanData>,
+        exporter: Arc<RwLock<E>>,
+        runtime: R,
+        max_export_timeout: Duration,
+    ) -> OTelSdkResult {
         // Batch size check for flush / shutdown. Those methods may be called
         // when there's no work to do.
-        if self.spans.is_empty() {
+        if batch.is_empty() {
             return Ok(());
         }
 
-        let export = self.exporter.export(self.spans.split_off(0));
-        let timeout = self.runtime.delay(self.config.max_export_timeout);
-        let time_out = self.config.max_export_timeout;
+        let exporter_guard = exporter.read().await;
+        let export = exporter_guard.export(batch);
+        let timeout = runtime.delay(max_export_timeout);
 
         pin_mut!(export);
         pin_mut!(timeout);
 
         match future::select(export, timeout).await {
             Either::Left((export_res, _)) => export_res,
-            Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)),
+            Either::Right((_, _)) => Err(OTelSdkError::Timeout(max_export_timeout)),
         }
     }
 
@@ -368,7 +391,7 @@ impl BatchSpanProcessor {
                 export_tasks: FuturesUnordered::new(),
                 runtime: timeout_runtime,
                 config,
-                exporter,
+                exporter: Arc::new(RwLock::new(exporter)),
             };
 
             processor.run(messages).await
@@ -435,6 +458,8 @@ mod tests {
     use crate::trace::{SpanData, SpanExporter};
    use futures_util::Future;
     use std::fmt::Debug;
+    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+    use std::sync::Arc;
     use std::time::Duration;
 
     struct BlockingExporter<D> {
@@ -463,6 +488,39 @@ mod tests {
         }
     }
 
+    /// Exporter that records whether two exports overlap in time.
+    struct TrackingExporter {
+        /// Artificial delay to keep each export alive for a while.
+        delay: Duration,
+        /// Current number of in-flight exports.
+        active: Arc<AtomicUsize>,
+        /// Set to true the first time we see overlap.
+        concurrent_seen: Arc<AtomicBool>,
+    }
+
+    impl Debug for TrackingExporter {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            f.write_str("tracking exporter")
+        }
+    }
+
+    impl SpanExporter for TrackingExporter {
+        async fn export(&self, _batch: Vec<SpanData>) -> crate::error::OTelSdkResult {
+            // Increment in-flight counter and note any overlap.
+            let inflight = self.active.fetch_add(1, Ordering::SeqCst) + 1;
+            if inflight > 1 {
+                self.concurrent_seen.store(true, Ordering::SeqCst);
+            }
+
+            // Keep the export "busy" for a bit.
+            tokio::time::sleep(self.delay).await;
+
+            // Decrement counter.
+            self.active.fetch_sub(1, Ordering::SeqCst);
+            Ok(())
+        }
+    }
+
     #[test]
     fn test_build_batch_span_processor_builder() {
         let mut env_vars = vec![
@@ -532,8 +590,8 @@
         );
     }
 
-    // If the time_out is true, then the result suppose to ended with timeout.
-    // otherwise the exporter should be able to export within time out duration.
+    // If `time_out` is `true`, then the export should fail with a timeout.
+    // Else, the exporter should be able to export within the timeout duration.
     async fn timeout_test_tokio(time_out: bool) {
         let config = BatchConfig {
             max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
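The timeout behavior exercised by the tests below uses the same race-two-futures pattern as the new `export` function above: the export future is raced against a delay, and whichever completes first decides the result. A simplified, self-contained version (names are illustrative, and `tokio::time::sleep` stands in for the runtime's `delay`):

```rust
use std::time::Duration;

use futures_util::{
    future::{self, Either},
    pin_mut,
};

// Stand-in for an exporter call; the `work` duration is arbitrary for the demo.
async fn fake_export(work: Duration) -> Result<(), &'static str> {
    tokio::time::sleep(work).await;
    Ok(())
}

// Race the export against a timeout, mirroring the select-based pattern above.
async fn export_with_timeout(
    work: Duration,
    max_export_timeout: Duration,
) -> Result<(), &'static str> {
    let export = fake_export(work);
    let timeout = tokio::time::sleep(max_export_timeout);
    pin_mut!(export);
    pin_mut!(timeout);
    match future::select(export, timeout).await {
        Either::Left((res, _)) => res,           // export finished first
        Either::Right((_, _)) => Err("timeout"), // timeout elapsed first
    }
}

#[tokio::main]
async fn main() {
    // Fast export, generous timeout: succeeds.
    assert!(export_with_timeout(Duration::from_millis(5), Duration::from_millis(60))
        .await
        .is_ok());
    // Slow export, tight timeout: fails with a timeout.
    assert!(export_with_timeout(Duration::from_millis(60), Duration::from_millis(5))
        .await
        .is_err());
}
```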
@@ -557,24 +615,92 @@
         assert!(shutdown_res.is_ok());
     }
 
-    #[test]
-    fn test_timeout_tokio_timeout() {
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_timeout_tokio_timeout() {
         // If time_out is true, then we ask exporter to block for 60s and set timeout to 5s.
         // If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s.
         // Either way, the test should be finished within 5s.
-        let runtime = tokio::runtime::Builder::new_multi_thread()
-            .enable_all()
-            .build()
-            .unwrap();
-        runtime.block_on(timeout_test_tokio(true));
+        timeout_test_tokio(true).await;
     }
 
-    #[test]
-    fn test_timeout_tokio_not_timeout() {
-        let runtime = tokio::runtime::Builder::new_multi_thread()
-            .enable_all()
-            .build()
-            .unwrap();
-        runtime.block_on(timeout_test_tokio(false));
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_timeout_tokio_not_timeout() {
+        timeout_test_tokio(false).await;
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_concurrent_exports_expected() {
+        // Shared state for the exporter.
+        let active = Arc::new(AtomicUsize::new(0));
+        let concurrent_seen = Arc::new(AtomicBool::new(false));
+
+        let exporter = TrackingExporter {
+            delay: Duration::from_millis(50),
+            active: active.clone(),
+            concurrent_seen: concurrent_seen.clone(),
+        };
+
+        // Intentionally tiny batch-size so every span forces an export.
+        let config = BatchConfig {
+            max_export_batch_size: 1,
+            max_queue_size: 16,
+            scheduled_delay: Duration::from_secs(3600), // effectively disabled
+            max_export_timeout: Duration::from_secs(5),
+            max_concurrent_exports: 2, // what we want to verify
+        };
+
+        // Spawn the processor.
+        let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);
+
+        // Finish three spans in rapid succession.
+        processor.on_end(new_test_export_span_data());
+        processor.on_end(new_test_export_span_data());
+        processor.on_end(new_test_export_span_data());
+
+        // Wait until everything has been exported.
+        processor.force_flush().expect("force flush failed");
+        processor.shutdown().expect("shutdown failed");
+
+        // Expect at least one period with >1 export in flight.
+        assert!(
+            concurrent_seen.load(Ordering::SeqCst),
+            "exports never overlapped, processor is still serialising them"
+        );
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn test_exports_serial_when_max_concurrent_exports_1() {
+        let active = Arc::new(AtomicUsize::new(0));
+        let concurrent_seen = Arc::new(AtomicBool::new(false));
+
+        let exporter = TrackingExporter {
+            delay: Duration::from_millis(50),
+            active: active.clone(),
+            concurrent_seen: concurrent_seen.clone(),
+        };
+
+        let config = BatchConfig {
+            max_export_batch_size: 1,
+            max_queue_size: 16,
+            scheduled_delay: Duration::from_secs(3600),
+            max_export_timeout: Duration::from_secs(5),
+            max_concurrent_exports: 1, // what we want to verify
+        };
+
+        let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);
+
+        // Finish several spans quickly.
+        processor.on_end(new_test_export_span_data());
+        processor.on_end(new_test_export_span_data());
+        processor.on_end(new_test_export_span_data());
+
+        processor.force_flush().expect("force flush failed");
+        processor.shutdown().expect("shutdown failed");
+
+        // There must never have been more than one export in flight.
+        assert!(
+            !concurrent_seen.load(Ordering::SeqCst),
+            "exports overlapped even though max_concurrent_exports was 1"
+        );
+    }
 }
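Finally, a note on configuration: the concurrency cap these tests verify is normally driven by the `OTEL_BSP_MAX_CONCURRENT_EXPORTS` environment variable mentioned in the changelog entry. A rough sketch of that kind of lookup is shown below (illustrative parsing only, not the SDK's own config code; the fallback of `1`, i.e. serial exports, is an assumption):

```rust
// Illustrative only: read a concurrency cap from the environment.
fn max_concurrent_exports_from_env() -> usize {
    std::env::var("OTEL_BSP_MAX_CONCURRENT_EXPORTS")
        .ok()
        .and_then(|value| value.parse::<usize>().ok())
        .unwrap_or(1) // assumed default: serial exports
}

fn main() {
    // e.g. OTEL_BSP_MAX_CONCURRENT_EXPORTS=4 ./my-service
    println!("max_concurrent_exports = {}", max_concurrent_exports_from_env());
}
```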