3737use crate :: export:: trace:: { SpanData , SpanExporter } ;
3838use crate :: resource:: Resource ;
3939use crate :: trace:: Span ;
40+ use opentelemetry:: otel_error;
4041use opentelemetry:: { otel_debug, otel_warn} ;
4142use opentelemetry:: {
4243 trace:: { TraceError , TraceResult } ,
@@ -162,6 +163,60 @@ impl SpanProcessor for SimpleSpanProcessor {
162163 }
163164}
164165
166+ /// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
167+ /// in batches to the configured `SpanExporter`. This processor is ideal for
168+ /// high-throughput environments, as it minimizes the overhead of exporting spans
169+ /// individually. It uses a **dedicated background thread** to manage and export spans
170+ /// asynchronously, ensuring that the application's main execution flow is not blocked.
171+ ///
172+ /// /// # Example
173+ ///
174+ /// This example demonstrates how to configure and use the `BatchSpanProcessor`
175+ /// with a custom configuration. Note that a dedicated thread is used internally
176+ /// to manage the export process.
177+ ///
178+ /// ```rust
179+ /// use opentelemetry::global;
180+ /// use opentelemetry_sdk::{
181+ /// trace::{BatchSpanProcessor, BatchConfigBuilder, TracerProvider},
182+ /// runtime,
183+ /// testing::trace::NoopSpanExporter,
184+ /// };
185+ /// use opentelemetry::trace::Tracer as _;
186+ /// use opentelemetry::trace::Span;
187+ /// use std::time::Duration;
188+ ///
189+ /// fn main() {
190+ /// // Step 1: Create an exporter (e.g., a No-Op Exporter for demonstration).
191+ /// let exporter = NoopSpanExporter::new();
192+ ///
193+ /// // Step 2: Configure the BatchSpanProcessor.
194+ /// let batch_processor = BatchSpanProcessor::builder(exporter)
195+ /// .with_batch_config(
196+ /// BatchConfigBuilder::default()
197+ /// .with_max_queue_size(1024) // Buffer up to 1024 spans.
198+ /// .with_max_export_batch_size(256) // Export in batches of up to 256 spans.
199+ /// .with_scheduled_delay(Duration::from_secs(5)) // Export every 5 seconds.
200+ /// .with_max_export_timeout(Duration::from_secs(10)) // Timeout after 10 seconds.
201+ /// .build(),
202+ /// )
203+ /// .build();
204+ ///
205+ /// // Step 3: Set up a TracerProvider with the configured processor.
206+ /// let provider = TracerProvider::builder()
207+ /// .with_span_processor(batch_processor)
208+ /// .build();
209+ /// global::set_tracer_provider(provider.clone());
210+ ///
211+ /// // Step 4: Create spans and record operations.
212+ /// let tracer = global::tracer("example-tracer");
213+ /// let mut span = tracer.start("example-span");
214+ /// span.end(); // Mark the span as completed.
215+ ///
216+ /// // Step 5: Ensure all spans are flushed before exiting.
217+ /// provider.shutdown();
218+ /// }
219+ /// ```
165220use futures_executor:: block_on;
166221use std:: sync:: mpsc:: sync_channel;
167222use std:: sync:: mpsc:: RecvTimeoutError ;
@@ -220,7 +275,10 @@ impl BatchSpanProcessor {
220275 {
221276 if let Err ( err) = block_on ( exporter. export ( spans. split_off ( 0 ) ) )
222277 {
223- eprintln ! ( "Export error: {:?}" , err) ;
278+ otel_error ! (
279+ name: "BatchSpanProcessor.ExportError" ,
280+ error = format!( "{}" , err)
281+ ) ;
224282 }
225283 last_export_time = Instant :: now ( ) ;
226284 }
@@ -238,19 +296,25 @@ impl BatchSpanProcessor {
238296 Err ( RecvTimeoutError :: Timeout ) => {
239297 if last_export_time. elapsed ( ) >= config. scheduled_delay {
240298 if let Err ( err) = block_on ( exporter. export ( spans. split_off ( 0 ) ) ) {
241- eprintln ! ( "Export error: {:?}" , err) ;
299+ otel_error ! (
300+ name: "BatchSpanProcessor.ExportError" ,
301+ error = format!( "{}" , err)
302+ ) ;
242303 }
243304 last_export_time = Instant :: now ( ) ;
244305 }
245306 }
246307 Err ( RecvTimeoutError :: Disconnected ) => {
247- eprintln ! ( "Channel disconnected, shutting down processor thread." ) ;
308+ otel_error ! (
309+ name: "BatchLogProcessor.InternalError.ChannelDisconnected" ,
310+ message = "Channel disconnected, shutting down processor thread."
311+ ) ;
248312 break ;
249313 }
250314 }
251315 }
252316 } )
253- . expect ( "Failed to spawn thread" ) ;
317+ . expect ( "Failed to spawn thread" ) ; //TODO: Handle thread spawn failure
254318
255319 Self {
256320 message_sender,
@@ -720,7 +784,7 @@ mod tests {
720784
721785 use futures_util:: future:: BoxFuture ;
722786 use futures_util:: FutureExt ;
723- use std:: sync:: { Arc , Mutex } ;
787+ use std:: sync:: { atomic :: Ordering , Arc , Mutex } ;
724788
725789 // Mock exporter to test functionality
726790 #[ derive( Debug ) ]
@@ -838,4 +902,49 @@ mod tests {
838902 "Shutdown should fail when called a second time"
839903 ) ;
840904 }
905+
906+ #[ test]
907+ fn batchspanprocessor_handles_dropped_spans ( ) {
908+ let exporter = MockSpanExporter :: new ( ) ;
909+ let exporter_shared = exporter. exported_spans . clone ( ) ; // Shared access to verify exported spans
910+ let config = BatchConfigBuilder :: default ( )
911+ . with_max_queue_size ( 2 ) // Small queue size to test span dropping
912+ . with_scheduled_delay ( Duration :: from_secs ( 5 ) )
913+ . with_max_export_timeout ( Duration :: from_secs ( 2 ) )
914+ . build ( ) ;
915+ let processor = BatchSpanProcessor :: new ( exporter, config) ;
916+
917+ // Create test spans and send them to the processor
918+ let span1 = create_test_span ( "span1" ) ;
919+ let span2 = create_test_span ( "span2" ) ;
920+ let span3 = create_test_span ( "span3" ) ; // This span should be dropped
921+
922+ processor. on_end ( span1. clone ( ) ) ;
923+ processor. on_end ( span2. clone ( ) ) ;
924+ processor. on_end ( span3. clone ( ) ) ; // This span exceeds the queue size
925+
926+ // Wait for the scheduled delay to expire
927+ std:: thread:: sleep ( Duration :: from_secs ( 3 ) ) ;
928+
929+ let exported_spans = exporter_shared. lock ( ) . unwrap ( ) ;
930+
931+ // Verify that only the first two spans are exported
932+ assert_eq ! (
933+ exported_spans. len( ) ,
934+ 2 ,
935+ "Unexpected number of exported spans"
936+ ) ;
937+ assert ! ( exported_spans. iter( ) . any( |s| s. name == "span1" ) ) ;
938+ assert ! ( exported_spans. iter( ) . any( |s| s. name == "span2" ) ) ;
939+
940+ // Ensure the third span is dropped
941+ assert ! (
942+ !exported_spans. iter( ) . any( |s| s. name == "span3" ) ,
943+ "Span3 should have been dropped"
944+ ) ;
945+
946+ // Verify dropped spans count (if accessible in your implementation)
947+ let dropped_count = processor. dropped_span_count . load ( Ordering :: Relaxed ) ;
948+ assert_eq ! ( dropped_count, 1 , "Unexpected number of dropped spans" ) ;
949+ }
841950}
0 commit comments