Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-sdk/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub use span::Span;
pub use span_limit::SpanLimits;
pub use span_processor::{
BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder,
BatchSpanProcessorDedicatedThread, BatchSpanProcessorDedicatedThreadBuilder,
SimpleSpanProcessor, SpanProcessor,
};
pub use tracer::Tracer;
Expand Down
199 changes: 199 additions & 0 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
use std::sync::{Arc, Mutex};
use std::{env, fmt, str::FromStr, time::Duration};

use std::sync::atomic::AtomicBool;
use std::thread;
use std::time::Instant;

/// Delay interval between two consecutive exports.
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
/// Default delay interval between two consecutive exports.
Expand Down Expand Up @@ -166,6 +170,201 @@
}
}

use futures_executor::block_on;
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::SyncSender;

/// Messages exchanged between the main thread and the background thread.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessageDedicatedThread {
ExportSpan(SpanData),
ForceFlush(SyncSender<TraceResult<()>>),
Shutdown(SyncSender<TraceResult<()>>),
}

/// A batch span processor with a dedicated background thread.
#[derive(Debug)]
pub struct BatchSpanProcessorDedicatedThread {
message_sender: SyncSender<BatchMessageDedicatedThread>,
handle: Mutex<Option<thread::JoinHandle<()>>>,
shutdown_timeout: Duration,
is_shutdown: AtomicBool,
dropped_span_count: Arc<AtomicBool>,
}

impl BatchSpanProcessorDedicatedThread {
/// Creates a new instance of `BatchSpanProcessorDedicatedThread`.
pub fn new<E>(
mut exporter: E,
max_queue_size: usize,
scheduled_delay: Duration,
shutdown_timeout: Duration,
) -> Self
where
E: SpanExporter + Send + 'static,
{
let (message_sender, message_receiver) = sync_channel(max_queue_size);

let handle = thread::Builder::new()
.name("BatchSpanProcessorThread".to_string())
.spawn(move || {
let mut spans = Vec::new();
let mut last_export_time = Instant::now();

Check warning on line 214 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L199-L214

Added lines #L199 - L214 were not covered by tests

loop {
let timeout = scheduled_delay.saturating_sub(last_export_time.elapsed());
match message_receiver.recv_timeout(timeout) {
Ok(message) => match message {
BatchMessageDedicatedThread::ExportSpan(span) => {
spans.push(span);
if spans.len() >= max_queue_size
|| last_export_time.elapsed() >= scheduled_delay

Check warning on line 223 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L217-L223

Added lines #L217 - L223 were not covered by tests
{
if let Err(err) = block_on(exporter.export(spans.split_off(0)))
{
eprintln!("Export error: {:?}", err);
}
last_export_time = Instant::now();
}

Check warning on line 230 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L225-L230

Added lines #L225 - L230 were not covered by tests
}
BatchMessageDedicatedThread::ForceFlush(sender) => {
let result = block_on(exporter.export(spans.split_off(0)));
let _ = sender.send(result);
}
BatchMessageDedicatedThread::Shutdown(sender) => {
let result = block_on(exporter.export(spans.split_off(0)));
let _ = sender.send(result);
break;

Check warning on line 239 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L232-L239

Added lines #L232 - L239 were not covered by tests
}
},
Err(RecvTimeoutError::Timeout) => {
if last_export_time.elapsed() >= scheduled_delay {
if let Err(err) = block_on(exporter.export(spans.split_off(0))) {
eprintln!("Export error: {:?}", err);
}
last_export_time = Instant::now();
}

Check warning on line 248 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L243-L248

Added lines #L243 - L248 were not covered by tests
}
Err(RecvTimeoutError::Disconnected) => {
eprintln!("Channel disconnected, shutting down processor thread.");
break;

Check warning on line 252 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L251-L252

Added lines #L251 - L252 were not covered by tests
}
}
}
})
.expect("Failed to spawn thread");

Self {
message_sender,
handle: Mutex::new(Some(handle)),
shutdown_timeout,
is_shutdown: AtomicBool::new(false),
dropped_span_count: Arc::new(AtomicBool::new(false)),
}
}

Check warning on line 266 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L256-L266

Added lines #L256 - L266 were not covered by tests

/// Handles span end.
pub fn on_end(&self, span: SpanData) {
if self.is_shutdown.load(Ordering::Relaxed) {
eprintln!("Processor is shutdown. Dropping span.");
return;
}
if self
.message_sender
.try_send(BatchMessageDedicatedThread::ExportSpan(span))
.is_err() && !self.dropped_span_count.load(Ordering::Relaxed) {
eprintln!("Queue is full, dropping spans.");
self.dropped_span_count.store(true, Ordering::Relaxed);
}
}

Check warning on line 281 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L269-L281

Added lines #L269 - L281 were not covered by tests

/// Flushes all pending spans.
pub fn force_flush(&self) -> TraceResult<()> {
if self.is_shutdown.load(Ordering::Relaxed) {
return Err(TraceError::Other("Processor already shutdown".into()));
}
let (sender, receiver) = sync_channel(1);
self.message_sender
.try_send(BatchMessageDedicatedThread::ForceFlush(sender))
.map_err(|_| TraceError::Other("Failed to send ForceFlush message".into()))?;

Check warning on line 291 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L284-L291

Added lines #L284 - L291 were not covered by tests

receiver
.recv_timeout(self.shutdown_timeout)
.map_err(|_| TraceError::ExportTimedOut(self.shutdown_timeout))?
}

Check warning on line 296 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L293-L296

Added lines #L293 - L296 were not covered by tests

/// Shuts down the processor.
pub fn shutdown(&self) -> TraceResult<()> {
if self.is_shutdown.swap(true, Ordering::Relaxed) {
return Err(TraceError::Other("Processor already shutdown".into()));
}
let (sender, receiver) = sync_channel(1);
self.message_sender
.try_send(BatchMessageDedicatedThread::Shutdown(sender))
.map_err(|_| TraceError::Other("Failed to send Shutdown message".into()))?;

Check warning on line 306 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L299-L306

Added lines #L299 - L306 were not covered by tests

let result = receiver
.recv_timeout(self.shutdown_timeout)
.map_err(|_| TraceError::ExportTimedOut(self.shutdown_timeout))?;
if let Some(handle) = self.handle.lock().unwrap().take() {
handle.join().expect("Failed to join thread");
}
result
}

Check warning on line 315 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L308-L315

Added lines #L308 - L315 were not covered by tests
}

