diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs
index b6284b87c6..4ad4c0342d 100644
--- a/opentelemetry-sdk/src/logs/log_processor.rs
+++ b/opentelemetry-sdk/src/logs/log_processor.rs
@@ -199,8 +199,8 @@ impl LogProcessor for SimpleLogProcessor {
 #[allow(clippy::large_enum_variant)]
 #[derive(Debug)]
 enum BatchMessage {
-    /// Export logs, called when the log is emitted.
-    ExportLog(Box<(LogRecord, InstrumentationScope)>),
+    /// This is ONLY sent when the number of log records in the data channel has reached `max_export_batch_size`.
+    ExportLog(Arc<AtomicBool>),
     /// ForceFlush flushes the current buffer to the exporter.
     ForceFlush(mpsc::SyncSender<ExportResult>),
     /// Shut down the worker thread, push all logs in buffer to the exporter.
@@ -209,6 +209,8 @@ enum BatchMessage {
     SetResource(Arc<Resource>),
 }
 
+type LogsData = Box<(LogRecord, InstrumentationScope)>;
+
 /// The `BatchLogProcessor` collects finished logs in a buffer and exports them
 /// in batches to the configured `LogExporter`. This processor is ideal for
 /// high-throughput environments, as it minimizes the overhead of exporting logs
@@ -246,11 +248,15 @@ enum BatchMessage {
 /// .build();
 ///
 pub struct BatchLogProcessor {
-    message_sender: SyncSender<BatchMessage>,
+    logs_sender: SyncSender<LogsData>, // Data channel to store log records and instrumentation scopes
+    message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
     handle: Mutex<Option<thread::JoinHandle<()>>>,
     forceflush_timeout: Duration,
     shutdown_timeout: Duration,
     is_shutdown: AtomicBool,
+    export_log_message_sent: Arc<AtomicBool>,
+    current_batch_size: Arc<AtomicUsize>,
+    max_export_batch_size: usize,
 
     // Track dropped logs - we'll log this at shutdown
     dropped_logs_count: AtomicUsize,
@@ -279,11 +285,8 @@ impl LogProcessor for BatchLogProcessor {
         }
 
         let result = self
-            .message_sender
-            .try_send(BatchMessage::ExportLog(Box::new((
-                record.clone(),
-                instrumentation.clone(),
-            ))));
+            .logs_sender
+            .try_send(Box::new((record.clone(), instrumentation.clone())));
 
         if result.is_err() {
             // Increment dropped logs count. The first time we have to drop a log,
@@ -292,6 +295,37 @@ impl LogProcessor for BatchLogProcessor {
                 otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
                     message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
             }
+            return;
+        }
+
+        // At this point, sending the log record to the data channel was successful.
+        // Increment the current batch size and check if it has reached the max export batch size.
+        if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
+        {
+            // Check if a control message for exporting logs has already been sent to the worker thread.
+            // If not, send a control message to export logs.
+            // `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message.
+
+            if !self.export_log_message_sent.load(Ordering::Relaxed) {
+                // This is a cost-efficient check, as atomic load operations do not require exclusive access to the cache line.
+                // Perform the atomic swap on `export_log_message_sent` ONLY when the atomic load above returns false.
+                // Atomic swap/compare_exchange operations require exclusive access to the cache line on most processor architectures.
+                // We could have used compare_exchange as well here, but it's more verbose than swap.
+                if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
+                    match self.message_sender.try_send(BatchMessage::ExportLog(
+                        self.export_log_message_sent.clone(),
+                    )) {
+                        Ok(_) => {
+                            // Control message sent successfully.
+                        }
+                        Err(_err) => {
+                            // TODO: Log error
+                            // If the control message could not be sent, reset the `export_log_message_sent` flag.
+                            self.export_log_message_sent.store(false, Ordering::Relaxed);
+                        }
+                    }
+                }
+            }
         }
     }
 
@@ -388,8 +422,12 @@ impl BatchLogProcessor {
     where
         E: LogExporter + Send + Sync + 'static,
     {
-        let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size);
+        let (logs_sender, logs_receiver) = mpsc::sync_channel::<LogsData>(config.max_queue_size);
+        let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
         let max_queue_size = config.max_queue_size;
+        let max_export_batch_size = config.max_export_batch_size;
+        let current_batch_size = Arc::new(AtomicUsize::new(0));
+        let current_batch_size_for_thread = current_batch_size.clone();
 
         let handle = thread::Builder::new()
             .name("OpenTelemetry.Logs.BatchProcessor".to_string())
@@ -402,6 +440,42 @@ impl BatchLogProcessor {
                 );
                 let mut last_export_time = Instant::now();
                 let mut logs = Vec::with_capacity(config.max_export_batch_size);
+                let current_batch_size = current_batch_size_for_thread;
+
+                // This function gets up to `max_export_batch_size` log records from the channel and exports them.
+                // It returns the result of the export operation.
+                // It expects the logs vec to be empty when it's called.
+                #[inline]
+                fn get_logs_and_export<E>(
+                    logs_receiver: &mpsc::Receiver<LogsData>,
+                    exporter: &E,
+                    logs: &mut Vec<LogsData>,
+                    last_export_time: &mut Instant,
+                    current_batch_size: &AtomicUsize,
+                    config: &BatchConfig,
+                ) -> ExportResult
+                where
+                    E: LogExporter + Send + Sync + 'static,
+                {
+                    // Get up to `max_export_batch_size` log records from the channel and push them to the logs vec
+                    while let Ok(log) = logs_receiver.try_recv() {
+                        logs.push(log);
+                        if logs.len() == config.max_export_batch_size {
+                            break;
+                        }
+                    }
+
+                    let count_of_logs = logs.len(); // Count of logs that will be exported
+                    let result = export_with_timeout_sync(
+                        config.max_export_timeout,
+                        exporter,
+                        logs,
+                        last_export_time,
+                    ); // This function clears the logs vec after exporting
+
+                    current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
+                    result
+                }
 
                 loop {
                     let remaining_time = config
@@ -410,37 +484,44 @@ impl BatchLogProcessor {
                         .unwrap_or(config.scheduled_delay);
 
                     match message_receiver.recv_timeout(remaining_time) {
-                        Ok(BatchMessage::ExportLog(log)) => {
-                            logs.push(log);
-                            if logs.len() == config.max_export_batch_size {
-                                otel_debug!(
-                                    name: "BatchLogProcessor.ExportingDueToBatchSize",
-                                );
-                                let _ = export_with_timeout_sync(
-                                    config.max_export_timeout,
-                                    &mut exporter,
-                                    &mut logs,
-                                    &mut last_export_time,
-                                );
-                            }
+                        Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
+                            otel_debug!(
+                                name: "BatchLogProcessor.ExportingDueToBatchSize",
+                            );
+
+                            let _ = get_logs_and_export(
+                                &logs_receiver,
+                                &exporter,
+                                &mut logs,
+                                &mut last_export_time,
+                                &current_batch_size,
+                                &config,
+                            );
+
+                            // Reset the export log message sent flag now that it has been processed.
+                            export_log_message_sent.store(false, Ordering::Relaxed);
                         }
                         Ok(BatchMessage::ForceFlush(sender)) => {
                             otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
-                            let result = export_with_timeout_sync(
-                                config.max_export_timeout,
-                                &mut exporter,
+                            let result = get_logs_and_export(
+                                &logs_receiver,
+                                &exporter,
                                 &mut logs,
                                 &mut last_export_time,
+                                &current_batch_size,
+                                &config,
                             );
                             let _ = sender.send(result);
                         }
                         Ok(BatchMessage::Shutdown(sender)) => {
                             otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown");
-                            let result = export_with_timeout_sync(
-                                config.max_export_timeout,
-                                &mut exporter,
+                            let result = get_logs_and_export(
+                                &logs_receiver,
+                                &exporter,
                                 &mut logs,
                                 &mut last_export_time,
+                                &current_batch_size,
+                                &config,
                             );
                             let _ = sender.send(result);
 
@@ -460,11 +541,14 @@ impl BatchLogProcessor {
                             otel_debug!(
                                 name: "BatchLogProcessor.ExportingDueToTimer",
                             );
-                            let _ = export_with_timeout_sync(
-                                config.max_export_timeout,
-                                &mut exporter,
+
+                            let _ = get_logs_and_export(
+                                &logs_receiver,
+                                &exporter,
                                 &mut logs,
                                 &mut last_export_time,
+                                &current_batch_size,
+                                &config,
                             );
                         }
                         Err(RecvTimeoutError::Disconnected) => {
@@ -486,6 +570,7 @@ impl BatchLogProcessor {
 
         // Return batch processor with link to worker
         BatchLogProcessor {
+            logs_sender,
             message_sender,
             handle: Mutex::new(Some(handle)),
             forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
@@ -493,6 +578,9 @@ impl BatchLogProcessor {
             is_shutdown: AtomicBool::new(false),
             dropped_logs_count: AtomicUsize::new(0),
             max_queue_size,
+            export_log_message_sent: Arc::new(AtomicBool::new(false)),
+            current_batch_size,
+            max_export_batch_size,
         }
     }
 
@@ -511,7 +599,7 @@ impl BatchLogProcessor {
 #[allow(clippy::vec_box)]
 fn export_with_timeout_sync<E>(
     _: Duration, // TODO, enforcing timeout in exporter.
-    exporter: &mut E,
+    exporter: &E,
     batch: &mut Vec<Box<(LogRecord, InstrumentationScope)>>,
     last_export_time: &mut Instant,
 ) -> ExportResult
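
Illustrative note (not part of the patch): the emit() path above counts queued records and uses a cheap atomic load followed by a swap so that at most one ExportLog control message is outstanding per full batch, with the worker clearing the flag once it has drained the data channel. The sketch below is a minimal, self-contained model of that pattern; all names in it (Notifier, BATCH_SIZE, on_item_queued) are stand-ins chosen for illustration, not SDK APIs.

    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::{mpsc, Arc};

    const BATCH_SIZE: usize = 4; // stand-in for max_export_batch_size

    struct Notifier {
        queued: AtomicUsize,                        // stand-in for current_batch_size
        export_requested: Arc<AtomicBool>,          // stand-in for export_log_message_sent
        control: mpsc::SyncSender<Arc<AtomicBool>>, // stand-in for the control channel
    }

    impl Notifier {
        fn on_item_queued(&self) {
            // Count the item; only consider notifying once a full batch is queued.
            if self.queued.fetch_add(1, Ordering::Relaxed) + 1 >= BATCH_SIZE {
                // Cheap load first; attempt the contended swap only when the flag looks
                // clear, so the common case (a message is already pending) stays read-only.
                if !self.export_requested.load(Ordering::Relaxed)
                    && !self.export_requested.swap(true, Ordering::Relaxed)
                {
                    if self.control.try_send(self.export_requested.clone()).is_err() {
                        // Control channel full: clear the flag so a later producer can retry.
                        self.export_requested.store(false, Ordering::Relaxed);
                    }
                }
            }
        }
    }

    fn main() {
        let (control_tx, control_rx) = mpsc::sync_channel(8);
        let notifier = Notifier {
            queued: AtomicUsize::new(0),
            export_requested: Arc::new(AtomicBool::new(false)),
            control: control_tx,
        };
        // Ten items cross the threshold repeatedly, but only one control message is sent.
        for _ in 0..10 {
            notifier.on_item_queued();
        }
        let flag = control_rx.try_recv().expect("exactly one control message");
        assert!(control_rx.try_recv().is_err());
        // Worker side: after draining and exporting, reset the counter and the flag so the
        // next full batch can trigger a fresh control message (mirrors the ExportLog arm above).
        notifier.queued.store(0, Ordering::Relaxed);
        flag.store(false, Ordering::Relaxed);
    }

Under these assumptions at most one export notification is pending per batch threshold, which appears to be the reasoning behind the small fixed bound (64) on the control channel that the patch itself still flags with "Is this a reasonable bound?".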