diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index dc3a7f273b..b6181912ec 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -15,6 +15,7 @@ use futures_util::{ }; use opentelemetry::Context; use opentelemetry::{otel_debug, otel_error, otel_warn}; +use std::collections::VecDeque; use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -185,7 +186,7 @@ enum BatchMessage { } struct BatchSpanProcessorInternal { - spans: Vec, + spans: VecDeque, export_tasks: FuturesUnordered>, runtime: R, exporter: E, @@ -194,7 +195,7 @@ struct BatchSpanProcessorInternal { impl BatchSpanProcessorInternal { async fn flush(&mut self, res_channel: Option>) { - let export_result = self.export().await; + let export_result = self.export().await; // TODO: Move execution to `export_tasks`. let task = Box::pin(async move { if let Some(channel) = res_channel { // If a response channel is provided, attempt to send the export result through it. @@ -233,17 +234,31 @@ impl BatchSpanProcessorInternal { match message { // Span has finished, add to buffer of pending spans. BatchMessage::ExportSpan(span) => { - self.spans.push(span); + if self.spans.len() == self.config.max_queue_size { + // Replace the oldest span with the new span to avoid suspending message + // processing. + self.spans.pop_front(); + + otel_warn!( + name: "BatchSpanProcessor.SpansDropped", + dropped_spans = 1, + max_queue_size = self.config.max_queue_size, + message = "Dropped the oldest buffered span because the queue is full (slow or stalled export); each occurrence of this event drops one span. Consider increasing max_queue_size and/or decreasing the delay between export intervals."
+ ); + } + self.spans.push_back(span); - if self.spans.len() == self.config.max_export_batch_size { + if self.spans.len() >= self.config.max_export_batch_size { // If concurrent exports are saturated, wait for one to complete. if !self.export_tasks.is_empty() && self.export_tasks.len() == self.config.max_concurrent_exports { + // TODO: Refactor to avoid stopping message processing to not delay + // shutdown/resource set because of export saturation. self.export_tasks.next().await; } - let export_result = self.export().await; + let export_result = self.export().await; // TODO: Move execution to `export_tasks`. let task = async move { if let Err(err) = export_result { otel_error!( @@ -306,7 +321,8 @@ impl BatchSpanProcessorInternal { return Ok(()); } - let export = self.exporter.export(self.spans.split_off(0)); + let count = self.spans.len().min(self.config.max_export_batch_size); + let export = self.exporter.export(self.spans.drain(..count).collect()); let timeout = self.runtime.delay(self.config.max_export_timeout); let time_out = self.config.max_export_timeout; @@ -364,7 +380,7 @@ impl BatchSpanProcessor { let messages = Box::pin(stream::select(message_receiver, ticker)); let processor = BatchSpanProcessorInternal { - spans: Vec::new(), + spans: VecDeque::new(), export_tasks: FuturesUnordered::new(), runtime: timeout_runtime, config,