@@ -85,6 +85,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
8585 /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
8686 /// already set). This method is called synchronously within the `Span::end`
8787 /// API, therefore it should not block or throw an exception.
88+ /// TODO - This method should take reference to `SpanData`
8889 fn on_end ( & self , span : SpanData ) ;
8990 /// Force the spans lying in the cache to be exported.
9091 fn force_flush ( & self ) -> TraceResult < ( ) > ;
@@ -163,6 +164,7 @@ impl SpanProcessor for SimpleSpanProcessor {
163164 }
164165}
165166
167+ use crate :: export:: trace:: ExportResult ;
166168/// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
167169/// in batches to the configured `SpanExporter`. This processor is ideal for
168170/// high-throughput environments, as it minimizes the overhead of exporting spans
@@ -217,12 +219,10 @@ impl SpanProcessor for SimpleSpanProcessor {
217219/// provider.shutdown();
218220/// }
219221/// ```
220- use futures_executor:: block_on;
221222use std:: sync:: mpsc:: sync_channel;
223+ use std:: sync:: mpsc:: Receiver ;
222224use std:: sync:: mpsc:: RecvTimeoutError ;
223225use std:: sync:: mpsc:: SyncSender ;
224- use std:: sync:: mpsc:: Receiver ;
225- use crate :: export:: trace:: ExportResult ;
226226
227227/// Messages exchanged between the main thread and the background thread.
228228#[ allow( clippy:: large_enum_variant) ]
@@ -248,6 +248,7 @@ pub struct BatchSpanProcessor {
248248 export_span_message_sent : Arc < AtomicBool > ,
249249 current_batch_size : Arc < AtomicUsize > ,
250250 max_export_batch_size : usize ,
251+ max_queue_size : usize ,
251252}
252253
253254impl BatchSpanProcessor {
@@ -262,7 +263,7 @@ impl BatchSpanProcessor {
262263 where
263264 E : SpanExporter + Send + ' static ,
264265 {
265- let ( message_sender , message_receiver ) = sync_channel :: < SpanData > ( config. max_queue_size ) ;
266+ let ( span_sender , span_receiver ) = sync_channel :: < SpanData > ( config. max_queue_size ) ;
266267 let ( message_sender, message_receiver) = sync_channel :: < BatchMessage > ( 64 ) ; // Is this a reasonable bound?
267268 let max_queue_size = config. max_queue_size ;
268269 let max_export_batch_size = config. max_export_batch_size ;
@@ -281,40 +282,6 @@ impl BatchSpanProcessor {
281282 let mut spans = Vec :: with_capacity ( config. max_export_batch_size ) ;
282283 let mut last_export_time = Instant :: now ( ) ;
283284 let current_batch_size = current_batch_size_for_thread;
284- // This method gets upto `max_export_batch_size` amount of spans from the channel and exports them.
285- // It returns the result of the export operation.
286- // It expects the span vec to be empty when it's called.
287- #[ inline]
288- fn get_spans_and_export < E > (
289- spans_receiver : & Receiver < SpanData > ,
290- exporter : & E ,
291- spans : & mut Vec < SpanData > ,
292- last_export_time : & mut Instant ,
293- current_batch_size : & AtomicUsize ,
294- config : & BatchConfig ,
295- ) -> ExportResult
296- where
297- E : SpanExporter + Send + Sync + ' static ,
298- {
299- // Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
300- while let Ok ( log) = spans_receiver. try_recv ( ) {
301- spans. push ( log) ;
302- if spans. len ( ) == config. max_export_batch_size {
303- break ;
304- }
305- }
306-
307- let count_of_logs = spans. len ( ) ; // Count of logs that will be exported
308- let result = export_with_timeout_sync (
309- config. max_export_timeout ,
310- exporter,
311- spans,
312- last_export_time,
313- ) ; // This method clears the logs vec after exporting
314-
315- current_batch_size. fetch_sub ( count_of_logs, Ordering :: Relaxed ) ;
316- result
317- }
318285 loop {
319286 let remaining_time_option = config
320287 . scheduled_delay
@@ -325,47 +292,71 @@ impl BatchSpanProcessor {
325292 } ;
326293 match message_receiver. recv_timeout ( remaining_time) {
327294 Ok ( message) => match message {
328- BatchMessage :: ExportSpan ( span ) => {
295+ BatchMessage :: ExportSpan ( export_span_message_sent ) => {
329296 otel_debug ! (
330297 name: "BatchSpanProcessor.ExportingDueToBatchSize" ,
331298 ) ;
332- spans. push ( span) ;
333- if spans. len ( ) >= config. max_queue_size
334- || last_export_time. elapsed ( ) >= config. scheduled_delay
335- {
336- if let Err ( err) = block_on ( exporter. export ( spans. split_off ( 0 ) ) )
337- {
338- otel_error ! (
339- name: "BatchSpanProcessor.ExportError" ,
340- error = format!( "{}" , err)
341- ) ;
342- }
343- last_export_time = Instant :: now ( ) ;
344- }
299+ let _ = Self :: get_spans_and_export (
300+ & span_receiver,
301+ & mut exporter,
302+ & mut spans,
303+ & mut last_export_time,
304+ & current_batch_size,
305+ & config,
306+ ) ;
307+ // Reset the export span message sent flag now it has has been processed.
308+ export_span_message_sent. store ( false , Ordering :: Relaxed ) ;
345309 }
346310 BatchMessage :: ForceFlush ( sender) => {
347- let result = block_on ( exporter. export ( spans. split_off ( 0 ) ) ) ;
311+ otel_debug ! ( name: "BatchSpanProcessor.ExportingDueToForceFlush" ) ;
312+ let result = Self :: get_spans_and_export (
313+ & span_receiver,
314+ & mut exporter,
315+ & mut spans,
316+ & mut last_export_time,
317+ & current_batch_size,
318+ & config,
319+ ) ;
348320 let _ = sender. send ( result) ;
349321 }
350322 BatchMessage :: Shutdown ( sender) => {
351- let result = block_on ( exporter. export ( spans. split_off ( 0 ) ) ) ;
323+ otel_debug ! ( name: "BatchSpanProcessor.ExportingDueToShutdown" ) ;
324+ let result = Self :: get_spans_and_export (
325+ & span_receiver,
326+ & mut exporter,
327+ & mut spans,
328+ & mut last_export_time,
329+ & current_batch_size,
330+ & config,
331+ ) ;
352332 let _ = sender. send ( result) ;
333+
334+ otel_debug ! (
335+ name: "BatchSpanProcessor.ThreadExiting" ,
336+ reason = "ShutdownRequested"
337+ ) ;
338+ //
339+ // break out the loop and return from the current background thread.
340+ //
353341 break ;
354342 }
355343 BatchMessage :: SetResource ( resource) => {
356344 exporter. set_resource ( & resource) ;
357345 }
358346 } ,
359347 Err ( RecvTimeoutError :: Timeout ) => {
360- if last_export_time. elapsed ( ) >= config. scheduled_delay {
361- if let Err ( err) = block_on ( exporter. export ( spans. split_off ( 0 ) ) ) {
362- otel_error ! (
363- name: "BatchSpanProcessor.ExportError" ,
364- error = format!( "{}" , err)
365- ) ;
366- }
367- last_export_time = Instant :: now ( ) ;
368- }
348+ otel_debug ! (
349+ name: "BatchLogProcessor.ExportingDueToTimer" ,
350+ ) ;
351+
352+ let _ = Self :: get_spans_and_export (
353+ & span_receiver,
354+ & mut exporter,
355+ & mut spans,
356+ & mut last_export_time,
357+ & current_batch_size,
358+ & config,
359+ ) ;
369360 }
370361 Err ( RecvTimeoutError :: Disconnected ) => {
371362 // Channel disconnected, only thing to do is break
@@ -385,12 +376,17 @@ impl BatchSpanProcessor {
385376 . expect ( "Failed to spawn thread" ) ; //TODO: Handle thread spawn failure
386377
387378 Self {
379+ span_sender,
388380 message_sender,
389381 handle : Mutex :: new ( Some ( handle) ) ,
390382 forceflush_timeout : Duration :: from_secs ( 5 ) , // TODO: make this configurable
391383 shutdown_timeout : Duration :: from_secs ( 5 ) , // TODO: make this configurable
392384 is_shutdown : AtomicBool :: new ( false ) ,
393385 dropped_span_count : Arc :: new ( AtomicUsize :: new ( 0 ) ) ,
386+ max_queue_size,
387+ export_span_message_sent : Arc :: new ( AtomicBool :: new ( false ) ) ,
388+ current_batch_size,
389+ max_export_batch_size,
394390 }
395391 }
396392
@@ -404,6 +400,72 @@ impl BatchSpanProcessor {
404400 config : BatchConfig :: default ( ) ,
405401 }
406402 }
403+
404+ // This method gets upto `max_export_batch_size` amount of spans from the channel and exports them.
405+ // It returns the result of the export operation.
406+ // It expects the span vec to be empty when it's called.
407+ #[ inline]
408+ fn get_spans_and_export < E > (
409+ spans_receiver : & Receiver < SpanData > ,
410+ exporter : & mut E ,
411+ spans : & mut Vec < SpanData > ,
412+ last_export_time : & mut Instant ,
413+ current_batch_size : & AtomicUsize ,
414+ config : & BatchConfig ,
415+ ) -> ExportResult
416+ where
417+ E : SpanExporter + Send + Sync + ' static ,
418+ {
419+ // Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
420+ while let Ok ( log) = spans_receiver. try_recv ( ) {
421+ spans. push ( log) ;
422+ if spans. len ( ) == config. max_export_batch_size {
423+ break ;
424+ }
425+ }
426+
427+ let count_of_logs = spans. len ( ) ; // Count of logs that will be exported
428+ let result = Self :: export_with_timeout_sync (
429+ config. max_export_timeout ,
430+ exporter,
431+ spans,
432+ last_export_time,
433+ ) ; // This method clears the logs vec after exporting
434+
435+ current_batch_size. fetch_sub ( count_of_logs, Ordering :: Relaxed ) ;
436+ result
437+ }
438+
439+ #[ allow( clippy:: vec_box) ]
440+ fn export_with_timeout_sync < E > (
441+ _: Duration , // TODO, enforcing timeout in exporter.
442+ exporter : & mut E ,
443+ batch : & mut Vec < SpanData > ,
444+ last_export_time : & mut Instant ,
445+ ) -> ExportResult
446+ where
447+ E : SpanExporter + Send + Sync + ' static ,
448+ {
449+ * last_export_time = Instant :: now ( ) ;
450+
451+ if batch. is_empty ( ) {
452+ return TraceResult :: Ok ( ( ) ) ;
453+ }
454+
455+ let export = exporter. export ( batch. split_off ( 0 ) ) ;
456+ let export_result = futures_executor:: block_on ( export) ;
457+
458+ match export_result {
459+ Ok ( _) => TraceResult :: Ok ( ( ) ) ,
460+ Err ( err) => {
461+ otel_error ! (
462+ name: "BatchLogProcessor.ExportError" ,
463+ error = format!( "{}" , err)
464+ ) ;
465+ TraceResult :: Err ( err)
466+ }
467+ }
468+ }
407469}
408470
409471impl SpanProcessor for BatchSpanProcessor {
@@ -418,10 +480,11 @@ impl SpanProcessor for BatchSpanProcessor {
418480 // this is a warning, as the user is trying to emit after the processor has been shutdown
419481 otel_warn ! (
420482 name: "BatchSpanProcessor.Emit.ProcessorShutdown" ,
483+ message = "BatchSpanProcessor has been shutdown. No further spans will be emitted."
421484 ) ;
422485 return ;
423486 }
424- let result = self . message_sender . try_send ( BatchMessage :: ExportSpan ( span) ) ;
487+ let result = self . span_sender . try_send ( span) ;
425488
426489 if result. is_err ( ) {
427490 // Increment dropped span count. The first time we have to drop a span,
@@ -431,6 +494,36 @@ impl SpanProcessor for BatchSpanProcessor {
431494 message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped." ) ;
432495 }
433496 }
497+ // At this point, sending the log record to the data channel was successful.
498+ // Increment the current batch size and check if it has reached the max export batch size.
499+ if self . current_batch_size . fetch_add ( 1 , Ordering :: Relaxed ) + 1 >= self . max_export_batch_size
500+ {
501+ // Check if the a control message for exporting logs is already sent to the worker thread.
502+ // If not, send a control message to export logs.
503+ // `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message.
504+
505+ if !self . export_span_message_sent . load ( Ordering :: Relaxed ) {
506+ // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
507+ // Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation above returns false.
508+ // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
509+ // We could have used compare_exchange as well here, but it's more verbose than swap.
510+ if !self . export_span_message_sent . swap ( true , Ordering :: Relaxed ) {
511+ match self . message_sender . try_send ( BatchMessage :: ExportSpan (
512+ self . export_span_message_sent . clone ( ) ,
513+ ) ) {
514+ Ok ( _) => {
515+ // Control message sent successfully.
516+ }
517+ Err ( _err) => {
518+ // TODO: Log error
519+ // If the control message could not be sent, reset the `export_log_message_sent` flag.
520+ self . export_span_message_sent
521+ . store ( false , Ordering :: Relaxed ) ;
522+ }
523+ }
524+ }
525+ }
526+ }
434527 }
435528
436529 /// Flushes all pending spans.
@@ -450,17 +543,20 @@ impl SpanProcessor for BatchSpanProcessor {
450543
451544 /// Shuts down the processor.
452545 fn shutdown ( & self ) -> TraceResult < ( ) > {
546+ if self . is_shutdown . swap ( true , Ordering :: Relaxed ) {
547+ return Err ( TraceError :: Other ( "Processor already shutdown" . into ( ) ) ) ;
548+ }
453549 let dropped_spans = self . dropped_span_count . load ( Ordering :: Relaxed ) ;
550+ let max_queue_size = self . max_queue_size ;
454551 if dropped_spans > 0 {
455552 otel_warn ! (
456553 name: "BatchSpanProcessor.LogsDropped" ,
457554 dropped_span_count = dropped_spans,
555+ max_queue_size = max_queue_size,
458556 message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
459557 ) ;
460558 }
461- if self . is_shutdown . swap ( true , Ordering :: Relaxed ) {
462- return Err ( TraceError :: Other ( "Processor already shutdown" . into ( ) ) ) ;
463- }
559+
464560 let ( sender, receiver) = sync_channel ( 1 ) ;
465561 self . message_sender
466562 . try_send ( BatchMessage :: Shutdown ( sender) )
0 commit comments