Skip to content

Commit 696f479

Browse files
committed
Better handling of shutdown in BatchLogProcessor
1 parent 9dfcff1 commit 696f479

File tree

1 file changed

+141
-95
lines changed

1 file changed

+141
-95
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 141 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,6 @@ pub struct BatchLogProcessor {
270270
handle: Mutex<Option<thread::JoinHandle<()>>>,
271271
forceflush_timeout: Duration,
272272
shutdown_timeout: Duration,
273-
is_shutdown: AtomicBool,
274273
export_log_message_sent: Arc<AtomicBool>,
275274
current_batch_size: Arc<AtomicUsize>,
276275
max_export_batch_size: usize,
@@ -292,87 +291,114 @@ impl Debug for BatchLogProcessor {
292291

293292
impl LogProcessor for BatchLogProcessor {
294293
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
295-
// noop after shutdown
296-
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
297-
otel_warn!(
298-
name: "BatchLogProcessor.Emit.ProcessorShutdown",
299-
message = "BatchLogProcessor has been shutdown. No further logs will be emitted."
300-
);
301-
return;
302-
}
303-
304294
let result = self
305295
.logs_sender
306296
.try_send(Box::new((record.clone(), instrumentation.clone())));
307297

308-
if result.is_err() {
309-
// Increment dropped logs count. The first time we have to drop a log,
310-
// emit a warning.
311-
if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
312-
otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
313-
message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
314-
}
315-
return;
316-
}
317-
318-
// At this point, sending the log record to the data channel was successful.
319-
// Increment the current batch size and check if it has reached the max export batch size.
320-
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
321-
{
322-
// Check if a control message for exporting logs is already sent to the worker thread.
323-
// If not, send a control message to export logs.
324-
// `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message.
325-
326-
if !self.export_log_message_sent.load(Ordering::Relaxed) {
327-
// This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
328-
// Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation above returns false.
329-
// Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
330-
// We could have used compare_exchange as well here, but it's more verbose than swap.
331-
if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
332-
match self.message_sender.try_send(BatchMessage::ExportLog(
333-
self.export_log_message_sent.clone(),
334-
)) {
335-
Ok(_) => {
336-
// Control message sent successfully.
337-
}
338-
Err(_err) => {
339-
// TODO: Log error
340-
// If the control message could not be sent, reset the `export_log_message_sent` flag.
341-
self.export_log_message_sent.store(false, Ordering::Relaxed);
298+
// match for result and handle each separately
299+
match result {
300+
Ok(_) => {
301+
// Successfully sent the log record to the data channel.
302+
// Increment the current batch size and check if it has reached
303+
// the max export batch size.
304+
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1
305+
>= self.max_export_batch_size
306+
{
307+
// Check if a control message for exporting logs is
308+
// already sent to the worker thread. If not, send a control
309+
// message to export logs. `export_log_message_sent` is set
310+
// to false ONLY when the worker thread has processed the
311+
// control message.
312+
313+
if !self.export_log_message_sent.load(Ordering::Relaxed) {
314+
// This is a cost-efficient check as atomic load
315+
// operations do not require exclusive access to cache
316+
// line. Perform atomic swap to
317+
// `export_log_message_sent` ONLY when the atomic load
318+
// operation above returns false. Atomic
319+
// swap/compare_exchange operations require exclusive
320+
// access to cache line on most processor architectures.
321+
// We could have used compare_exchange as well here, but
322+
// it's more verbose than swap.
323+
if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
324+
match self.message_sender.try_send(BatchMessage::ExportLog(
325+
self.export_log_message_sent.clone(),
326+
)) {
327+
Ok(_) => {
328+
// Control message sent successfully.
329+
}
330+
Err(_err) => {
331+
// TODO: Log error If the control message
332+
// could not be sent, reset the
333+
// `export_log_message_sent` flag.
334+
self.export_log_message_sent.store(false, Ordering::Relaxed);
335+
}
336+
}
342337
}
343338
}
344339
}
345340
}
341+
Err(mpsc::TrySendError::Full(_)) => {
342+
// Increment dropped logs count. The first time we have to drop
343+
// a log, emit a warning.
344+
if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
345+
otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
346+
message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
347+
}
348+
return;
349+
}
350+
Err(mpsc::TrySendError::Disconnected(_)) => {
351+
// Given background thread is the only receiver, and it's
352+
// disconnected, it indicates the thread is shutdown
353+
otel_warn!(
354+
name: "BatchLogProcessor.Emit.AfterShutdown",
355+
message = "Logs are being emitted even after Shutdown. This indicates incorrect lifecycle management of OTelLoggerProvider in application. Logs will not be exported."
356+
);
357+
return;
358+
}
346359
}
347360
}
348361

349362
fn force_flush(&self) -> LogResult<()> {
350-
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
351-
return LogResult::Err(LogError::Other(
352-
"BatchLogProcessor is already shutdown".into(),
353-
));
354-
}
355363
let (sender, receiver) = mpsc::sync_channel(1);
356-
self.message_sender
364+
match self
365+
.message_sender
357366
.try_send(BatchMessage::ForceFlush(sender))
358-
.map_err(|err| LogError::Other(err.into()))?;
359-
360-
receiver
361-
.recv_timeout(self.forceflush_timeout)
362-
.map_err(|err| {
363-
if err == RecvTimeoutError::Timeout {
364-
LogError::ExportTimedOut(self.forceflush_timeout)
365-
} else {
366-
LogError::Other(err.into())
367-
}
368-
})?
367+
{
368+
Ok(_) => receiver
369+
.recv_timeout(self.forceflush_timeout)
370+
.map_err(|err| {
371+
if err == RecvTimeoutError::Timeout {
372+
LogError::ExportTimedOut(self.forceflush_timeout)
373+
} else {
374+
LogError::Other(err.into())
375+
}
376+
})?,
377+
Err(mpsc::TrySendError::Full(_)) => {
378+
// If the control message could not be sent, emit a warning.
379+
otel_debug!(
380+
name: "BatchLogProcessor.ForceFlush.ControlChannelFull",
381+
message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if user repeatedly calls force_flush without finishing the previous call."
382+
);
383+
LogResult::Err(LogError::Other("ForceFlush cannot be performed as Control channel is full. This can occur if user repeatedly calls force_flush without finishing the previous call.".into()))
384+
}
385+
Err(mpsc::TrySendError::Disconnected(_)) => {
386+
// Given background thread is the only receiver, and it's
387+
// disconnected, it indicates the thread is shutdown
388+
otel_debug!(
389+
name: "BatchLogProcessor.ForceFlush.AlreadyShutdown",
390+
message = "ForceFlush invoked after Shutdown. This will not perform Flush and indicates an incorrect lifecycle management in Application."
391+
);
392+
393+
LogResult::Err(LogError::Other(
394+
"ForceFlush cannot be performed as BatchLogProcessor is already shutdown"
395+
.into(),
396+
))
397+
}
398+
}
369399
}
370400

371401
fn shutdown(&self) -> LogResult<()> {
372-
// Set is_shutdown to true
373-
self.is_shutdown
374-
.store(true, std::sync::atomic::Ordering::Relaxed);
375-
376402
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
377403
let max_queue_size = self.max_queue_size;
378404
if dropped_logs > 0 {
@@ -385,35 +411,56 @@ impl LogProcessor for BatchLogProcessor {
385411
}
386412

387413
let (sender, receiver) = mpsc::sync_channel(1);
388-
self.message_sender
389-
.try_send(BatchMessage::Shutdown(sender))
390-
.map_err(|err| LogError::Other(err.into()))?;
391-
392-
receiver
393-
.recv_timeout(self.shutdown_timeout)
394-
.map(|_| {
395-
// join the background thread after receiving back the shutdown signal
396-
if let Some(handle) = self.handle.lock().unwrap().take() {
397-
handle.join().unwrap();
398-
}
399-
LogResult::Ok(())
400-
})
401-
.map_err(|err| match err {
402-
RecvTimeoutError::Timeout => {
403-
otel_error!(
404-
name: "BatchLogProcessor.Shutdown.Timeout",
405-
message = "BatchLogProcessor shutdown timing out."
406-
);
407-
LogError::ExportTimedOut(self.shutdown_timeout)
408-
}
409-
_ => {
410-
otel_error!(
411-
name: "BatchLogProcessor.Shutdown.Error",
412-
error = format!("{}", err)
413-
);
414-
LogError::Other(err.into())
415-
}
416-
})?
414+
match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
415+
Ok(_) => {
416+
receiver
417+
.recv_timeout(self.shutdown_timeout)
418+
.map(|_| {
419+
// join the background thread after receiving back the
420+
// shutdown signal
421+
if let Some(handle) = self.handle.lock().unwrap().take() {
422+
handle.join().unwrap();
423+
}
424+
LogResult::Ok(())
425+
})
426+
.map_err(|err| match err {
427+
RecvTimeoutError::Timeout => {
428+
otel_error!(
429+
name: "BatchLogProcessor.Shutdown.Timeout",
430+
message = "BatchLogProcessor shutdown timing out."
431+
);
432+
LogError::ExportTimedOut(self.shutdown_timeout)
433+
}
434+
_ => {
435+
otel_error!(
436+
name: "BatchLogProcessor.Shutdown.Error",
437+
error = format!("{}", err)
438+
);
439+
LogError::Other(err.into())
440+
}
441+
})?
442+
}
443+
Err(mpsc::TrySendError::Full(_)) => {
444+
// If the control message could not be sent, emit a warning.
445+
otel_debug!(
446+
name: "BatchLogProcessor.Shutdown.ControlChannelFull",
447+
message = "Control message to shutdown the worker thread could not be sent as the control channel is full. This can occur if user repeatedly calls force_flush without finishing the previous call."
448+
);
449+
LogResult::Err(LogError::Other("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedly calls force_flush without finishing the previous call.".into()))
450+
}
451+
Err(mpsc::TrySendError::Disconnected(_)) => {
452+
// Given background thread is the only receiver, and it's
453+
// disconnected, it indicates the thread is shutdown
454+
otel_debug!(
455+
name: "BatchLogProcessor.Shutdown.AlreadyShutdown",
456+
message = "Shutdown is being invoked more than once. This is a no-op, but indicates a potential issue in the application's lifecycle management."
457+
);
458+
459+
LogResult::Err(LogError::Other(
460+
"BatchLogProcessor is already shutdown".into(),
461+
))
462+
}
463+
}
417464
}
418465

419466
fn set_resource(&self, resource: &Resource) {
@@ -590,7 +637,6 @@ impl BatchLogProcessor {
590637
handle: Mutex::new(Some(handle)),
591638
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
592639
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
593-
is_shutdown: AtomicBool::new(false),
594640
dropped_logs_count: AtomicUsize::new(0),
595641
max_queue_size,
596642
export_log_message_sent: Arc::new(AtomicBool::new(false)),

0 commit comments

Comments
 (0)