Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 180 additions & 35 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
/// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
/// already set). This method is called synchronously within the `Span::end`
/// API, therefore it should not block or throw an exception.
/// TODO - This method should take a reference to `SpanData`
fn on_end(&self, span: SpanData);
/// Force the spans lying in the cache to be exported.
fn force_flush(&self) -> TraceResult<()>;
Expand Down Expand Up @@ -163,6 +164,7 @@
}
}

use crate::export::trace::ExportResult;
/// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
/// in batches to the configured `SpanExporter`. This processor is ideal for
/// high-throughput environments, as it minimizes the overhead of exporting spans
Expand Down Expand Up @@ -217,16 +219,17 @@
/// provider.shutdown();
/// }
/// ```
use futures_executor::block_on;
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::SyncSender;

/// Messages exchanged between the main thread and the background thread.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
ExportSpan(SpanData),
//ExportSpan(SpanData),
ExportSpan(Arc<AtomicBool>),
ForceFlush(SyncSender<TraceResult<()>>),
Shutdown(SyncSender<TraceResult<()>>),
SetResource(Arc<Resource>),
Expand All @@ -235,12 +238,17 @@
/// A batch span processor with a dedicated background thread.
#[derive(Debug)]
pub struct BatchSpanProcessor {
message_sender: SyncSender<BatchMessage>,
span_sender: SyncSender<SpanData>, // Data channel to store spans
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages.
handle: Mutex<Option<thread::JoinHandle<()>>>,
forceflush_timeout: Duration,
shutdown_timeout: Duration,
is_shutdown: AtomicBool,
dropped_span_count: Arc<AtomicUsize>,
export_span_message_sent: Arc<AtomicBool>,
current_batch_size: Arc<AtomicUsize>,
max_export_batch_size: usize,
max_queue_size: usize,
}

impl BatchSpanProcessor {
Expand All @@ -255,7 +263,12 @@
where
E: SpanExporter + Send + 'static,
{
let (message_sender, message_receiver) = sync_channel(config.max_queue_size);
let (span_sender, span_receiver) = sync_channel::<SpanData>(config.max_queue_size);
let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
let max_queue_size = config.max_queue_size;
let max_export_batch_size = config.max_export_batch_size;
let current_batch_size = Arc::new(AtomicUsize::new(0));
let current_batch_size_for_thread = current_batch_size.clone();

let handle = thread::Builder::new()
.name("OpenTelemetry.Traces.BatchProcessor".to_string())
Expand All @@ -268,7 +281,7 @@
);
let mut spans = Vec::with_capacity(config.max_export_batch_size);
let mut last_export_time = Instant::now();

let current_batch_size = current_batch_size_for_thread;
loop {
let remaining_time_option = config
.scheduled_delay
Expand All @@ -279,44 +292,71 @@
};
match message_receiver.recv_timeout(remaining_time) {
Ok(message) => match message {
BatchMessage::ExportSpan(span) => {
spans.push(span);
if spans.len() >= config.max_queue_size
|| last_export_time.elapsed() >= config.scheduled_delay
{
if let Err(err) = block_on(exporter.export(spans.split_off(0)))
{
otel_error!(
name: "BatchSpanProcessor.ExportError",
error = format!("{}", err)
);
}
last_export_time = Instant::now();
}
BatchMessage::ExportSpan(export_span_message_sent) => {
// Reset the export span message sent flag now that it has been processed.
export_span_message_sent.store(false, Ordering::Relaxed);
otel_debug!(
name: "BatchSpanProcessor.ExportingDueToBatchSize",
);
let _ = Self::get_spans_and_export(
&span_receiver,
&mut exporter,
&mut spans,
&mut last_export_time,
&current_batch_size,
&config,
);
}
BatchMessage::ForceFlush(sender) => {
let result = block_on(exporter.export(spans.split_off(0)));
otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush");
let result = Self::get_spans_and_export(
&span_receiver,
&mut exporter,
&mut spans,
&mut last_export_time,
&current_batch_size,
&config,
);
let _ = sender.send(result);
}
BatchMessage::Shutdown(sender) => {
let result = block_on(exporter.export(spans.split_off(0)));
otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown");
let result = Self::get_spans_and_export(
&span_receiver,
&mut exporter,
&mut spans,
&mut last_export_time,
&current_batch_size,
&config,
);
let _ = sender.send(result);

otel_debug!(
name: "BatchSpanProcessor.ThreadExiting",
reason = "ShutdownRequested"
);
//
// break out the loop and return from the current background thread.
//
break;
}
BatchMessage::SetResource(resource) => {
exporter.set_resource(&resource);
}
},
Err(RecvTimeoutError::Timeout) => {
if last_export_time.elapsed() >= config.scheduled_delay {
if let Err(err) = block_on(exporter.export(spans.split_off(0))) {
otel_error!(
name: "BatchSpanProcessor.ExportError",
error = format!("{}", err)
);
}
last_export_time = Instant::now();
}
otel_debug!(
name: "BatchSpanProcessor.ExportingDueToTimer",
);

let _ = Self::get_spans_and_export(
&span_receiver,
&mut exporter,
&mut spans,
&mut last_export_time,
&current_batch_size,
&config,
);
}
Err(RecvTimeoutError::Disconnected) => {
// Channel disconnected, only thing to do is break
Expand All @@ -336,12 +376,17 @@
.expect("Failed to spawn thread"); //TODO: Handle thread spawn failure

Self {
span_sender,
message_sender,
handle: Mutex::new(Some(handle)),
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
is_shutdown: AtomicBool::new(false),
dropped_span_count: Arc::new(AtomicUsize::new(0)),
max_queue_size,
export_span_message_sent: Arc::new(AtomicBool::new(false)),
current_batch_size,
max_export_batch_size,
}
}

Expand All @@ -355,6 +400,72 @@
config: BatchConfig::default(),
}
}

// Drains up to `max_export_batch_size` spans from the data channel and
// exports them, returning the result of the export operation.
// The shared `current_batch_size` counter is decremented by the number of
// spans handed to the exporter. The `spans` buffer is expected to be empty
// when this is called.
#[inline]
fn get_spans_and_export<E>(
    spans_receiver: &Receiver<SpanData>,
    exporter: &mut E,
    spans: &mut Vec<SpanData>,
    last_export_time: &mut Instant,
    current_batch_size: &AtomicUsize,
    config: &BatchConfig,
) -> ExportResult
where
    E: SpanExporter + Send + Sync + 'static,
{
    // Pull spans off the channel until it is empty or a full batch has
    // been collected.
    loop {
        match spans_receiver.try_recv() {
            Ok(span) => {
                spans.push(span);
                if spans.len() == config.max_export_batch_size {
                    break;
                }
            }
            Err(_) => break, // Channel drained (or disconnected) — export what we have.
        }
    }

    // Remember how many spans are handed to the exporter before the buffer
    // is cleared by the export call below.
    let exported_count = spans.len();
    let result = Self::export_with_timeout_sync(
        config.max_export_timeout,
        exporter,
        spans,
        last_export_time,
    ); // This method clears the spans vec after exporting

    current_batch_size.fetch_sub(exported_count, Ordering::Relaxed);
    result
}

// Synchronously exports `batch`, blocking the calling thread until the
// exporter's future completes.
//
// * The `Duration` parameter is currently ignored (TODO: enforce the export
//   timeout in the exporter).
// * `last_export_time` is reset on every call — even for an empty batch —
//   so the scheduled-delay timer restarts after each attempt.
// * `batch` is left empty on return; export errors are logged and propagated.
//
// NOTE: reconstructed here without the interleaved diff-viewer/codecov
// annotation lines that had corrupted this span of the paste.
#[allow(clippy::vec_box)]
fn export_with_timeout_sync<E>(
    _: Duration, // TODO, enforcing timeout in exporter.
    exporter: &mut E,
    batch: &mut Vec<SpanData>,
    last_export_time: &mut Instant,
) -> ExportResult
where
    E: SpanExporter + Send + Sync + 'static,
{
    *last_export_time = Instant::now();

    if batch.is_empty() {
        return TraceResult::Ok(());
    }

    // `split_off(0)` moves the spans into the exporter call and leaves
    // `batch` empty (capacity retained) for the next cycle.
    let export = exporter.export(batch.split_off(0));
    let export_result = futures_executor::block_on(export);

    match export_result {
        Ok(_) => TraceResult::Ok(()),
        Err(err) => {
            otel_error!(
                name: "BatchSpanProcessor.ExportError",
                error = format!("{}", err)
            );
            TraceResult::Err(err)
        }
    }
}
}

