@@ -199,7 +199,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
199199#[ allow( clippy:: large_enum_variant) ]
200200#[ derive( Debug ) ]
201201enum BatchMessage {
202- /// This ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`.
202+ /// This is ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`.
203203 ExportLog ( Arc < AtomicBool > ) ,
204204 /// ForceFlush flushes the current buffer to the exporter.
205205 ForceFlush ( mpsc:: SyncSender < ExportResult > ) ,
@@ -457,23 +457,31 @@ impl BatchLogProcessor {
457457 where
458458 E : LogExporter + Send + Sync + ' static ,
459459 {
460- // Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
461- while let Ok ( log) = logs_receiver. try_recv ( ) {
462- logs. push ( log) ;
463- if logs. len ( ) == config. max_export_batch_size {
464- break ;
460+ let target = current_batch_size. load ( Ordering :: Relaxed ) ; // `target` is used to determine the stopping criteria for exporting logs.
461+ let mut result = LogResult :: Ok ( ( ) ) ;
462+ let mut total_exported_logs: usize = 0 ;
463+
464+ while target > 0 && total_exported_logs < target {
465+ // Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
466+ while let Ok ( log) = logs_receiver. try_recv ( ) {
467+ logs. push ( log) ;
468+ if logs. len ( ) == config. max_export_batch_size {
469+ break ;
470+ }
465471 }
466- }
467472
468- let count_of_logs = logs. len ( ) ; // Count of logs that will be exported
469- let result = export_with_timeout_sync (
470- config. max_export_timeout ,
471- exporter,
472- logs,
473- last_export_time,
474- ) ; // This method clears the logs vec after exporting
473+ let count_of_logs = logs. len ( ) ; // Count of logs that will be exported
474+ total_exported_logs += count_of_logs;
475+
476+ result = export_with_timeout_sync (
477+ config. max_export_timeout ,
478+ exporter,
479+ logs,
480+ last_export_time,
481+ ) ; // This method clears the logs vec after exporting
475482
476- current_batch_size. fetch_sub ( count_of_logs, Ordering :: Relaxed ) ;
483+ current_batch_size. fetch_sub ( count_of_logs, Ordering :: Relaxed ) ;
484+ }
477485 result
478486 }
479487
@@ -485,6 +493,9 @@ impl BatchLogProcessor {
485493
486494 match message_receiver. recv_timeout ( remaining_time) {
487495 Ok ( BatchMessage :: ExportLog ( export_log_message_sent) ) => {
496+ // Reset the export log message sent flag now it has has been processed.
497+ export_log_message_sent. store ( false , Ordering :: Relaxed ) ;
498+
488499 otel_debug ! (
489500 name: "BatchLogProcessor.ExportingDueToBatchSize" ,
490501 ) ;
@@ -497,9 +508,6 @@ impl BatchLogProcessor {
497508 & current_batch_size,
498509 & config,
499510 ) ;
500-
501- // Reset the export log message sent flag now it has has been processed.
502- export_log_message_sent. store ( false , Ordering :: Relaxed ) ;
503511 }
504512 Ok ( BatchMessage :: ForceFlush ( sender) ) => {
505513 otel_debug ! ( name: "BatchLogProcessor.ExportingDueToForceFlush" ) ;
0 commit comments