Skip to content
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
};
use opentelemetry::Context;
use opentelemetry::{otel_debug, otel_error, otel_warn};
use std::collections::VecDeque;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -185,7 +186,7 @@
}

struct BatchSpanProcessorInternal<E, R> {
spans: Vec<SpanData>,
spans: VecDeque<SpanData>,
export_tasks: FuturesUnordered<BoxFuture<'static, OTelSdkResult>>,
runtime: R,
exporter: E,
Expand All @@ -194,7 +195,7 @@

impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
async fn flush(&mut self, res_channel: Option<oneshot::Sender<OTelSdkResult>>) {
let export_result = self.export().await;
let export_result = self.export().await; // TODO: Move execution to `export_tasks`.
let task = Box::pin(async move {
if let Some(channel) = res_channel {
// If a response channel is provided, attempt to send the export result through it.
Expand Down Expand Up @@ -233,17 +234,24 @@
match message {
// Span has finished, add to buffer of pending spans.
BatchMessage::ExportSpan(span) => {
self.spans.push(span);
if self.spans.len() == self.config.max_export_batch_size {
// Replace the oldest span with the new span to avoid suspending messages
// processing.
self.spans.pop_front();

Check warning on line 240 in opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs#L238-L240

Added lines #L238 - L240 were not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be silently dropping the old span - Should we log a warning here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lalitb We should. Also, we should probably bound on self.config.max_queue_size instead of self.config.max_export_batch_size: BatchConfig documents that spans are dropped only when max_queue_size is reached, not when max_export_batch_size is reached.

}
self.spans.push_back(span);

if self.spans.len() == self.config.max_export_batch_size {
// If concurrent exports are saturated, wait for one to complete.
if !self.export_tasks.is_empty()
&& self.export_tasks.len() == self.config.max_concurrent_exports
{
// TODO: Refactor to avoid stopping message processing to not delay
// shutdown/resource set because of export saturation.
self.export_tasks.next().await;
}

let export_result = self.export().await;
let export_result = self.export().await; // TODO: Move execution to `export_tasks`.

Check warning on line 254 in opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs#L254

Added line #L254 was not covered by tests
let task = async move {
if let Err(err) = export_result {
otel_error!(
Expand Down Expand Up @@ -306,7 +314,7 @@
return Ok(());
}

let export = self.exporter.export(self.spans.split_off(0));
let export = self.exporter.export(self.spans.drain(..).collect());
let timeout = self.runtime.delay(self.config.max_export_timeout);
let time_out = self.config.max_export_timeout;

Expand Down Expand Up @@ -364,7 +372,7 @@

let messages = Box::pin(stream::select(message_receiver, ticker));
let processor = BatchSpanProcessorInternal {
spans: Vec::new(),
spans: VecDeque::new(),
export_tasks: FuturesUnordered::new(),
runtime: timeout_runtime,
config,
Expand Down
Loading