Skip to content

Commit 158524a

Browse files
committed
Fix BatchLogProcessor
1 parent 888d5a3 commit 158524a

File tree

1 file changed

+25
-17
lines changed

1 file changed

+25
-17
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
199199
#[allow(clippy::large_enum_variant)]
200200
#[derive(Debug)]
201201
enum BatchMessage {
202-
/// This ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`.
202+
/// This is ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`.
203203
ExportLog(Arc<AtomicBool>),
204204
/// ForceFlush flushes the current buffer to the exporter.
205205
ForceFlush(mpsc::SyncSender<ExportResult>),
@@ -306,7 +306,7 @@ impl LogProcessor for BatchLogProcessor {
306306
// If not, send a control message to export logs.
307307
// `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message.
308308

309-
if !self.export_log_message_sent.load(Ordering::Relaxed) {
309+
if !self.export_log_message_sent.load(Ordering::Acquire) {
310310
// This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
311311
// Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation above returns false.
312312
// Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
@@ -457,23 +457,31 @@ impl BatchLogProcessor {
457457
where
458458
E: LogExporter + Send + Sync + 'static,
459459
{
460-
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
461-
while let Ok(log) = logs_receiver.try_recv() {
462-
logs.push(log);
463-
if logs.len() == config.max_export_batch_size {
464-
break;
460+
let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting logs.
461+
let mut result = LogResult::Ok(());
462+
let mut total_exported_logs: usize = 0;
463+
464+
while target > 0 && total_exported_logs < target {
465+
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
466+
while let Ok(log) = logs_receiver.try_recv() {
467+
logs.push(log);
468+
if logs.len() == config.max_export_batch_size {
469+
break;
470+
}
465471
}
466-
}
467472

468-
let count_of_logs = logs.len(); // Count of logs that will be exported
469-
let result = export_with_timeout_sync(
470-
config.max_export_timeout,
471-
exporter,
472-
logs,
473-
last_export_time,
474-
); // This method clears the logs vec after exporting
473+
let count_of_logs = logs.len(); // Count of logs that will be exported
474+
total_exported_logs += count_of_logs;
475475

476-
current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
476+
result = export_with_timeout_sync(
477+
config.max_export_timeout,
478+
exporter,
479+
logs,
480+
last_export_time,
481+
); // This method clears the logs vec after exporting
482+
483+
current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
484+
}
477485
result
478486
}
479487

@@ -499,7 +507,7 @@ impl BatchLogProcessor {
499507
);
500508

501509
// Reset the export log message sent flag now it has has been processed.
502-
export_log_message_sent.store(false, Ordering::Relaxed);
510+
export_log_message_sent.store(false, Ordering::Release);
503511
}
504512
Ok(BatchMessage::ForceFlush(sender)) => {
505513
otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");

0 commit comments

Comments
 (0)