Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
};
use opentelemetry::Context;
use opentelemetry::{otel_debug, otel_error, otel_warn};
use std::collections::VecDeque;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -185,7 +186,7 @@
}

struct BatchSpanProcessorInternal<E, R> {
spans: Vec<SpanData>,
spans: VecDeque<SpanData>,
export_tasks: FuturesUnordered<BoxFuture<'static, OTelSdkResult>>,
runtime: R,
exporter: E,
Expand All @@ -194,7 +195,7 @@

impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
async fn flush(&mut self, res_channel: Option<oneshot::Sender<OTelSdkResult>>) {
let export_result = self.export().await;
let export_result = self.export().await; // TODO: Move execution to `export_tasks`.
let task = Box::pin(async move {
if let Some(channel) = res_channel {
// If a response channel is provided, attempt to send the export result through it.
Expand Down Expand Up @@ -233,17 +234,31 @@
match message {
// Span has finished, add to buffer of pending spans.
BatchMessage::ExportSpan(span) => {
self.spans.push(span);
if self.spans.len() == self.config.max_queue_size {
// Replace the oldest span with the new span to avoid suspending messages
// processing.
self.spans.pop_front();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be silently dropping the old span - Should we log a warning here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lalitb We should. Also, we probably need to bound on self.config.max_queue_size instead of self.config.max_export_batch_size. BatchConfig notifies the spans will be dropped only if max_queue_size reached, not max_export_batch_size.


otel_warn!(

Check warning on line 242 in opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs#L240-L242

Added lines #L240 - L242 were not covered by tests
name: "BatchSpanProcessor.Export.Error",
dropped_spans = 1,
max_queue_size = self.config.max_queue_size,

Check warning on line 245 in opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs#L245

Added line #L245 was not covered by tests
message = "Spans were dropped due to a full queue / slow export. The count represents the total count of span records dropped in the lifetime of the BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
);
}
self.spans.push_back(span);

if self.spans.len() == self.config.max_export_batch_size {
if self.spans.len() >= self.config.max_export_batch_size {
// If concurrent exports are saturated, wait for one to complete.
if !self.export_tasks.is_empty()
&& self.export_tasks.len() == self.config.max_concurrent_exports
{
// TODO: Refactor to avoid stopping message processing to not delay
// shutdown/resource set because of export saturation.
self.export_tasks.next().await;
}

let export_result = self.export().await;
let export_result = self.export().await; // TODO: Move execution to `export_tasks`.

Check warning on line 261 in opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs#L261

Added line #L261 was not covered by tests
let task = async move {
if let Err(err) = export_result {
otel_error!(
Expand Down Expand Up @@ -306,7 +321,8 @@
return Ok(());
}

let export = self.exporter.export(self.spans.split_off(0));
let count = self.spans.len().min(self.config.max_export_batch_size);
let export = self.exporter.export(self.spans.drain(..count).collect());
let timeout = self.runtime.delay(self.config.max_export_timeout);
let time_out = self.config.max_export_timeout;

Expand Down Expand Up @@ -364,7 +380,7 @@

let messages = Box::pin(stream::select(message_receiver, ticker));
let processor = BatchSpanProcessorInternal {
spans: Vec::new(),
spans: VecDeque::new(),
export_tasks: FuturesUnordered::new(),
runtime: timeout_runtime,
config,
Expand Down
Loading