impl SpanProcessor for BatchSpanProcessor {
Expand All @@ -369,10 +480,11 @@
// this is a warning, as the user is trying to emit after the processor has been shutdown
otel_warn!(
name: "BatchSpanProcessor.Emit.ProcessorShutdown",
message = "BatchSpanProcessor has been shutdown. No further spans will be emitted."

Check warning on line 483 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L483

Added line #L483 was not covered by tests
);
return;
}
let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));
let result = self.span_sender.try_send(span);

if result.is_err() {
// Increment dropped span count. The first time we have to drop a span,
Expand All @@ -382,6 +494,36 @@
message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
}
}
// At this point, sending the span to the data channel was successful.
// Increment the current batch size and check if it has reached the max export batch size.
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
{
// Check if a control message for exporting spans has already been sent to the worker thread.
// If not, send a control message to export spans.
// `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message.

if !self.export_span_message_sent.load(Ordering::Relaxed) {
// This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
// Perform atomic swap to `export_span_message_sent` ONLY when the atomic load operation above returns false.
// Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
// We could have used compare_exchange as well here, but it's more verbose than swap.
if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
match self.message_sender.try_send(BatchMessage::ExportSpan(
self.export_span_message_sent.clone(),
)) {
Ok(_) => {
// Control message sent successfully.
}
Err(_err) => {
// TODO: Log error
// If the control message could not be sent, reset the `export_span_message_sent` flag.
self.export_span_message_sent
.store(false, Ordering::Relaxed);
}

Check warning on line 522 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L517-L522

Added lines #L517 - L522 were not covered by tests
}
}

Check warning on line 524 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L524

Added line #L524 was not covered by tests
}
}
}

/// Flushes all pending spans.
Expand All @@ -401,17 +543,20 @@

/// Shuts down the processor.
fn shutdown(&self) -> TraceResult<()> {
if self.is_shutdown.swap(true, Ordering::Relaxed) {
return Err(TraceError::Other("Processor already shutdown".into()));
}
let dropped_spans = self.dropped_span_count.load(Ordering::Relaxed);
let max_queue_size = self.max_queue_size;
if dropped_spans > 0 {
otel_warn!(
name: "BatchSpanProcessor.LogsDropped",
name: "BatchSpanProcessor.SpansDropped",
dropped_span_count = dropped_spans,
max_queue_size = max_queue_size,

Check warning on line 555 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L555

Added line #L555 was not covered by tests
message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
);
}
if self.is_shutdown.swap(true, Ordering::Relaxed) {
return Err(TraceError::Other("Processor already shutdown".into()));
}

let (sender, receiver) = sync_channel(1);
self.message_sender
.try_send(BatchMessage::Shutdown(sender))
Expand Down
Loading