@@ -248,7 +248,7 @@ impl BatchSpanProcessor {
248248 pub fn new < E > (
249249 mut exporter : E ,
250250 config : BatchConfig ,
251- //ax_queue_size : usize,
251+ //max_queue_size : usize,
252252 //scheduled_delay: Duration,
253253 //shutdown_timeout: Duration,
254254 ) -> Self
@@ -258,16 +258,21 @@ impl BatchSpanProcessor {
258258 let ( message_sender, message_receiver) = sync_channel ( config. max_queue_size ) ;
259259
260260 let handle = thread:: Builder :: new ( )
261- . name ( "BatchSpanProcessorDedicatedThread " . to_string ( ) )
261+ . name ( "BatchSpanProcessorThread " . to_string ( ) )
262262 . spawn ( move || {
263263 let mut spans = Vec :: new ( ) ;
264+ spans. reserve ( config. max_export_batch_size ) ;
264265 let mut last_export_time = Instant :: now ( ) ;
265266
266267 loop {
267- let timeout = config
268+ let remaining_time_option = config
268269 . scheduled_delay
269- . saturating_sub ( last_export_time. elapsed ( ) ) ;
270- match message_receiver. recv_timeout ( timeout) {
270+ . checked_sub ( last_export_time. elapsed ( ) ) ;
271+ let remaining_time = match remaining_time_option {
272+ Some ( remaining_time) => remaining_time,
273+ None => config. scheduled_delay ,
274+ } ;
275+ match message_receiver. recv_timeout ( remaining_time) {
271276 Ok ( message) => match message {
272277 BatchMessage :: ExportSpan ( span) => {
273278 spans. push ( span) ;
@@ -351,14 +356,17 @@ impl SpanProcessor for BatchSpanProcessor {
351356 /// Handles span end.
352357 fn on_end ( & self , span : SpanData ) {
353358 if self . is_shutdown . load ( Ordering :: Relaxed ) {
354- eprintln ! ( "Processor is shutdown. Dropping span." ) ;
359+ // this is a warning, as the user is trying to emit after the processor has been shutdown
360+ otel_warn ! (
361+ name: "BatchSpanProcessor.Emit.ProcessorShutdown" ,
362+ ) ;
355363 return ;
356364 }
357365 let result = self . message_sender . try_send ( BatchMessage :: ExportSpan ( span) ) ;
358366
359367 // TODO - Implement throttling to prevent error flooding when the queue is full or closed.
360368 if result. is_err ( ) {
361- // Increment dropped logs count. The first time we have to drop a log ,
369+ // Increment dropped span count. The first time we have to drop a span ,
362370 // emit a warning.
363371 if self . dropped_span_count . fetch_add ( 1 , Ordering :: Relaxed ) == 0 {
364372 otel_warn ! ( name: "BatchSpanProcessorDedicatedThread.SpanDroppingStarted" ,
@@ -384,6 +392,14 @@ impl SpanProcessor for BatchSpanProcessor {
384392
385393 /// Shuts down the processor.
386394 fn shutdown ( & self ) -> TraceResult < ( ) > {
395+ let dropped_spans = self . dropped_span_count . load ( Ordering :: Relaxed ) ;
396+ if dropped_spans > 0 {
397+ otel_warn ! (
398+ name: "BatchSpanProcessor.LogsDropped" ,
399+ dropped_span_count = dropped_spans,
400+ message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
401+ ) ;
402+ }
387403 if self . is_shutdown . swap ( true , Ordering :: Relaxed ) {
388404 return Err ( TraceError :: Other ( "Processor already shutdown" . into ( ) ) ) ;
389405 }
@@ -396,7 +412,12 @@ impl SpanProcessor for BatchSpanProcessor {
396412 . recv_timeout ( self . shutdown_timeout )
397413 . map_err ( |_| TraceError :: ExportTimedOut ( self . shutdown_timeout ) ) ?;
398414 if let Some ( handle) = self . handle . lock ( ) . unwrap ( ) . take ( ) {
399- handle. join ( ) . expect ( "Failed to join thread" ) ;
415+ if let Err ( err) = handle. join ( ) {
416+ return Err ( TraceError :: Other ( format ! (
417+ "Background thread failed to join during shutdown. This may indicate a panic or unexpected termination: {:?}" ,
418+ err
419+ ) . into ( ) ) ) ;
420+ }
400421 }
401422 result
402423 }
@@ -1032,4 +1053,88 @@ mod tests {
10321053 Some ( Value :: from( "test_service" ) )
10331054 ) ;
10341055 }
1056+
1057+ #[ tokio:: test( flavor = "current_thread" ) ]
1058+ async fn test_batch_processor_current_thread_runtime ( ) {
1059+ let exporter = MockSpanExporter :: new ( ) ;
1060+ let exporter_shared = exporter. exported_spans . clone ( ) ;
1061+
1062+ let config = BatchConfigBuilder :: default ( )
1063+ . with_max_queue_size ( 5 )
1064+ . with_max_export_batch_size ( 3 )
1065+ . with_scheduled_delay ( Duration :: from_millis ( 50 ) )
1066+ . build ( ) ;
1067+
1068+ let processor = BatchSpanProcessor :: new ( exporter, config) ;
1069+
1070+ for _ in 0 ..4 {
1071+ let span = new_test_export_span_data ( ) ;
1072+ processor. on_end ( span) ;
1073+ }
1074+
1075+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
1076+
1077+ let exported_spans = exporter_shared. lock ( ) . unwrap ( ) ;
1078+ assert_eq ! ( exported_spans. len( ) , 4 ) ;
1079+ }
1080+
1081+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
1082+ async fn test_batch_processor_multi_thread_count_1_runtime ( ) {
1083+ let exporter = MockSpanExporter :: new ( ) ;
1084+ let exporter_shared = exporter. exported_spans . clone ( ) ;
1085+
1086+ let config = BatchConfigBuilder :: default ( )
1087+ . with_max_queue_size ( 5 )
1088+ . with_max_export_batch_size ( 3 )
1089+ . with_scheduled_delay ( Duration :: from_millis ( 50 ) )
1090+ . build ( ) ;
1091+
1092+ let processor = BatchSpanProcessor :: new ( exporter, config) ;
1093+
1094+ for _ in 0 ..4 {
1095+ let span = new_test_export_span_data ( ) ;
1096+ processor. on_end ( span) ;
1097+ }
1098+
1099+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
1100+
1101+ let exported_spans = exporter_shared. lock ( ) . unwrap ( ) ;
1102+ assert_eq ! ( exported_spans. len( ) , 4 ) ;
1103+ }
1104+
1105+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 4 ) ]
1106+ async fn test_batch_processor_multi_thread ( ) {
1107+ let exporter = MockSpanExporter :: new ( ) ;
1108+ let exporter_shared = exporter. exported_spans . clone ( ) ;
1109+
1110+ let config = BatchConfigBuilder :: default ( )
1111+ . with_max_queue_size ( 20 )
1112+ . with_max_export_batch_size ( 5 )
1113+ . with_scheduled_delay ( Duration :: from_millis ( 50 ) )
1114+ . build ( ) ;
1115+
1116+ // Create the processor with the thread-safe exporter
1117+ let processor = Arc :: new ( BatchSpanProcessor :: new ( exporter, config) ) ;
1118+
1119+ let mut handles = vec ! [ ] ;
1120+ for _ in 0 ..10 {
1121+ let processor_clone = Arc :: clone ( & processor) ;
1122+ let handle = tokio:: spawn ( async move {
1123+ let span = new_test_export_span_data ( ) ;
1124+ processor_clone. on_end ( span) ;
1125+ } ) ;
1126+ handles. push ( handle) ;
1127+ }
1128+
1129+ for handle in handles {
1130+ handle. await . unwrap ( ) ;
1131+ }
1132+
1133+ // Allow time for batching and export
1134+ tokio:: time:: sleep ( Duration :: from_millis ( 200 ) ) . await ;
1135+
1136+ // Verify exported spans
1137+ let exported_spans = exporter_shared. lock ( ) . unwrap ( ) ;
1138+ assert_eq ! ( exported_spans. len( ) , 10 ) ;
1139+ }
10351140}
0 commit comments