diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs
index 0ebe80f885..7c69151d1d 100644
--- a/opentelemetry-sdk/src/trace/span_processor.rs
+++ b/opentelemetry-sdk/src/trace/span_processor.rs
@@ -292,6 +292,7 @@ pub struct BatchSpanProcessor {
     max_export_batch_size: usize,
     dropped_spans_count: AtomicUsize,
     max_queue_size: usize,
+    export_unsampled: bool,
 }

 impl BatchSpanProcessor {
@@ -310,6 +311,7 @@ impl BatchSpanProcessor {
         let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
         let max_queue_size = config.max_queue_size;
         let max_export_batch_size = config.max_export_batch_size;
+        let export_unsampled = config.export_unsampled;
         let current_batch_size = Arc::new(AtomicUsize::new(0));
         let current_batch_size_for_thread = current_batch_size.clone();

@@ -430,6 +432,7 @@ impl BatchSpanProcessor {
             export_span_message_sent: Arc::new(AtomicBool::new(false)),
             current_batch_size,
             max_export_batch_size,
+            export_unsampled,
         }
     }

@@ -527,6 +530,11 @@ impl SpanProcessor for BatchSpanProcessor {

     /// Handles span end.
     fn on_end(&self, span: SpanData) {
+        // Export spans if they are sampled OR if export_unsampled is enabled
+        if !span.span_context.is_sampled() && !self.export_unsampled {
+            return;
+        }
+
         let result = self.span_sender.try_send(span);

         // match for result and handle each separately
@@ -729,7 +737,7 @@ where

 /// Batch span processor configuration.
 /// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`].
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct BatchConfig {
     /// The maximum queue size to buffer spans for delayed processing. If the
     /// queue gets full it drops the spans. The default value of is 2048.
@@ -757,6 +765,11 @@ pub struct BatchConfig {
     /// by an exporter. A value of 1 will cause exports to be performed
     /// synchronously on the BatchSpanProcessor task.
     pub(crate) max_concurrent_exports: usize,
+
+    /// Whether to export unsampled spans that are recording.
+    /// If true, spans with is_recording() == true but TraceFlags::SAMPLED == false
+    /// will be exported. Defaults to false for backward compatibility.
+    pub(crate) export_unsampled: bool,
 }

 impl Default for BatchConfig {
@@ -773,6 +786,7 @@ pub struct BatchConfigBuilder {
     max_export_batch_size: usize,
     max_export_timeout: Duration,
     max_concurrent_exports: usize,
+    export_unsampled: bool,
 }

 impl Default for BatchConfigBuilder {
@@ -793,6 +807,7 @@ impl Default for BatchConfigBuilder {
             max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
             max_export_timeout: OTEL_BSP_EXPORT_TIMEOUT_DEFAULT,
             max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT,
+            export_unsampled: false,
         }
         .init_from_env_vars()
     }
@@ -843,6 +858,15 @@ impl BatchConfigBuilder {
         self
     }

+    /// Set export_unsampled for [`BatchConfigBuilder`].
+    /// When enabled, allows exporting spans with `is_recording() == true` but `TraceFlags::SAMPLED == false`.
+    /// This is useful for span-to-metrics pipelines and feeding tail-samplers with complete input.
+    /// The default value is false for backward compatibility.
+    pub fn with_export_unsampled(mut self, export_unsampled: bool) -> Self {
+        self.export_unsampled = export_unsampled;
+        self
+    }
+
     /// Set scheduled_delay_duration for [`BatchConfigBuilder`].
     /// It's the delay interval in milliseconds between two consecutive processing of batches.
     /// The default value is 5000 milliseconds.
@@ -881,6 +905,7 @@ impl BatchConfigBuilder {
             max_export_timeout: self.max_export_timeout,
             max_concurrent_exports: self.max_concurrent_exports,
             max_export_batch_size,
+            export_unsampled: self.export_unsampled,
         }
     }

@@ -947,7 +972,9 @@ mod tests {
     use crate::trace::InMemorySpanExporterBuilder;
     use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks};
     use crate::trace::{SpanData, SpanExporter};
-    use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
+    use opentelemetry::trace::{
+        SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
+    };
     use std::fmt::Debug;
     use std::time::Duration;

@@ -1128,7 +1155,13 @@ mod tests {
     // Helper function to create a default test span
     fn create_test_span(name: &str) -> SpanData {
         SpanData {
-            span_context: SpanContext::empty_context(),
+            span_context: SpanContext::new(
+                TraceId::from(1),
+                SpanId::from(1),
+                TraceFlags::SAMPLED,
+                false,
+                TraceState::default(),
+            ),
             parent_span_id: SpanId::INVALID,
             parent_span_is_remote: false,
             span_kind: SpanKind::Internal,
@@ -1445,4 +1478,158 @@ mod tests {
         let exported_spans = exporter_shared.lock().unwrap();
         assert_eq!(exported_spans.len(), 10);
     }
+
+    #[test]
+    fn batchspanprocessor_export_unsampled_disabled_by_default() {
+        let exporter = MockSpanExporter::new();
+        let exporter_shared = exporter.exported_spans.clone();
+        let processor = BatchSpanProcessor::new(exporter, BatchConfig::default());
+
+        let unsampled_span = SpanData {
+            span_context: SpanContext::new(
+                TraceId::from(1),
+                SpanId::from(1),
+                TraceFlags::NOT_SAMPLED,
+                false,
+                TraceState::default(),
+            ),
+            parent_span_id: SpanId::INVALID,
+            parent_span_is_remote: false,
+            span_kind: SpanKind::Internal,
+            name: "unsampled_span".into(),
+            start_time: opentelemetry::time::now(),
+            end_time: opentelemetry::time::now(),
+            attributes: Vec::new(),
+            dropped_attributes_count: 0,
+            events: SpanEvents::default(),
+            links: SpanLinks::default(),
+            status: Status::Unset,
+            instrumentation_scope: Default::default(),
+        };
+
+        processor.on_end(unsampled_span);
+        processor.force_flush().unwrap();
+
+        let exported_spans = exporter_shared.lock().unwrap();
+        assert_eq!(
+            exported_spans.len(),
+            0,
+            "Unsampled spans should not be exported by default"
+        );
+    }
+
+    #[test]
+    fn batchspanprocessor_export_unsampled_enabled() {
+        let exporter = MockSpanExporter::new();
+        let exporter_shared = exporter.exported_spans.clone();
+        let config = BatchConfigBuilder::default()
+            .with_export_unsampled(true)
+            .build();
+        let processor = BatchSpanProcessor::new(exporter, config);
+
+        let unsampled_span = SpanData {
+            span_context: SpanContext::new(
+                TraceId::from(1),
+                SpanId::from(1),
+                TraceFlags::NOT_SAMPLED,
+                false,
+                TraceState::default(),
+            ),
+            parent_span_id: SpanId::INVALID,
+            parent_span_is_remote: false,
+            span_kind: SpanKind::Internal,
+            name: "unsampled_span".into(),
+            start_time: opentelemetry::time::now(),
+            end_time: opentelemetry::time::now(),
+            attributes: Vec::new(),
+            dropped_attributes_count: 0,
+            events: SpanEvents::default(),
+            links: SpanLinks::default(),
+            status: Status::Unset,
+            instrumentation_scope: Default::default(),
+        };
+
+        processor.on_end(unsampled_span);
+        processor.force_flush().unwrap();
+
+        let exported_spans = exporter_shared.lock().unwrap();
+        assert_eq!(
+            exported_spans.len(),
+            1,
+            "Unsampled spans should be exported when export_unsampled is enabled"
+        );
+        assert_eq!(exported_spans[0].name, "unsampled_span");
+        assert!(!exported_spans[0].span_context.is_sampled());
+    }
+
+    #[test]
+    fn batchspanprocessor_mixed_sampled_and_unsampled() {
+        let exporter = MockSpanExporter::new();
+        let exporter_shared = exporter.exported_spans.clone();
+        let config = BatchConfigBuilder::default()
+            .with_export_unsampled(true)
+            .build();
+        let processor = BatchSpanProcessor::new(exporter, config);
+
+        // Add a sampled span
+        let sampled_span = SpanData {
+            span_context: SpanContext::new(
+                TraceId::from(1),
+                SpanId::from(1),
+                TraceFlags::SAMPLED,
+                false,
+                TraceState::default(),
+            ),
+            parent_span_id: SpanId::INVALID,
+            parent_span_is_remote: false,
+            span_kind: SpanKind::Internal,
+            name: "sampled_span".into(),
+            start_time: opentelemetry::time::now(),
+            end_time: opentelemetry::time::now(),
+            attributes: Vec::new(),
+            dropped_attributes_count: 0,
+            events: SpanEvents::default(),
+            links: SpanLinks::default(),
+            status: Status::Unset,
+            instrumentation_scope: Default::default(),
+        };
+
+        // Add an unsampled span
+        let unsampled_span = SpanData {
+            span_context: SpanContext::new(
+                TraceId::from(2),
+                SpanId::from(2),
+                TraceFlags::NOT_SAMPLED,
+                false,
+                TraceState::default(),
+            ),
+            parent_span_id: SpanId::INVALID,
+            parent_span_is_remote: false,
+            span_kind: SpanKind::Internal,
+            name: "unsampled_span".into(),
+            start_time: opentelemetry::time::now(),
+            end_time: opentelemetry::time::now(),
+            attributes: Vec::new(),
+            dropped_attributes_count: 0,
+            events: SpanEvents::default(),
+            links: SpanLinks::default(),
+            status: Status::Unset,
+            instrumentation_scope: Default::default(),
+        };
+
+        processor.on_end(sampled_span);
+        processor.on_end(unsampled_span);
+        processor.force_flush().unwrap();
+
+        let exported_spans = exporter_shared.lock().unwrap();
+        assert_eq!(
+            exported_spans.len(),
+            2,
+            "Both sampled and unsampled spans should be exported when export_unsampled is enabled"
+        );
+
+        let span_names: Vec<&str> = exported_spans.iter().map(|s| s.name.as_ref()).collect();
+        assert!(span_names.contains(&"sampled_span"));
+        assert!(span_names.contains(&"unsampled_span"));
+    }
 }
diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs
index b294f74043..fe28ab8fbe 100644
--- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs
+++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs
@@ -90,6 +90,9 @@ pub struct BatchSpanProcessor<R: RuntimeChannel> {

     // Track the maximum queue size that was configured for this processor
     max_queue_size: usize,
+
+    // Whether to export unsampled spans that are recording
+    export_unsampled: bool,
 }

 impl<R: RuntimeChannel> fmt::Debug for BatchSpanProcessor<R> {
@@ -106,7 +109,8 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
     }

     fn on_end(&self, span: SpanData) {
-        if !span.span_context.is_sampled() {
+        // Check if span should be exported based on sampling and config
+        if !span.span_context.is_sampled() && !self.export_unsampled {
             return;
         }

@@ -374,15 +378,17 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
             runtime.batch_message_channel(config.max_queue_size);

         let max_queue_size = config.max_queue_size;
+        let config_for_worker = config.clone();
         let inner_runtime = runtime.clone();

         // Spawn worker process via user-defined spawn function.
         runtime.spawn(async move {
             // Timer will take a reference to the current runtime, so its important we do this within the
             // runtime.spawn()
-            let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
-                .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
-                .map(|_| BatchMessage::Flush(None));
+            let ticker =
+                to_interval_stream(inner_runtime.clone(), config_for_worker.scheduled_delay)
+                    .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
+                    .map(|_| BatchMessage::Flush(None));

             let timeout_runtime = inner_runtime.clone();
             let messages = Box::pin(stream::select(message_receiver, ticker));
@@ -390,7 +396,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
                 spans: Vec::new(),
                 export_tasks: FuturesUnordered::new(),
                 runtime: timeout_runtime,
-                config,
+                config: config_for_worker,
                 exporter: Arc::new(RwLock::new(exporter)),
             };

@@ -402,6 +408,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
             message_sender,
             dropped_spans_count: AtomicUsize::new(0),
             max_queue_size,
+            export_unsampled: config.export_unsampled,
         }
     }

@@ -647,6 +654,7 @@ mod tests {
             scheduled_delay: Duration::from_secs(3600), // effectively disabled
             max_export_timeout: Duration::from_secs(5),
             max_concurrent_exports: 2, // what we want to verify
+            export_unsampled: false,
         };

         // Spawn the processor.
@@ -685,6 +693,7 @@ mod tests {
             scheduled_delay: Duration::from_secs(3600),
             max_export_timeout: Duration::from_secs(5),
             max_concurrent_exports: 1, // what we want to verify
+            export_unsampled: false,
         };

         let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);
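Usage note (not part of the patch): a minimal sketch of how the new option might be wired into a pipeline. It assumes the current opentelemetry_sdk builder API (SdkTracerProvider, with_span_processor, InMemorySpanExporter behind the testing feature) and that the configured sampler can return SamplingDecision::RecordOnly; none of these names come from the diff above, only with_export_unsampled, BatchConfigBuilder, and BatchSpanProcessor::new do.

    use opentelemetry_sdk::trace::{
        BatchConfigBuilder, BatchSpanProcessor, InMemorySpanExporter, SdkTracerProvider,
    };

    fn main() {
        // Exporter used here only to keep the sketch self-contained.
        let exporter = InMemorySpanExporter::default();

        // Enable the flag introduced by this change.
        let config = BatchConfigBuilder::default()
            .with_export_unsampled(true)
            .build();
        let processor = BatchSpanProcessor::new(exporter, config);

        // Hypothetical wiring; the provider builder names are assumptions, not part of this diff.
        let provider = SdkTracerProvider::builder()
            .with_span_processor(processor)
            .build();

        // Note: an unsampled span only reaches on_end if it is still recording,
        // i.e. the sampler returned SamplingDecision::RecordOnly; a Drop decision
        // yields a non-recording span that never hits the processor at all.
        let _ = provider.shutdown();
    }

The flag only changes what the processor forwards; producing recording-but-unsampled spans in the first place still requires a sampler (typically a custom ShouldSample implementation) that returns RecordOnly.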