Skip to content

Commit 082972d

Browse files
committed
Check shutdown flag
1 parent d5042e5 commit 082972d

File tree

1 file changed

+23
-4
lines changed

1 file changed

+23
-4
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,13 @@ impl LogProcessor for SimpleLogProcessor {
163163
}
164164
}
165165

166-
/// A [`LogProcessor`] that asynchronously buffers log records and reports
167-
/// them at a pre-configured interval.
166+
/// A [`LogProcessor`] that buffers log records and reports them at a pre-configured interval.
168167
pub struct BatchLogProcessor {
169168
sender: SyncSender<BatchMessage>,
170169
handle: Mutex<Option<thread::JoinHandle<()>>>,
171170
forceflush_timeout: Duration,
172171
shutdown_timeout: Duration,
172+
is_shutdown: AtomicBool,
173173
}
174174

175175
impl Debug for BatchLogProcessor {
@@ -182,6 +182,14 @@ impl Debug for BatchLogProcessor {
182182

183183
impl LogProcessor for BatchLogProcessor {
184184
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
185+
// noop after shutdown
186+
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
187+
otel_warn!(
188+
name: "batch_log_processor_emit_after_shutdown"
189+
);
190+
return;
191+
}
192+
185193
let result = self.sender.send(BatchMessage::ExportLog((
186194
record.clone(),
187195
instrumentation.clone(),
@@ -192,11 +200,17 @@ impl LogProcessor for BatchLogProcessor {
192200
name: "batch_log_processor_emit_error",
193201
error = format!("{:?}", err)
194202
);
195-
global::handle_error(LogError::Other(err.into()));
196203
}
197204
}
198205

199206
fn force_flush(&self) -> LogResult<()> {
207+
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
208+
otel_warn!(
209+
name: "batch_log_processor_force_flush_after_shutdown"
210+
);
211+
return LogResult::Err(LogError::Other("batch log processor is already shutdown".into()));
212+
}
213+
200214
let (sender, receiver) = mpsc::sync_channel(1);
201215
self.sender.try_send(BatchMessage::ForceFlush(sender))
202216
.map_err(|err| LogError::Other(err.into()))?;
@@ -211,6 +225,11 @@ impl LogProcessor for BatchLogProcessor {
211225
}
212226

213227
fn shutdown(&self) -> LogResult<()> {
228+
// test and set is_shutdown flag is it is not set.
229+
if self.is_shutdown.swap(true, std::sync::atomic::Ordering::Relaxed) {
230+
return Ok(());
231+
}
232+
214233
let (sender, receiver) = mpsc::sync_channel(1);
215234
self.sender.try_send(BatchMessage::Shutdown(sender))
216235
.map_err(|err| LogError::Other(err.into()))?;
@@ -297,7 +316,7 @@ impl BatchLogProcessor {
297316
.unwrap_or(OTEL_LOGS_DEFAULT_SHUTDOWN_TIMEOUT);
298317

299318
// Return batch processor with link to worker
300-
BatchLogProcessor { sender, handle: Mutex::new(Some(handle)), forceflush_timeout, shutdown_timeout }
319+
BatchLogProcessor { sender, handle: Mutex::new(Some(handle)), forceflush_timeout, shutdown_timeout, is_shutdown: AtomicBool::new(false) }
301320
}
302321

303322
/// Create a new batch processor builder

0 commit comments

Comments
 (0)