Skip to content

Commit 999146b

Browse files
committed
SimpleProcessor for Logs simplified
1 parent 9569b0f commit 999146b

File tree

2 files changed

+39
-60
lines changed

2 files changed

+39
-60
lines changed

opentelemetry-sdk/src/logs/in_memory_exporter.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ impl InMemoryLogExporter {
198198

199199
impl LogExporter for InMemoryLogExporter {
200200
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
201+
if self.is_shutdown_called() {
202+
return Err(OTelSdkError::AlreadyShutdown);
203+
}
201204
let mut logs_guard = self.logs.lock().map_err(|e| {
202205
OTelSdkError::InternalFailure(format!("Failed to lock logs for export: {}", e))
203206
})?;

opentelemetry-sdk/src/logs/simple_log_processor.rs

Lines changed: 36 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,17 @@
1616
//! +-----+---------------+ +-----------------------+ +-------------------+
1717
//! ```
1818
19-
use crate::error::{OTelSdkError, OTelSdkResult};
19+
use crate::error::OTelSdkResult;
2020
use crate::logs::log_processor::LogProcessor;
2121
use crate::{
2222
logs::{LogBatch, LogExporter, SdkLogRecord},
2323
Resource,
2424
};
2525

26-
use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};
26+
use opentelemetry::{otel_warn, InstrumentationScope};
2727

2828
use std::fmt::Debug;
29-
use std::sync::atomic::AtomicBool;
30-
use std::sync::Mutex;
29+
use std::sync::atomic::{AtomicBool, Ordering};
3130

3231
/// A [`LogProcessor`] designed for testing and debugging purpose, that immediately
3332
/// exports log records as they are emitted. Log records are exported synchronously
@@ -60,54 +59,47 @@ use std::sync::Mutex;
6059
///
6160
#[derive(Debug)]
6261
pub struct SimpleLogProcessor<T: LogExporter> {
63-
exporter: Mutex<T>,
64-
is_shutdown: AtomicBool,
62+
exporter: T,
63+
is_exporting: AtomicBool,
6564
}
6665

6766
impl<T: LogExporter> SimpleLogProcessor<T> {
6867
/// Creates a new instance of `SimpleLogProcessor`.
6968
pub fn new(exporter: T) -> Self {
7069
SimpleLogProcessor {
71-
exporter: Mutex::new(exporter),
72-
is_shutdown: AtomicBool::new(false),
70+
exporter,
71+
is_exporting: AtomicBool::new(false),
7372
}
7473
}
7574
}
7675

7776
impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
7877
fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
79-
// noop after shutdown
80-
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
81-
// this is a warning, as the user is trying to log after the processor has been shutdown
82-
otel_warn!(
83-
name: "SimpleLogProcessor.Emit.ProcessorShutdown",
84-
);
85-
return;
78+
// export() does not require mutable self and can be called in parallel
79+
// with other export() calls. However, OTel Spec requires that
80+
// existing export() must be completed before the next export() call.
81+
while !self
82+
.is_exporting
83+
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
84+
.is_ok()
85+
{
86+
// Another thread is currently exporting, yield to let other work proceed
87+
std::thread::yield_now();
8688
}
8789

88-
let result = self
89-
.exporter
90-
.lock()
91-
.map_err(|_| OTelSdkError::InternalFailure("SimpleLogProcessor mutex poison".into()))
92-
.and_then(|exporter| {
93-
let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
94-
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
95-
});
96-
// Handle errors with specific static names
97-
match result {
98-
Err(OTelSdkError::InternalFailure(_)) => {
99-
// logging as debug as this is not a user error
100-
otel_debug!(
101-
name: "SimpleLogProcessor.Emit.MutexPoisoning",
102-
);
103-
}
104-
Err(err) => {
105-
otel_error!(
106-
name: "SimpleLogProcessor.Emit.ExportError",
107-
error = format!("{}",err)
108-
);
109-
}
110-
_ => {}
90+
// We now have exclusive access to export
91+
let result = {
92+
let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
93+
futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple)))
94+
};
95+
96+
// Release the lock
97+
self.is_exporting.store(false, Ordering::Release);
98+
if let Err(err) = result {
99+
otel_warn!(
100+
name: "SimpleLogProcessor.Emit.ExportError",
101+
error = format!("{}",err)
102+
);
111103
}
112104
}
113105

@@ -116,21 +108,11 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
116108
}
117109

118110
fn shutdown(&self) -> OTelSdkResult {
119-
self.is_shutdown
120-
.store(true, std::sync::atomic::Ordering::Relaxed);
121-
if let Ok(exporter) = self.exporter.lock() {
122-
exporter.shutdown()
123-
} else {
124-
Err(OTelSdkError::InternalFailure(
125-
"SimpleLogProcessor mutex poison at shutdown".into(),
126-
))
127-
}
111+
self.exporter.shutdown()
128112
}
129113

130114
fn set_resource(&mut self, resource: &Resource) {
131-
if let Ok(mut exporter) = self.exporter.lock() {
132-
exporter.set_resource(resource);
133-
}
115+
self.exporter.set_resource(resource);
134116
}
135117

136118
#[cfg(feature = "spec_unstable_logs_enabled")]
@@ -140,11 +122,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
140122
target: &str,
141123
name: Option<&str>,
142124
) -> bool {
143-
if let Ok(exporter) = self.exporter.lock() {
144-
exporter.event_enabled(level, target, name)
145-
} else {
146-
true
147-
}
125+
self.exporter.event_enabled(level, target, name)
148126
}
149127
}
150128

@@ -231,13 +209,11 @@ mod tests {
231209

232210
processor.shutdown().unwrap();
233211

234-
let is_shutdown = processor
235-
.is_shutdown
236-
.load(std::sync::atomic::Ordering::Relaxed);
237-
assert!(is_shutdown);
238-
239212
processor.emit(&mut record, &instrumentation);
240213

214+
// Emit was called after shutdown. While SimpleLogProcessor
215+
// does not care, the exporter in this case does,
216+
// and it ignores the export() calls after shutdown.
241217
assert_eq!(1, exporter.get_emitted_logs().unwrap().len());
242218
assert!(exporter.is_shutdown_called());
243219
}

0 commit comments

Comments
 (0)