Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 25 additions & 17 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
/// This ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`.
/// This is ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`.
ExportLog(Arc<AtomicBool>),
/// ForceFlush flushes the current buffer to the exporter.
ForceFlush(mpsc::SyncSender<ExportResult>),
Expand Down Expand Up @@ -306,7 +306,7 @@
// If not, send a control message to export logs.
// `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message.

if !self.export_log_message_sent.load(Ordering::Relaxed) {
if !self.export_log_message_sent.load(Ordering::Acquire) {

Check warning on line 309 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L309

Added line #L309 was not covered by tests
// This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
// Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation above returns false.
// Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
Expand Down Expand Up @@ -457,23 +457,31 @@
where
E: LogExporter + Send + Sync + 'static,
{
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
while let Ok(log) = logs_receiver.try_recv() {
logs.push(log);
if logs.len() == config.max_export_batch_size {
break;
let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting logs.
let mut result = LogResult::Ok(());
let mut total_exported_logs: usize = 0;

while target > 0 && total_exported_logs < target {
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
while let Ok(log) = logs_receiver.try_recv() {
logs.push(log);
if logs.len() == config.max_export_batch_size {
break;

Check warning on line 469 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L469

Added line #L469 was not covered by tests
}
}
}

let count_of_logs = logs.len(); // Count of logs that will be exported
let result = export_with_timeout_sync(
config.max_export_timeout,
exporter,
logs,
last_export_time,
); // This method clears the logs vec after exporting
let count_of_logs = logs.len(); // Count of logs that will be exported
total_exported_logs += count_of_logs;

current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
result = export_with_timeout_sync(
config.max_export_timeout,
exporter,
logs,
last_export_time,
); // This method clears the logs vec after exporting

current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
}
result
}

Expand All @@ -499,7 +507,7 @@
);

// Reset the export log message sent flag now it has has been processed.
export_log_message_sent.store(false, Ordering::Relaxed);
export_log_message_sent.store(false, Ordering::Release);

Check warning on line 510 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L510

Added line #L510 was not covered by tests
}
Ok(BatchMessage::ForceFlush(sender)) => {
otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
Expand Down
Loading