Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Bump MSRV to 1.70 [#2179](https://github.com/open-telemetry/opentelemetry-rust/pull/2179)
- Implement `LogRecord::set_trace_context` for `LogRecord`. Respect any trace context set on a `LogRecord` when emitting through a `Logger`.
- Improved `LoggerProvider` shutdown handling to prevent redundant shutdown calls when `drop` is invoked. [#2195](https://github.com/open-telemetry/opentelemetry-rust/pull/2195)

## v0.26.0
Released 2024-Sep-30
Expand Down
136 changes: 129 additions & 7 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ static NOOP_LOGGER_PROVIDER: Lazy<LoggerProvider> = Lazy::new(|| LoggerProvider
inner: Arc::new(LoggerProviderInner {
processors: Vec::new(),
resource: Resource::empty(),
is_shutdown: AtomicBool::new(true),
}),
is_shutdown: Arc::new(AtomicBool::new(true)),
});

#[derive(Debug, Clone)]
/// Creator for `Logger` instances.
pub struct LoggerProvider {
inner: Arc<LoggerProviderInner>,
is_shutdown: Arc<AtomicBool>,
}

/// Default logger name if empty string is provided.
Expand Down Expand Up @@ -73,7 +72,7 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider {

fn library_logger(&self, library: Arc<InstrumentationLibrary>) -> Self::Logger {
// If the provider is shutdown, new logger will refer a no-op logger provider.
if self.is_shutdown.load(Ordering::Relaxed) {
if self.inner.is_shutdown.load(Ordering::Relaxed) {
return Logger::new(library, NOOP_LOGGER_PROVIDER.clone());
}
Logger::new(library, self.clone())
Expand Down Expand Up @@ -105,6 +104,7 @@ impl LoggerProvider {
/// Shuts down this `LoggerProvider`
pub fn shutdown(&self) -> LogResult<()> {
if self
.inner
.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
Expand Down Expand Up @@ -140,13 +140,20 @@ impl LoggerProvider {
struct LoggerProviderInner {
processors: Vec<Box<dyn LogProcessor>>,
resource: Resource,
is_shutdown: AtomicBool,
}

impl Drop for LoggerProviderInner {
fn drop(&mut self) {
for processor in &mut self.processors {
if let Err(err) = processor.shutdown() {
global::handle_error(err);
if self
.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
for processor in &mut self.processors {
if let Err(err) = processor.shutdown() {
global::handle_error(err);
}
}
}
}
Expand Down Expand Up @@ -202,8 +209,8 @@ impl Builder {
inner: Arc::new(LoggerProviderInner {
processors: self.processors,
resource,
is_shutdown: AtomicBool::new(false),
}),
is_shutdown: Arc::new(AtomicBool::new(false)),
};

// invoke set_resource on all the processors
Expand Down Expand Up @@ -612,6 +619,89 @@ mod tests {
assert!(!*flush_called.lock().unwrap());
}

#[test]
fn drop_test_with_multiple_providers() {
let shutdown_called = Arc::new(Mutex::new(false));
let flush_called = Arc::new(Mutex::new(false));
{
// Create a shared LoggerProviderInner and use it across multiple providers
let shared_inner = Arc::new(LoggerProviderInner {
processors: vec![Box::new(LazyLogProcessor::new(
shutdown_called.clone(),
flush_called.clone(),
))],
resource: Resource::empty(),
is_shutdown: AtomicBool::new(false),
});

{
let logger_provider1 = LoggerProvider {
inner: shared_inner.clone(),
};
let logger_provider2 = LoggerProvider {
inner: shared_inner.clone(),
};

let logger1 = logger_provider1.logger("test-logger1");
let logger2 = logger_provider2.logger("test-logger2");

logger1.emit(logger1.create_log_record());
logger2.emit(logger1.create_log_record());

// LoggerProviderInner should not be dropped yet, since both providers and `shared_inner`
// are still holding a reference.
}
// At this point, both `logger_provider1` and `logger_provider2` are dropped,
// but `shared_inner` still holds a reference, so `LoggerProviderInner` is NOT dropped yet.
}
// Verify shutdown was called during the drop of the shared LoggerProviderInner
assert!(*shutdown_called.lock().unwrap());
// Verify flush was not called during drop
assert!(!*flush_called.lock().unwrap());
}

#[test]
fn drop_after_shutdown_test_with_multiple_providers() {
let shutdown_called = Arc::new(Mutex::new(0)); // Count the number of times shutdown is called
let flush_called = Arc::new(Mutex::new(false));

// Create a shared LoggerProviderInner and use it across multiple providers
let shared_inner = Arc::new(LoggerProviderInner {
processors: vec![Box::new(CountingShutdownProcessor::new(
shutdown_called.clone(),
flush_called.clone(),
))],
resource: Resource::empty(),
is_shutdown: AtomicBool::new(false),
});

// Create a scope to test behavior when providers are dropped
{
let logger_provider1 = LoggerProvider {
inner: shared_inner.clone(),
};
let logger_provider2 = LoggerProvider {
inner: shared_inner.clone(),
};

// Explicitly shut down the logger provider
let shutdown_result = logger_provider1.shutdown();
assert!(shutdown_result.is_ok());

// Verify that shutdown was called exactly once
assert_eq!(*shutdown_called.lock().unwrap(), 1);

// LoggerProvider2 should observe the shutdown state but not trigger another shutdown
let shutdown_result2 = logger_provider2.shutdown();
assert!(shutdown_result2.is_err());

// Both logger providers will be dropped at the end of this scope
}

// Verify that shutdown was only called once, even after drop
assert_eq!(*shutdown_called.lock().unwrap(), 1);
}

#[derive(Debug)]
pub(crate) struct LazyLogProcessor {
shutdown_called: Arc<Mutex<bool>>,
Expand Down Expand Up @@ -645,4 +735,36 @@ mod tests {
Ok(())
}
}

#[derive(Debug)]
struct CountingShutdownProcessor {
shutdown_count: Arc<Mutex<i32>>,
flush_called: Arc<Mutex<bool>>,
}

impl CountingShutdownProcessor {
fn new(shutdown_count: Arc<Mutex<i32>>, flush_called: Arc<Mutex<bool>>) -> Self {
CountingShutdownProcessor {
shutdown_count,
flush_called,
}
}
}

impl LogProcessor for CountingShutdownProcessor {
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {
// nothing to do
}

fn force_flush(&self) -> LogResult<()> {
*self.flush_called.lock().unwrap() = true;
Ok(())
}

fn shutdown(&self) -> LogResult<()> {
let mut count = self.shutdown_count.lock().unwrap();
*count += 1;
Ok(())
}
}
}