Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 172 additions & 3 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ pub struct BatchSpanProcessor {
max_export_batch_size: usize,
dropped_spans_count: AtomicUsize,
max_queue_size: usize,
export_unsampled: bool,
}

impl BatchSpanProcessor {
Expand All @@ -310,6 +311,7 @@ impl BatchSpanProcessor {
let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
let max_queue_size = config.max_queue_size;
let max_export_batch_size = config.max_export_batch_size;
let export_unsampled = config.export_unsampled;
let current_batch_size = Arc::new(AtomicUsize::new(0));
let current_batch_size_for_thread = current_batch_size.clone();

Expand Down Expand Up @@ -430,6 +432,7 @@ impl BatchSpanProcessor {
export_span_message_sent: Arc::new(AtomicBool::new(false)),
current_batch_size,
max_export_batch_size,
export_unsampled,
}
}

Expand Down Expand Up @@ -527,6 +530,11 @@ impl SpanProcessor for BatchSpanProcessor {

/// Handles span end.
fn on_end(&self, span: SpanData) {
// Export spans if they are sampled OR if export_unsampled is enabled
if !span.span_context.is_sampled() && !self.export_unsampled {
return;
}

let result = self.span_sender.try_send(span);

// match for result and handle each separately
Expand Down Expand Up @@ -729,7 +737,7 @@ where

/// Batch span processor configuration.
/// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`].
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BatchConfig {
/// The maximum queue size to buffer spans for delayed processing. If the
/// queue gets full it drops the spans. The default value of is 2048.
Expand Down Expand Up @@ -757,6 +765,11 @@ pub struct BatchConfig {
/// by an exporter. A value of 1 will cause exports to be performed
/// synchronously on the BatchSpanProcessor task.
pub(crate) max_concurrent_exports: usize,

/// Whether to export unsampled spans that are recording.
/// If true, spans with is_recording() == true but TraceFlags::SAMPLED == false
/// will be exported. Defaults to false for backward compatibility.
pub(crate) export_unsampled: bool,
}

impl Default for BatchConfig {
Expand All @@ -773,6 +786,7 @@ pub struct BatchConfigBuilder {
max_export_batch_size: usize,
max_export_timeout: Duration,
max_concurrent_exports: usize,
export_unsampled: bool,
}

impl Default for BatchConfigBuilder {
Expand All @@ -793,6 +807,7 @@ impl Default for BatchConfigBuilder {
max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
max_export_timeout: OTEL_BSP_EXPORT_TIMEOUT_DEFAULT,
max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT,
export_unsampled: false,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also make this flag configurable via an environment variable, consistent with the other BSP knobs — something like `OTEL_BSP_EXPORT_UNSAMPLED`?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The OTel spec has generally put a halt on adding new env variables, as OTel is moving toward a new configuration model. I am not sure we should add another env variable now.

}
.init_from_env_vars()
}
Expand Down Expand Up @@ -843,6 +858,15 @@ impl BatchConfigBuilder {
self
}

/// Set export_unsampled for [`BatchConfigBuilder`].
/// When enabled, allows exporting spans with `is_recording() == true` but `TraceFlags::SAMPLED == false`.
/// This is useful for span-to-metrics pipelines and feeding tail-samplers with complete input.
/// The default value is false for backward compatibility.
pub fn with_export_unsampled(mut self, export_unsampled: bool) -> Self {
self.export_unsampled = export_unsampled;
self
}

/// Set scheduled_delay_duration for [`BatchConfigBuilder`].
/// It's the delay interval in milliseconds between two consecutive processing of batches.
/// The default value is 5000 milliseconds.
Expand Down Expand Up @@ -881,6 +905,7 @@ impl BatchConfigBuilder {
max_export_timeout: self.max_export_timeout,
max_concurrent_exports: self.max_concurrent_exports,
max_export_batch_size,
export_unsampled: self.export_unsampled,
}
}

Expand Down Expand Up @@ -947,7 +972,7 @@ mod tests {
use crate::trace::InMemorySpanExporterBuilder;
use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks};
use crate::trace::{SpanData, SpanExporter};
use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState};
use std::fmt::Debug;
use std::time::Duration;

Expand Down Expand Up @@ -1127,7 +1152,13 @@ mod tests {
// Helper function to create a default test span
fn create_test_span(name: &str) -> SpanData {
SpanData {
span_context: SpanContext::empty_context(),
span_context: SpanContext::new(
TraceId::from(1),
SpanId::from(1),
TraceFlags::SAMPLED,
false,
TraceState::default(),
),
parent_span_id: SpanId::INVALID,
span_kind: SpanKind::Internal,
name: name.to_string().into(),
Expand Down Expand Up @@ -1442,4 +1473,142 @@ mod tests {
let exported_spans = exporter_shared.lock().unwrap();
assert_eq!(exported_spans.len(), 10);
}

#[test]
fn batchspanprocessor_export_unsampled_disabled_by_default() {
    // A processor built from the default config must keep the historical
    // behavior: unsampled spans are dropped in on_end, never exported.
    let exporter = MockSpanExporter::new();
    let captured = exporter.exported_spans.clone();
    let processor = BatchSpanProcessor::new(exporter, BatchConfig::default());

    // Span whose trace flags explicitly mark it as NOT sampled.
    let span = SpanData {
        span_context: SpanContext::new(
            TraceId::from(1),
            SpanId::from(1),
            TraceFlags::NOT_SAMPLED,
            false,
            TraceState::default(),
        ),
        parent_span_id: SpanId::INVALID,
        span_kind: SpanKind::Internal,
        name: "unsampled_span".into(),
        start_time: opentelemetry::time::now(),
        end_time: opentelemetry::time::now(),
        attributes: Vec::new(),
        dropped_attributes_count: 0,
        events: SpanEvents::default(),
        links: SpanLinks::default(),
        status: Status::Unset,
        instrumentation_scope: Default::default(),
    };

    processor.on_end(span);
    processor.force_flush().unwrap();

    let exported = captured.lock().unwrap();
    assert_eq!(exported.len(), 0, "Unsampled spans should not be exported by default");
}

#[test]
fn batchspanprocessor_export_unsampled_enabled() {
    // With export_unsampled turned on via the builder, an unsampled span
    // must flow all the way through to the exporter.
    let exporter = MockSpanExporter::new();
    let captured = exporter.exported_spans.clone();
    let config = BatchConfigBuilder::default()
        .with_export_unsampled(true)
        .build();
    let processor = BatchSpanProcessor::new(exporter, config);

    // Span whose trace flags explicitly mark it as NOT sampled.
    let span = SpanData {
        span_context: SpanContext::new(
            TraceId::from(1),
            SpanId::from(1),
            TraceFlags::NOT_SAMPLED,
            false,
            TraceState::default(),
        ),
        parent_span_id: SpanId::INVALID,
        span_kind: SpanKind::Internal,
        name: "unsampled_span".into(),
        start_time: opentelemetry::time::now(),
        end_time: opentelemetry::time::now(),
        attributes: Vec::new(),
        dropped_attributes_count: 0,
        events: SpanEvents::default(),
        links: SpanLinks::default(),
        status: Status::Unset,
        instrumentation_scope: Default::default(),
    };

    processor.on_end(span);
    processor.force_flush().unwrap();

    let exported = captured.lock().unwrap();
    assert_eq!(exported.len(), 1, "Unsampled spans should be exported when export_unsampled is enabled");
    assert_eq!(exported[0].name, "unsampled_span");
    assert!(!exported[0].span_context.is_sampled());
}

#[test]
fn batchspanprocessor_mixed_sampled_and_unsampled() {
    // Builds a minimal SpanData with the given id (used for both trace and
    // span ids), trace flags, and name. Everything else is defaulted.
    fn make_span(id: u64, flags: TraceFlags, name: &str) -> SpanData {
        SpanData {
            span_context: SpanContext::new(
                TraceId::from(id as u128),
                SpanId::from(id),
                flags,
                false,
                TraceState::default(),
            ),
            parent_span_id: SpanId::INVALID,
            span_kind: SpanKind::Internal,
            name: name.to_string().into(),
            start_time: opentelemetry::time::now(),
            end_time: opentelemetry::time::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: Status::Unset,
            instrumentation_scope: Default::default(),
        }
    }

    // When export_unsampled is enabled, sampled and unsampled spans alike
    // must reach the exporter.
    let exporter = MockSpanExporter::new();
    let captured = exporter.exported_spans.clone();
    let config = BatchConfigBuilder::default()
        .with_export_unsampled(true)
        .build();
    let processor = BatchSpanProcessor::new(exporter, config);

    processor.on_end(make_span(1, TraceFlags::SAMPLED, "sampled_span"));
    processor.on_end(make_span(2, TraceFlags::NOT_SAMPLED, "unsampled_span"));
    processor.force_flush().unwrap();

    let exported = captured.lock().unwrap();
    assert_eq!(exported.len(), 2, "Both sampled and unsampled spans should be exported when export_unsampled is enabled");

    let span_names: Vec<&str> = exported.iter().map(|s| s.name.as_ref()).collect();
    assert!(span_names.contains(&"sampled_span"));
    assert!(span_names.contains(&"unsampled_span"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ pub struct BatchSpanProcessor<R: RuntimeChannel> {

// Track the maximum queue size that was configured for this processor
max_queue_size: usize,

// Whether to export unsampled spans that are recording
export_unsampled: bool,
}

impl<R: RuntimeChannel> fmt::Debug for BatchSpanProcessor<R> {
Expand All @@ -106,7 +109,8 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
}

fn on_end(&self, span: SpanData) {
if !span.span_context.is_sampled() {
// Check if span should be exported based on sampling and config
if !span.span_context.is_sampled() && !self.export_unsampled {
return;
}

Expand Down Expand Up @@ -374,13 +378,14 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
runtime.batch_message_channel(config.max_queue_size);

let max_queue_size = config.max_queue_size;
let config_for_worker = config.clone();

let inner_runtime = runtime.clone();
// Spawn worker process via user-defined spawn function.
runtime.spawn(async move {
// Timer will take a reference to the current runtime, so its important we do this within the
// runtime.spawn()
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
let ticker = to_interval_stream(inner_runtime.clone(), config_for_worker.scheduled_delay)
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
.map(|_| BatchMessage::Flush(None));
let timeout_runtime = inner_runtime.clone();
Expand All @@ -390,7 +395,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
spans: Vec::new(),
export_tasks: FuturesUnordered::new(),
runtime: timeout_runtime,
config,
config: config_for_worker,
exporter: Arc::new(RwLock::new(exporter)),
};

Expand All @@ -402,6 +407,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
message_sender,
dropped_spans_count: AtomicUsize::new(0),
max_queue_size,
export_unsampled: config.export_unsampled,
}
}

Expand Down Expand Up @@ -647,6 +653,7 @@ mod tests {
scheduled_delay: Duration::from_secs(3600), // effectively disabled
max_export_timeout: Duration::from_secs(5),
max_concurrent_exports: 2, // what we want to verify
export_unsampled: false,
};

// Spawn the processor.
Expand Down Expand Up @@ -685,6 +692,7 @@ mod tests {
scheduled_delay: Duration::from_secs(3600),
max_export_timeout: Duration::from_secs(5),
max_concurrent_exports: 1, // what we want to verify
export_unsampled: false,
};

let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);
Expand Down
Loading