/// Builder for `BatchSpanProcessorDedicatedThread`.
#[derive(Debug, Default)]
pub struct BatchSpanProcessorDedicatedThreadBuilder {
max_queue_size: usize,
scheduled_delay: Duration,
shutdown_timeout: Duration,
}

impl BatchSpanProcessorDedicatedThreadBuilder {
/// Creates a new builder with default values.
pub fn new() -> Self {
Self {
max_queue_size: 2048,
scheduled_delay: Duration::from_secs(5),
shutdown_timeout: Duration::from_secs(5),
}
}

Check warning on line 334 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L328-L334

Added lines #L328 - L334 were not covered by tests

/// Sets the maximum queue size for spans.
pub fn with_max_queue_size(mut self, size: usize) -> Self {
self.max_queue_size = size;
self
}

Check warning on line 340 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L337-L340

Added lines #L337 - L340 were not covered by tests

/// Sets the delay between exports.
pub fn with_scheduled_delay(mut self, delay: Duration) -> Self {
self.scheduled_delay = delay;
self
}

Check warning on line 346 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L343-L346

Added lines #L343 - L346 were not covered by tests

/// Sets the timeout for shutdown and flush operations.
pub fn with_shutdown_timeout(mut self, timeout: Duration) -> Self {
self.shutdown_timeout = timeout;
self
}

Check warning on line 352 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L349-L352

Added lines #L349 - L352 were not covered by tests

/// Builds the `BatchSpanProcessorDedicatedThread` instance.
pub fn build<E>(self, exporter: E) -> BatchSpanProcessorDedicatedThread
where
E: SpanExporter + Send + 'static,
{
BatchSpanProcessorDedicatedThread::new(
exporter,
self.max_queue_size,
self.scheduled_delay,
self.shutdown_timeout,
)
}

Check warning on line 365 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L355-L365

Added lines #L355 - L365 were not covered by tests
}

/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
/// them at a preconfigured interval.
///
Expand Down
Loading