From 696f4794d6abe3731e35484fd34731d9952b2e4b Mon Sep 17 00:00:00 2001
From: Cijo Thomas
Date: Fri, 31 Jan 2025 10:57:14 -0800
Subject: [PATCH 1/2] Better handling of shutdown in BatchLogProcessor

---
 opentelemetry-sdk/src/logs/log_processor.rs | 236 ++++++++++++--------
 1 file changed, 141 insertions(+), 95 deletions(-)

diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs
index 222c9e186b..24b3437648 100644
--- a/opentelemetry-sdk/src/logs/log_processor.rs
+++ b/opentelemetry-sdk/src/logs/log_processor.rs
@@ -270,7 +270,6 @@ pub struct BatchLogProcessor {
     handle: Mutex<Option<thread::JoinHandle<()>>>,
     forceflush_timeout: Duration,
     shutdown_timeout: Duration,
-    is_shutdown: AtomicBool,
     export_log_message_sent: Arc<AtomicBool>,
     current_batch_size: Arc<AtomicUsize>,
     max_export_batch_size: usize,
@@ -292,87 +291,114 @@ impl Debug for BatchLogProcessor {
 impl LogProcessor for BatchLogProcessor {
     fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
-        // noop after shutdown
-        if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
-            otel_warn!(
-                name: "BatchLogProcessor.Emit.ProcessorShutdown",
-                message = "BatchLogProcessor has been shutdown. No further logs will be emitted."
-            );
-            return;
-        }
-
         let result = self
             .logs_sender
             .try_send(Box::new((record.clone(), instrumentation.clone())));

-        if result.is_err() {
-            // Increment dropped logs count. The first time we have to drop a log,
-            // emit a warning.
-            if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
-                otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
-                    message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
-            }
-            return;
-        }
-
-        // At this point, sending the log record to the data channel was successful.
-        // Increment the current batch size and check if it has reached the max export batch size.
-        if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
-        {
-            // Check if the a control message for exporting logs is already sent to the worker thread.
-            // If not, send a control message to export logs.
-            // `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message.
-
-            if !self.export_log_message_sent.load(Ordering::Relaxed) {
-                // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
-                // Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation above returns false.
-                // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
-                // We could have used compare_exchange as well here, but it's more verbose than swap.
-                if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
-                    match self.message_sender.try_send(BatchMessage::ExportLog(
-                        self.export_log_message_sent.clone(),
-                    )) {
-                        Ok(_) => {
-                            // Control message sent successfully.
-                        }
-                        Err(_err) => {
-                            // TODO: Log error
-                            // If the control message could not be sent, reset the `export_log_message_sent` flag.
-                            self.export_log_message_sent.store(false, Ordering::Relaxed);
-                        }
-                    }
-                }
-            }
-        }
+        // Match on the result and handle each case separately.
+        match result {
+            Ok(_) => {
+                // Successfully sent the log record to the data channel.
+                // Increment the current batch size and check if it has reached
+                // the max export batch size.
+                if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
+                    >= self.max_export_batch_size
+                {
+                    // Check if a control message for exporting logs is
+                    // already sent to the worker thread. If not, send a control
+                    // message to export logs. `export_log_message_sent` is set
+                    // to false ONLY when the worker thread has processed the
+                    // control message.
+
+                    if !self.export_log_message_sent.load(Ordering::Relaxed) {
+                        // This is a cost-efficient check as atomic load
+                        // operations do not require exclusive access to cache
+                        // line. Perform atomic swap to
+                        // `export_log_message_sent` ONLY when the atomic load
+                        // operation above returns false. Atomic
+                        // swap/compare_exchange operations require exclusive
+                        // access to cache line on most processor architectures.
+                        // We could have used compare_exchange as well here, but
+                        // it's more verbose than swap.
+                        if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
+                            match self.message_sender.try_send(BatchMessage::ExportLog(
+                                self.export_log_message_sent.clone(),
+                            )) {
+                                Ok(_) => {
+                                    // Control message sent successfully.
+                                }
+                                Err(_err) => {
+                                    // TODO: Log error. If the control message
+                                    // could not be sent, reset the
+                                    // `export_log_message_sent` flag.
+                                    self.export_log_message_sent.store(false, Ordering::Relaxed);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            Err(mpsc::TrySendError::Full(_)) => {
+                // Increment dropped logs count. The first time we have to drop
+                // a log, emit a warning.
+                if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
+                    otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
+                        message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
+                }
+                return;
+            }
+            Err(mpsc::TrySendError::Disconnected(_)) => {
+                // Given that the background thread is the only receiver, and it's
+                // disconnected, it indicates the thread has shut down.
+                otel_warn!(
+                    name: "BatchLogProcessor.Emit.AfterShutdown",
+                    message = "Logs are being emitted even after Shutdown. This indicates incorrect lifecycle management of OTelLoggerProvider in the application. Logs will not be exported."
+                );
+                return;
+            }
+        }
     }
 
     fn force_flush(&self) -> LogResult<()> {
-        if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
-            return LogResult::Err(LogError::Other(
-                "BatchLogProcessor is already shutdown".into(),
-            ));
-        }
         let (sender, receiver) = mpsc::sync_channel(1);
-        self.message_sender
+        match self
+            .message_sender
             .try_send(BatchMessage::ForceFlush(sender))
-            .map_err(|err| LogError::Other(err.into()))?;
-
-        receiver
-            .recv_timeout(self.forceflush_timeout)
-            .map_err(|err| {
-                if err == RecvTimeoutError::Timeout {
-                    LogError::ExportTimedOut(self.forceflush_timeout)
-                } else {
-                    LogError::Other(err.into())
-                }
-            })?
+        {
+            Ok(_) => receiver
+                .recv_timeout(self.forceflush_timeout)
+                .map_err(|err| {
+                    if err == RecvTimeoutError::Timeout {
+                        LogError::ExportTimedOut(self.forceflush_timeout)
+                    } else {
+                        LogError::Other(err.into())
+                    }
+                })?,
+            Err(mpsc::TrySendError::Full(_)) => {
+                // The control message could not be sent; log it and return an error.
+                otel_debug!(
+                    name: "BatchLogProcessor.ForceFlush.ControlChannelFull",
+                    message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if the user repeatedly calls force_flush without finishing the previous call."
+                );
+                LogResult::Err(LogError::Other("ForceFlush cannot be performed as the control channel is full. This can occur if the user repeatedly calls force_flush without finishing the previous call.".into()))
+            }
+            Err(mpsc::TrySendError::Disconnected(_)) => {
+                // Given that the background thread is the only receiver, and it's
+                // disconnected, it indicates the thread has shut down.
+                otel_debug!(
+                    name: "BatchLogProcessor.ForceFlush.AlreadyShutdown",
+                    message = "ForceFlush invoked after Shutdown. This will not perform a flush and indicates incorrect lifecycle management in the application."
+                );
+
+                LogResult::Err(LogError::Other(
+                    "ForceFlush cannot be performed as BatchLogProcessor is already shutdown"
+                        .into(),
+                ))
+            }
+        }
     }
 
     fn shutdown(&self) -> LogResult<()> {
-        // Set is_shutdown to true
-        self.is_shutdown
-            .store(true, std::sync::atomic::Ordering::Relaxed);
-
         let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
         let max_queue_size = self.max_queue_size;
         if dropped_logs > 0 {
@@ -385,35 +411,56 @@ impl LogProcessor for BatchLogProcessor {
         }
 
         let (sender, receiver) = mpsc::sync_channel(1);
-        self.message_sender
-            .try_send(BatchMessage::Shutdown(sender))
-            .map_err(|err| LogError::Other(err.into()))?;
-
-        receiver
-            .recv_timeout(self.shutdown_timeout)
-            .map(|_| {
-                // join the background thread after receiving back the shutdown signal
-                if let Some(handle) = self.handle.lock().unwrap().take() {
-                    handle.join().unwrap();
-                }
-                LogResult::Ok(())
-            })
-            .map_err(|err| match err {
-                RecvTimeoutError::Timeout => {
-                    otel_error!(
-                        name: "BatchLogProcessor.Shutdown.Timeout",
-                        message = "BatchLogProcessor shutdown timing out."
-                    );
-                    LogError::ExportTimedOut(self.shutdown_timeout)
-                }
-                _ => {
-                    otel_error!(
-                        name: "BatchLogProcessor.Shutdown.Error",
-                        error = format!("{}", err)
-                    );
-                    LogError::Other(err.into())
-                }
-            })?
+        match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
+            Ok(_) => {
+                receiver
+                    .recv_timeout(self.shutdown_timeout)
+                    .map(|_| {
+                        // join the background thread after receiving back the
+                        // shutdown signal
+                        if let Some(handle) = self.handle.lock().unwrap().take() {
+                            handle.join().unwrap();
+                        }
+                        LogResult::Ok(())
+                    })
+                    .map_err(|err| match err {
+                        RecvTimeoutError::Timeout => {
+                            otel_error!(
+                                name: "BatchLogProcessor.Shutdown.Timeout",
+                                message = "BatchLogProcessor shutdown timing out."
+                            );
+                            LogError::ExportTimedOut(self.shutdown_timeout)
+                        }
+                        _ => {
+                            otel_error!(
+                                name: "BatchLogProcessor.Shutdown.Error",
+                                error = format!("{}", err)
+                            );
+                            LogError::Other(err.into())
+                        }
+                    })?
+            }
+            Err(mpsc::TrySendError::Full(_)) => {
+                // The control message could not be sent; log it and return an error.
+                otel_debug!(
+                    name: "BatchLogProcessor.Shutdown.ControlChannelFull",
+                    message = "Control message to shut down the worker thread could not be sent as the control channel is full. This can occur if the user repeatedly calls force_flush without finishing the previous call."
+                );
+                LogResult::Err(LogError::Other("Shutdown cannot be performed as the control channel is full. This can occur if the user repeatedly calls force_flush without finishing the previous call.".into()))
+            }
+            Err(mpsc::TrySendError::Disconnected(_)) => {
+                // Given that the background thread is the only receiver, and it's
+                // disconnected, it indicates the thread has shut down.
+                otel_debug!(
+                    name: "BatchLogProcessor.Shutdown.AlreadyShutdown",
+                    message = "Shutdown is being invoked more than once. This is a no-op, but it indicates a potential issue in the application's lifecycle management."
+                );
+
+                LogResult::Err(LogError::Other(
+                    "BatchLogProcessor is already shutdown".into(),
+                ))
+            }
+        }
     }
 
     fn set_resource(&self, resource: &Resource) {
@@ -590,7 +637,6 @@ impl BatchLogProcessor {
             handle: Mutex::new(Some(handle)),
             forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
             shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
-            is_shutdown: AtomicBool::new(false),
             dropped_logs_count: AtomicUsize::new(0),
             max_queue_size,
             export_log_message_sent: Arc::new(AtomicBool::new(false)),

From 34bf1266d2f770c3bc23cc136d5146dc92102a16 Mon Sep 17 00:00:00 2001
From: Cijo Thomas
Date: Fri, 31 Jan 2025 11:05:46 -0800
Subject: [PATCH 2/2] remove unnecessary returns

---
 opentelemetry-sdk/src/logs/log_processor.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs
index 24b3437648..226df509d7 100644
--- a/opentelemetry-sdk/src/logs/log_processor.rs
+++ b/opentelemetry-sdk/src/logs/log_processor.rs
@@ -345,7 +345,6 @@ impl LogProcessor for BatchLogProcessor {
                     otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
                         message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
                 }
-                return;
             }
             Err(mpsc::TrySendError::Disconnected(_)) => {
                 // Given that the background thread is the only receiver, and it's
                 // disconnected, it indicates the thread has shut down.
                 otel_warn!(
                     name: "BatchLogProcessor.Emit.AfterShutdown",
                     message = "Logs are being emitted even after Shutdown. This indicates incorrect lifecycle management of OTelLoggerProvider in the application. Logs will not be exported."
                 );
-                return;
             }
         }
     }
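
Note on the approach (not part of the patches above): the standalone Rust sketch below illustrates the two patterns the new emit path relies on. First, treating mpsc::TrySendError::Disconnected on the channel to the background thread as the shutdown signal, which is what lets the patch drop the separate is_shutdown flag. Second, gating the "export now" control message behind a cheap atomic load before the more expensive swap. All names here (SketchProcessor, WorkerMessage) are illustrative only and do not exist in the opentelemetry-sdk crate.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, TrySendError};
use std::sync::Arc;
use std::thread;

// Control messages understood by the hypothetical worker thread.
enum WorkerMessage {
    Export(Arc<AtomicBool>),
    Shutdown(mpsc::SyncSender<()>),
}

struct SketchProcessor {
    data_sender: mpsc::SyncSender<String>,
    control_sender: mpsc::SyncSender<WorkerMessage>,
    export_requested: Arc<AtomicBool>,
}

impl SketchProcessor {
    fn emit(&self, record: String) {
        match self.data_sender.try_send(record) {
            Ok(_) => {
                // Real code would track the batch size here and only request an
                // export once the threshold is reached; the flag ensures at most
                // one export request is in flight at a time.
                if !self.export_requested.load(Ordering::Relaxed)
                    && !self.export_requested.swap(true, Ordering::Relaxed)
                {
                    if self
                        .control_sender
                        .try_send(WorkerMessage::Export(self.export_requested.clone()))
                        .is_err()
                    {
                        // Could not queue the request; allow a later emit to retry.
                        self.export_requested.store(false, Ordering::Relaxed);
                    }
                }
            }
            Err(TrySendError::Full(_)) => {
                // Queue is full: drop the record (real code counts the drops).
            }
            Err(TrySendError::Disconnected(_)) => {
                // The worker is the only receiver, so a disconnected channel
                // means the processor has shut down; no separate flag needed.
            }
        }
    }
}

fn main() {
    let (data_tx, data_rx) = mpsc::sync_channel::<String>(8);
    let (control_tx, control_rx) = mpsc::sync_channel::<WorkerMessage>(8);

    // Background worker: drains data on demand and acknowledges shutdown.
    let worker = thread::spawn(move || {
        while let Ok(message) = control_rx.recv() {
            match message {
                WorkerMessage::Export(flag) => {
                    let batch: Vec<String> = data_rx.try_iter().collect();
                    println!("exporting {} record(s)", batch.len());
                    // Reset only after the request has been handled.
                    flag.store(false, Ordering::Relaxed);
                }
                WorkerMessage::Shutdown(ack) => {
                    let _ = ack.send(());
                    break;
                }
            }
        }
    });

    let processor = SketchProcessor {
        data_sender: data_tx,
        control_sender: control_tx,
        export_requested: Arc::new(AtomicBool::new(false)),
    };
    processor.emit("hello".to_owned());

    // Shutdown: send the control message and wait for the acknowledgement.
    let (ack_tx, ack_rx) = mpsc::sync_channel(1);
    processor
        .control_sender
        .try_send(WorkerMessage::Shutdown(ack_tx))
        .expect("control channel full");
    ack_rx.recv().expect("worker exited without ack");
    worker.join().unwrap();

    // Any emit after this point hits TrySendError::Disconnected, because the
    // worker has dropped its receiver; that is how the patch detects
    // "already shut down" without an is_shutdown flag.
    processor.emit("too late".to_owned());
}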