@@ -199,8 +199,8 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
199199#[ allow( clippy:: large_enum_variant) ]
200200#[ derive( Debug ) ]
201201enum BatchMessage {
202- /// Export logs, called when the log is emitted .
203- ExportLog ( Box < ( LogRecord , InstrumentationScope ) > ) ,
202+ /// This ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size` .
203+ ExportLog ( Arc < AtomicBool > ) ,
204204 /// ForceFlush flushes the current buffer to the exporter.
205205 ForceFlush ( mpsc:: SyncSender < ExportResult > ) ,
206206 /// Shut down the worker thread, push all logs in buffer to the exporter.
@@ -209,6 +209,8 @@ enum BatchMessage {
209209 SetResource ( Arc < Resource > ) ,
210210}
211211
212+ type LogsData = Box < ( LogRecord , InstrumentationScope ) > ;
213+
212214/// The `BatchLogProcessor` collects finished logs in a buffer and exports them
213215/// in batches to the configured `LogExporter`. This processor is ideal for
214216/// high-throughput environments, as it minimizes the overhead of exporting logs
@@ -246,11 +248,15 @@ enum BatchMessage {
246248/// .build();
247249///
248250pub struct BatchLogProcessor {
249- message_sender : SyncSender < BatchMessage > ,
251+ logs_sender : SyncSender < LogsData > , // Data channel to store log records and instrumentation scopes
252+ message_sender : SyncSender < BatchMessage > , // Control channel to store control messages for the worker thread
250253 handle : Mutex < Option < thread:: JoinHandle < ( ) > > > ,
251254 forceflush_timeout : Duration ,
252255 shutdown_timeout : Duration ,
253256 is_shutdown : AtomicBool ,
257+ export_log_message_sent : Arc < AtomicBool > ,
258+ current_batch_size : Arc < AtomicUsize > ,
259+ max_export_batch_size : usize ,
254260
255261 // Track dropped logs - we'll log this at shutdown
256262 dropped_logs_count : AtomicUsize ,
@@ -279,11 +285,19 @@ impl LogProcessor for BatchLogProcessor {
279285 }
280286
281287 let result = self
282- . message_sender
283- . try_send ( BatchMessage :: ExportLog ( Box :: new ( (
284- record. clone ( ) ,
285- instrumentation. clone ( ) ,
286- ) ) ) ) ;
288+ . logs_sender
289+ . try_send ( Box :: new ( ( record. clone ( ) , instrumentation. clone ( ) ) ) ) ;
290+
291+ if self . current_batch_size . fetch_add ( 1 , Ordering :: Relaxed ) >= self . max_export_batch_size {
292+ // Check if the a control message for exporting logs is already sent to the worker thread.
293+ // If not, send a control message to export logs.
294+ // `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message.
295+ if !self . export_log_message_sent . swap ( true , Ordering :: Relaxed ) {
296+ let _ = self . message_sender . try_send ( BatchMessage :: ExportLog (
297+ self . export_log_message_sent . clone ( ) ,
298+ ) ) ; // TODO: Handle error
299+ }
300+ }
287301
288302 if result. is_err ( ) {
289303 // Increment dropped logs count. The first time we have to drop a log,
@@ -388,8 +402,12 @@ impl BatchLogProcessor {
388402 where
389403 E : LogExporter + Send + Sync + ' static ,
390404 {
391- let ( message_sender, message_receiver) = mpsc:: sync_channel ( config. max_queue_size ) ;
405+ let ( logs_sender, logs_receiver) = mpsc:: sync_channel :: < LogsData > ( config. max_queue_size ) ;
406+ let ( message_sender, message_receiver) = mpsc:: sync_channel :: < BatchMessage > ( 64 ) ; // Is this a reasonable bound?
392407 let max_queue_size = config. max_queue_size ;
408+ let max_export_batch_size = config. max_export_batch_size ;
409+ let current_batch_size = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
410+ let current_batch_size_for_thread = current_batch_size. clone ( ) ;
393411
394412 let handle = thread:: Builder :: new ( )
395413 . name ( "OpenTelemetry.Logs.BatchProcessor" . to_string ( ) )
@@ -402,6 +420,42 @@ impl BatchLogProcessor {
402420 ) ;
403421 let mut last_export_time = Instant :: now ( ) ;
404422 let mut logs = Vec :: with_capacity ( config. max_export_batch_size ) ;
423+ let current_batch_size = current_batch_size_for_thread;
424+
425+ // This method gets upto `max_export_batch_size` amount of logs from the channel and exports them.
426+ // It returns the result of the export operation.
427+ // It expects the logs vec to be empty when it's called.
428+ #[ inline]
429+ fn get_logs_and_export < E > (
430+ logs_receiver : & mpsc:: Receiver < LogsData > ,
431+ exporter : & E ,
432+ logs : & mut Vec < LogsData > ,
433+ last_export_time : & mut Instant ,
434+ current_batch_size : & AtomicUsize ,
435+ config : & BatchConfig ,
436+ ) -> ExportResult
437+ where
438+ E : LogExporter + Send + Sync + ' static ,
439+ {
440+ // Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
441+ while let Ok ( log) = logs_receiver. try_recv ( ) {
442+ logs. push ( log) ;
443+ if logs. len ( ) == config. max_export_batch_size {
444+ break ;
445+ }
446+ }
447+
448+ let count_of_logs = logs. len ( ) ; // Count of logs that will be exported
449+ let result = export_with_timeout_sync (
450+ config. max_export_timeout ,
451+ exporter,
452+ logs,
453+ last_export_time,
454+ ) ; // This method clears the logs vec after exporting
455+
456+ current_batch_size. fetch_sub ( count_of_logs, Ordering :: Relaxed ) ;
457+ result
458+ }
405459
406460 loop {
407461 let remaining_time = config
@@ -410,37 +464,44 @@ impl BatchLogProcessor {
410464 . unwrap_or ( config. scheduled_delay ) ;
411465
412466 match message_receiver. recv_timeout ( remaining_time) {
413- Ok ( BatchMessage :: ExportLog ( log) ) => {
414- logs. push ( log) ;
415- if logs. len ( ) == config. max_export_batch_size {
416- otel_debug ! (
417- name: "BatchLogProcessor.ExportingDueToBatchSize" ,
418- ) ;
419- let _ = export_with_timeout_sync (
420- config. max_export_timeout ,
421- & mut exporter,
422- & mut logs,
423- & mut last_export_time,
424- ) ;
425- }
467+ Ok ( BatchMessage :: ExportLog ( export_log_message_sent) ) => {
468+ otel_debug ! (
469+ name: "BatchLogProcessor.ExportingDueToBatchSize" ,
470+ ) ;
471+
472+ let _ = get_logs_and_export (
473+ & logs_receiver,
474+ & exporter,
475+ & mut logs,
476+ & mut last_export_time,
477+ & current_batch_size,
478+ & config,
479+ ) ;
480+
481+ // Reset the export log message sent flag now it has has been processed.
482+ export_log_message_sent. store ( false , Ordering :: Relaxed ) ;
426483 }
427484 Ok ( BatchMessage :: ForceFlush ( sender) ) => {
428485 otel_debug ! ( name: "BatchLogProcessor.ExportingDueToForceFlush" ) ;
429- let result = export_with_timeout_sync (
430- config . max_export_timeout ,
431- & mut exporter,
486+ let result = get_logs_and_export (
487+ & logs_receiver ,
488+ & exporter,
432489 & mut logs,
433490 & mut last_export_time,
491+ & current_batch_size,
492+ & config,
434493 ) ;
435494 let _ = sender. send ( result) ;
436495 }
437496 Ok ( BatchMessage :: Shutdown ( sender) ) => {
438497 otel_debug ! ( name: "BatchLogProcessor.ExportingDueToShutdown" ) ;
439- let result = export_with_timeout_sync (
440- config . max_export_timeout ,
441- & mut exporter,
498+ let result = get_logs_and_export (
499+ & logs_receiver ,
500+ & exporter,
442501 & mut logs,
443502 & mut last_export_time,
503+ & current_batch_size,
504+ & config,
444505 ) ;
445506 let _ = sender. send ( result) ;
446507
@@ -460,11 +521,14 @@ impl BatchLogProcessor {
460521 otel_debug ! (
461522 name: "BatchLogProcessor.ExportingDueToTimer" ,
462523 ) ;
463- let _ = export_with_timeout_sync (
464- config. max_export_timeout ,
465- & mut exporter,
524+
525+ let _ = get_logs_and_export (
526+ & logs_receiver,
527+ & exporter,
466528 & mut logs,
467529 & mut last_export_time,
530+ & current_batch_size,
531+ & config,
468532 ) ;
469533 }
470534 Err ( RecvTimeoutError :: Disconnected ) => {
@@ -486,13 +550,17 @@ impl BatchLogProcessor {
486550
487551 // Return batch processor with link to worker
488552 BatchLogProcessor {
553+ logs_sender,
489554 message_sender,
490555 handle : Mutex :: new ( Some ( handle) ) ,
491556 forceflush_timeout : Duration :: from_secs ( 5 ) , // TODO: make this configurable
492557 shutdown_timeout : Duration :: from_secs ( 5 ) , // TODO: make this configurable
493558 is_shutdown : AtomicBool :: new ( false ) ,
494559 dropped_logs_count : AtomicUsize :: new ( 0 ) ,
495560 max_queue_size,
561+ export_log_message_sent : Arc :: new ( AtomicBool :: new ( false ) ) ,
562+ current_batch_size,
563+ max_export_batch_size,
496564 }
497565 }
498566
@@ -511,7 +579,7 @@ impl BatchLogProcessor {
511579#[ allow( clippy:: vec_box) ]
512580fn export_with_timeout_sync < E > (
513581 _: Duration , // TODO, enforcing timeout in exporter.
514- exporter : & mut E ,
582+ exporter : & E ,
515583 batch : & mut Vec < Box < ( LogRecord , InstrumentationScope ) > > ,
516584 last_export_time : & mut Instant ,
517585) -> ExportResult
0 commit comments