Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 26 additions & 18 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
/// This ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`.
/// This is ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`.
ExportLog(Arc<AtomicBool>),
/// ForceFlush flushes the current buffer to the exporter.
ForceFlush(mpsc::SyncSender<ExportResult>),
Expand Down Expand Up @@ -457,23 +457,31 @@
where
E: LogExporter + Send + Sync + 'static,
{
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
while let Ok(log) = logs_receiver.try_recv() {
logs.push(log);
if logs.len() == config.max_export_batch_size {
break;
let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting logs.
let mut result = LogResult::Ok(());
let mut total_exported_logs: usize = 0;

while target > 0 && total_exported_logs < target {
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
while let Ok(log) = logs_receiver.try_recv() {
logs.push(log);
if logs.len() == config.max_export_batch_size {
break;

Check warning on line 469 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L469

Added line #L469 was not covered by tests
}
}
}

let count_of_logs = logs.len(); // Count of logs that will be exported
let result = export_with_timeout_sync(
config.max_export_timeout,
exporter,
logs,
last_export_time,
); // This method clears the logs vec after exporting
let count_of_logs = logs.len(); // Count of logs that will be exported
total_exported_logs += count_of_logs;

result = export_with_timeout_sync(
config.max_export_timeout,
exporter,
logs,
last_export_time,
); // This method clears the logs vec after exporting

current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
}
result
}

Expand All @@ -485,6 +493,9 @@

match message_receiver.recv_timeout(remaining_time) {
Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
// Reset the export log message sent flag now it has has been processed.
export_log_message_sent.store(false, Ordering::Relaxed);

Check warning on line 498 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L496-L498

Added lines #L496 - L498 were not covered by tests
otel_debug!(
name: "BatchLogProcessor.ExportingDueToBatchSize",
);
Expand All @@ -497,9 +508,6 @@
&current_batch_size,
&config,
);

// Reset the export log message sent flag now it has has been processed.
export_log_message_sent.store(false, Ordering::Relaxed);
}
Ok(BatchMessage::ForceFlush(sender)) => {
otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
Expand Down
Loading