From 7a5d76fbec489523626fafd05c293a9f71f8bc22 Mon Sep 17 00:00:00 2001 From: 50U10FCA7 <50u10fca7@gmail.com> Date: Wed, 12 Mar 2025 08:12:00 -0500 Subject: [PATCH 1/3] Fix OOM in `span_processor_with_async_runtime::BatchSpanProcessor` --- opentelemetry-sdk/Cargo.toml | 20 +++- .../span_processor_with_async_runtime.rs | 106 ++++++++++-------- 2 files changed, 78 insertions(+), 48 deletions(-) diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 9ae4dcc433..4160e04f2b 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -23,7 +23,7 @@ serde = { workspace = true, features = ["derive", "rc"], optional = true } serde_json = { workspace = true, optional = true } thiserror = { workspace = true } url = { workspace = true, optional = true } -tokio = { workspace = true, features = ["rt", "time"], optional = true } +tokio = { workspace = true, optional = true } tokio-stream = { workspace = true, optional = true } http = { workspace = true, optional = true } tracing = {workspace = true, optional = true} @@ -48,9 +48,21 @@ logs = ["opentelemetry/logs", "serde_json"] spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"] metrics = ["opentelemetry/metrics", "glob"] testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"] -experimental_async_runtime = [] -rt-tokio = ["tokio", "tokio-stream", "experimental_async_runtime"] -rt-tokio-current-thread = ["tokio", "tokio-stream", "experimental_async_runtime"] +experimental_async_runtime = ["tokio", "tokio/sync"] +rt-tokio = [ + "tokio", + "tokio-stream", + "tokio/rt", + "tokio/time", + "experimental_async_runtime" +] +rt-tokio-current-thread = [ + "tokio", + "tokio-stream", + "tokio/rt", + "tokio/time", + "experimental_async_runtime" +] rt-async-std = ["async-std", "experimental_async_runtime"] internal-logs = ["tracing"] experimental_metrics_periodicreader_with_async_runtime = ["metrics"] diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index dc3a7f273b..c474b8b92c 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -6,18 +6,20 @@ use crate::trace::Span; use crate::trace::SpanProcessor; use crate::trace::{SpanData, SpanExporter}; use futures_channel::oneshot; -use futures_util::pin_mut; use futures_util::{ future::{self, BoxFuture, Either}, - select, + pin_mut, select, stream::{self, FusedStream, FuturesUnordered}, - StreamExt as _, + FutureExt as _, StreamExt as _, TryFutureExt as _, }; use opentelemetry::Context; use opentelemetry::{otel_debug, otel_error, otel_warn}; +use std::collections::VecDeque; use std::fmt; +use std::future::Future; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use tokio::sync::RwLock; /// A [`SpanProcessor`] that asynchronously buffers finished spans and reports /// them at a preconfigured interval. @@ -185,17 +187,20 @@ enum BatchMessage { } struct BatchSpanProcessorInternal { - spans: Vec, + spans: VecDeque, export_tasks: FuturesUnordered>, runtime: R, - exporter: E, + exporter: Arc>, config: BatchConfig, } -impl BatchSpanProcessorInternal { +impl BatchSpanProcessorInternal +where + E: SpanExporter + 'static, + R: RuntimeChannel, +{ async fn flush(&mut self, res_channel: Option>) { - let export_result = self.export().await; - let task = Box::pin(async move { + let task = self.export().map(|export_result| { if let Some(channel) = res_channel { // If a response channel is provided, attempt to send the export result through it. if let Err(result) = channel.send(export_result) { @@ -221,7 +226,7 @@ impl BatchSpanProcessorInternal { if self.config.max_concurrent_exports == 1 { let _ = task.await; } else { - self.export_tasks.push(task); + self.export_tasks.push(task.boxed()); while self.export_tasks.next().await.is_some() {} } } @@ -233,27 +238,32 @@ impl BatchSpanProcessorInternal { match message { // Span has finished, add to buffer of pending spans. BatchMessage::ExportSpan(span) => { - self.spans.push(span); + if self.spans.len() == self.config.max_export_batch_size { + // Replace the oldest span with the new span to avoid suspending messages + // processing. + self.spans.pop_front(); + } + self.spans.push_back(span); if self.spans.len() == self.config.max_export_batch_size { // If concurrent exports are saturated, wait for one to complete. if !self.export_tasks.is_empty() && self.export_tasks.len() == self.config.max_concurrent_exports { + // TODO: Refactor to avoid stopping message processing to not delay + // shutdown/resource set because of export saturation. self.export_tasks.next().await; } - let export_result = self.export().await; - let task = async move { - if let Err(err) = export_result { - otel_error!( - name: "BatchSpanProcessor.Export.Error", - reason = format!("{}", err) - ); - } + let task = self.export().or_else(|err| async move { + otel_error!( + name: "BatchSpanProcessor.Export.Error", + reason = format!("{err}"), + ); Ok(()) - }; + }); + // Special case when not using concurrent exports if self.config.max_concurrent_exports == 1 { let _ = task.await; @@ -288,34 +298,42 @@ impl BatchSpanProcessorInternal { // Stream has terminated or processor is shutdown, return to finish execution. BatchMessage::Shutdown(ch) => { self.flush(Some(ch)).await; - let _ = self.exporter.shutdown(); + let _ = self.exporter.write().await.shutdown(); return false; } // propagate the resource BatchMessage::SetResource(resource) => { - self.exporter.set_resource(&resource); + self.exporter.write().await.set_resource(&resource); } } true } - async fn export(&mut self) -> OTelSdkResult { - // Batch size check for flush / shutdown. Those methods may be called - // when there's no work to do. - if self.spans.is_empty() { - return Ok(()); - } - - let export = self.exporter.export(self.spans.split_off(0)); - let timeout = self.runtime.delay(self.config.max_export_timeout); + #[allow(impl_trait_overcaptures)] // MSRV compatibility. + fn export(&mut self) -> impl Future { + let spans = self.spans.drain(..).collect::>(); + let exporter = self.exporter.clone(); + let runtime = self.runtime.clone(); let time_out = self.config.max_export_timeout; - pin_mut!(export); - pin_mut!(timeout); + async move { + // Batch size check for flush / shutdown. Those methods may be called + // when there's no work to do. + if spans.is_empty() { + return Ok(()); + } + + let exporter = exporter.read().await; + let export = exporter.export(spans); + let timeout = runtime.delay(time_out); + + pin_mut!(export); + pin_mut!(timeout); - match future::select(export, timeout).await { - Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)), + match future::select(export, timeout).await { + Either::Left((export_res, _)) => export_res, + Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)), + } } } @@ -328,14 +346,14 @@ impl BatchSpanProcessorInternal { // An export task completed; do we need to do anything with it? }, message = messages.next() => { - match message { - Some(message) => { - if !self.process_message(message).await { - break; - } - }, - None => break, + if let Some(m) = message { + if self.process_message(m).await { + continue; + } } + + // Shutdown if there's no message, or the message indicates shutdown. + break; }, } } @@ -364,11 +382,11 @@ impl BatchSpanProcessor { let messages = Box::pin(stream::select(message_receiver, ticker)); let processor = BatchSpanProcessorInternal { - spans: Vec::new(), + spans: VecDeque::new(), export_tasks: FuturesUnordered::new(), runtime: timeout_runtime, config, - exporter, + exporter: Arc::new(RwLock::new(exporter)), }; processor.run(messages).await From ceb8172faf355771910f950af1b5a98c59392210 Mon Sep 17 00:00:00 2001 From: 50U10FCA7 <50u10fca7@gmail.com> Date: Wed, 12 Mar 2025 11:50:06 -0500 Subject: [PATCH 2/3] Rollback blocking export fix --- opentelemetry-sdk/Cargo.toml | 20 +--- .../span_processor_with_async_runtime.rs | 92 +++++++++---------- 2 files changed, 45 insertions(+), 67 deletions(-) diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 4160e04f2b..9ae4dcc433 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -23,7 +23,7 @@ serde = { workspace = true, features = ["derive", "rc"], optional = true } serde_json = { workspace = true, optional = true } thiserror = { workspace = true } url = { workspace = true, optional = true } -tokio = { workspace = true, optional = true } +tokio = { workspace = true, features = ["rt", "time"], optional = true } tokio-stream = { workspace = true, optional = true } http = { workspace = true, optional = true } tracing = {workspace = true, optional = true} @@ -48,21 +48,9 @@ logs = ["opentelemetry/logs", "serde_json"] spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"] metrics = ["opentelemetry/metrics", "glob"] testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"] -experimental_async_runtime = ["tokio", "tokio/sync"] -rt-tokio = [ - "tokio", - "tokio-stream", - "tokio/rt", - "tokio/time", - "experimental_async_runtime" -] -rt-tokio-current-thread = [ - "tokio", - "tokio-stream", - "tokio/rt", - "tokio/time", - "experimental_async_runtime" -] +experimental_async_runtime = [] +rt-tokio = ["tokio", "tokio-stream", "experimental_async_runtime"] +rt-tokio-current-thread = ["tokio", "tokio-stream", "experimental_async_runtime"] rt-async-std = ["async-std", "experimental_async_runtime"] internal-logs = ["tracing"] experimental_metrics_periodicreader_with_async_runtime = ["metrics"] diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index c474b8b92c..726d159b29 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -6,20 +6,19 @@ use crate::trace::Span; use crate::trace::SpanProcessor; use crate::trace::{SpanData, SpanExporter}; use futures_channel::oneshot; +use futures_util::pin_mut; use futures_util::{ future::{self, BoxFuture, Either}, - pin_mut, select, + select, stream::{self, FusedStream, FuturesUnordered}, - FutureExt as _, StreamExt as _, TryFutureExt as _, + StreamExt as _, }; use opentelemetry::Context; use opentelemetry::{otel_debug, otel_error, otel_warn}; use std::collections::VecDeque; use std::fmt; -use std::future::Future; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use tokio::sync::RwLock; /// A [`SpanProcessor`] that asynchronously buffers finished spans and reports /// them at a preconfigured interval. @@ -190,17 +189,14 @@ struct BatchSpanProcessorInternal { spans: VecDeque, export_tasks: FuturesUnordered>, runtime: R, - exporter: Arc>, + exporter: E, config: BatchConfig, } -impl BatchSpanProcessorInternal -where - E: SpanExporter + 'static, - R: RuntimeChannel, -{ +impl BatchSpanProcessorInternal { async fn flush(&mut self, res_channel: Option>) { - let task = self.export().map(|export_result| { + let export_result = self.export().await; // TODO: Move execution to `export_tasks`. + let task = Box::pin(async move { if let Some(channel) = res_channel { // If a response channel is provided, attempt to send the export result through it. if let Err(result) = channel.send(export_result) { @@ -226,7 +222,7 @@ where if self.config.max_concurrent_exports == 1 { let _ = task.await; } else { - self.export_tasks.push(task.boxed()); + self.export_tasks.push(task); while self.export_tasks.next().await.is_some() {} } } @@ -255,15 +251,17 @@ where self.export_tasks.next().await; } - let task = self.export().or_else(|err| async move { - otel_error!( - name: "BatchSpanProcessor.Export.Error", - reason = format!("{err}"), - ); + let export_result = self.export().await; // TODO: Move execution to `export_tasks`. + let task = async move { + if let Err(err) = export_result { + otel_error!( + name: "BatchSpanProcessor.Export.Error", + reason = format!("{}", err) + ); + } Ok(()) - }); - + }; // Special case when not using concurrent exports if self.config.max_concurrent_exports == 1 { let _ = task.await; @@ -298,42 +296,34 @@ where // Stream has terminated or processor is shutdown, return to finish execution. BatchMessage::Shutdown(ch) => { self.flush(Some(ch)).await; - let _ = self.exporter.write().await.shutdown(); + let _ = self.exporter.shutdown(); return false; } // propagate the resource BatchMessage::SetResource(resource) => { - self.exporter.write().await.set_resource(&resource); + self.exporter.set_resource(&resource); } } true } - #[allow(impl_trait_overcaptures)] // MSRV compatibility. - fn export(&mut self) -> impl Future { - let spans = self.spans.drain(..).collect::>(); - let exporter = self.exporter.clone(); - let runtime = self.runtime.clone(); - let time_out = self.config.max_export_timeout; - - async move { - // Batch size check for flush / shutdown. Those methods may be called - // when there's no work to do. - if spans.is_empty() { - return Ok(()); - } + async fn export(&mut self) -> OTelSdkResult { + // Batch size check for flush / shutdown. Those methods may be called + // when there's no work to do. + if self.spans.is_empty() { + return Ok(()); + } - let exporter = exporter.read().await; - let export = exporter.export(spans); - let timeout = runtime.delay(time_out); + let export = self.exporter.export(self.spans.drain(..).collect()); + let timeout = self.runtime.delay(self.config.max_export_timeout); + let time_out = self.config.max_export_timeout; - pin_mut!(export); - pin_mut!(timeout); + pin_mut!(export); + pin_mut!(timeout); - match future::select(export, timeout).await { - Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)), - } + match future::select(export, timeout).await { + Either::Left((export_res, _)) => export_res, + Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)), } } @@ -346,14 +336,14 @@ where // An export task completed; do we need to do anything with it? }, message = messages.next() => { - if let Some(m) = message { - if self.process_message(m).await { - continue; - } + match message { + Some(message) => { + if !self.process_message(message).await { + break; + } + }, + None => break, } - - // Shutdown if there's no message, or the message indicates shutdown. - break; }, } } @@ -386,7 +376,7 @@ impl BatchSpanProcessor { export_tasks: FuturesUnordered::new(), runtime: timeout_runtime, config, - exporter: Arc::new(RwLock::new(exporter)), + exporter, }; processor.run(messages).await From b13bb7174d1fc515781ea8ec493b83f6f14c1334 Mon Sep 17 00:00:00 2001 From: 50U10FCA7 <50u10fca7@gmail.com> Date: Wed, 12 Mar 2025 13:34:43 -0500 Subject: [PATCH 3/3] Limit `Span`s count to `BatchConfig::max_queue_size` --- .../src/trace/span_processor_with_async_runtime.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index 726d159b29..b6181912ec 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -234,14 +234,21 @@ impl BatchSpanProcessorInternal { match message { // Span has finished, add to buffer of pending spans. BatchMessage::ExportSpan(span) => { - if self.spans.len() == self.config.max_export_batch_size { + if self.spans.len() == self.config.max_queue_size { // Replace the oldest span with the new span to avoid suspending messages // processing. self.spans.pop_front(); + + otel_warn!( + name: "BatchSpanProcessor.Export.Error", + dropped_spans = 1, + max_queue_size = self.config.max_queue_size, + message = "Spans were dropped due to a full queue / slow export. The count represents the total count of span records dropped in the lifetime of the BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals." + ); } self.spans.push_back(span); - if self.spans.len() == self.config.max_export_batch_size { + if self.spans.len() >= self.config.max_export_batch_size { // If concurrent exports are saturated, wait for one to complete. if !self.export_tasks.is_empty() && self.export_tasks.len() == self.config.max_concurrent_exports @@ -314,7 +321,8 @@ impl BatchSpanProcessorInternal { return Ok(()); } - let export = self.exporter.export(self.spans.drain(..).collect()); + let count = self.spans.len().min(self.config.max_export_batch_size); + let export = self.exporter.export(self.spans.drain(..count).collect()); let timeout = self.runtime.delay(self.config.max_export_timeout); let time_out = self.config.max_export_timeout;