Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Bump MSRV to 1.70 [#2179](https://github.com/open-telemetry/opentelemetry-rust/pull/2179)
- Implement `LogRecord::set_trace_context` for `LogRecord`. Respect any trace context set on a `LogRecord` when emitting through a `Logger`.
- Improved `LoggerProvider` shutdown handling to prevent redundant shutdown calls when `drop` is invoked. [#2195](https://github.com/open-telemetry/opentelemetry-rust/pull/2195)

## v0.26.0
Released 2024-Sep-30
Expand Down
165 changes: 147 additions & 18 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, Trac
use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
use opentelemetry::otel_warn;
use opentelemetry::{
global,
logs::{LogError, LogResult},
trace::TraceContextExt,
Context, InstrumentationLibrary,
Expand All @@ -24,15 +23,26 @@ static NOOP_LOGGER_PROVIDER: Lazy<LoggerProvider> = Lazy::new(|| LoggerProvider
inner: Arc::new(LoggerProviderInner {
processors: Vec::new(),
resource: Resource::empty(),
is_shutdown: AtomicBool::new(true),
}),
is_shutdown: Arc::new(AtomicBool::new(true)),
});

#[derive(Debug, Clone)]
/// Creator for `Logger` instances.
/// Handles the creation and coordination of [`Logger`]s.
///
/// All `Logger`s created by a `LoggerProvider` will share the same
/// [`Resource`] and have their created log records processed by the
configured log processors. This is a cloneable handle to the `LoggerProvider`
/// itself, and cloning it will create a new reference, not a new instance of a
/// `LoggerProvider`. Dropping the last reference will trigger the shutdown of
/// the provider, ensuring that all remaining logs are flushed and no further
/// logs are processed. Shutdown can also be triggered manually by calling
/// the [`shutdown`](LoggerProvider::shutdown) method.
///
/// [`Logger`]: opentelemetry::logs::Logger
/// [`Resource`]: crate::Resource
pub struct LoggerProvider {
inner: Arc<LoggerProviderInner>,
is_shutdown: Arc<AtomicBool>,
}

/// Default logger name if empty string is provided.
Expand Down Expand Up @@ -73,7 +83,7 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider {

fn library_logger(&self, library: Arc<InstrumentationLibrary>) -> Self::Logger {
// If the provider is shutdown, the new logger will refer to a no-op logger provider.
if self.is_shutdown.load(Ordering::Relaxed) {
if self.inner.is_shutdown.load(Ordering::Relaxed) {
return Logger::new(library, NOOP_LOGGER_PROVIDER.clone());
}
Logger::new(library, self.clone())
Expand Down Expand Up @@ -104,6 +114,19 @@ impl LoggerProvider {

/// Shuts down this `LoggerProvider`.
///
/// Delegates to the shared inner state, so shutdown is coordinated across
/// all clones of this provider: the processors are shut down at most once,
/// and later calls on any clone report an error instead of repeating it.
pub fn shutdown(&self) -> LogResult<()> {
    self.inner.shutdown()
}
}

#[derive(Debug)]
struct LoggerProviderInner {
    // Log processors that receive emitted records; each one is notified on shutdown.
    processors: Vec<Box<dyn LogProcessor>>,
    // Resource shared by all loggers created from this provider.
    resource: Resource,
    // Flipped to `true` exactly once (via compare_exchange in `shutdown`),
    // so processor shutdown runs at most once whether triggered explicitly
    // or by dropping the last provider handle.
    is_shutdown: AtomicBool,
}

impl LoggerProviderInner {
pub(crate) fn shutdown(&self) -> LogResult<()> {
if self
.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
Expand All @@ -112,7 +135,7 @@ impl LoggerProvider {
// propagate the shutdown signal to processors
// it's up to the processor to properly block new logs after shutdown
let mut errs = vec![];
for processor in &self.inner.processors {
for processor in &self.processors {
if let Err(err) = processor.shutdown() {
otel_warn!(
name: "logger_provider_shutdown_error",
Expand All @@ -136,19 +159,10 @@ impl LoggerProvider {
}
}

#[derive(Debug)]
struct LoggerProviderInner {
processors: Vec<Box<dyn LogProcessor>>,
resource: Resource,
}

impl Drop for LoggerProviderInner {
fn drop(&mut self) {
for processor in &mut self.processors {
if let Err(err) = processor.shutdown() {
global::handle_error(err);
}
}
let _ = self.shutdown();
//TODO - handle error during shutdown
}
}

Expand Down Expand Up @@ -202,8 +216,8 @@ impl Builder {
inner: Arc::new(LoggerProviderInner {
processors: self.processors,
resource,
is_shutdown: AtomicBool::new(false),
}),
is_shutdown: Arc::new(AtomicBool::new(false)),
};

// invoke set_resource on all the processors
Expand Down Expand Up @@ -612,6 +626,89 @@ mod tests {
assert!(!*flush_called.lock().unwrap());
}

#[test]
fn drop_test_with_multiple_providers() {
    let shutdown_called = Arc::new(Mutex::new(false));
    let flush_called = Arc::new(Mutex::new(false));
    {
        // Create a shared LoggerProviderInner and use it across multiple providers.
        let shared_inner = Arc::new(LoggerProviderInner {
            processors: vec![Box::new(LazyLogProcessor::new(
                shutdown_called.clone(),
                flush_called.clone(),
            ))],
            resource: Resource::empty(),
            is_shutdown: AtomicBool::new(false),
        });

        {
            let logger_provider1 = LoggerProvider {
                inner: shared_inner.clone(),
            };
            let logger_provider2 = LoggerProvider {
                inner: shared_inner.clone(),
            };

            let logger1 = logger_provider1.logger("test-logger1");
            let logger2 = logger_provider2.logger("test-logger2");

            logger1.emit(logger1.create_log_record());
            // Fixed: the record emitted through logger2 is now also created by
            // logger2 (it was previously created via logger1 — a copy-paste typo).
            logger2.emit(logger2.create_log_record());

            // LoggerProviderInner should not be dropped yet, since both providers
            // and `shared_inner` are still holding a reference.
        }
        // At this point, both `logger_provider1` and `logger_provider2` are dropped,
        // but `shared_inner` still holds a reference, so `LoggerProviderInner` is NOT
        // dropped yet.
    }
    // Verify shutdown was called during the drop of the shared LoggerProviderInner.
    assert!(*shutdown_called.lock().unwrap());
    // Verify flush was not called during drop.
    assert!(!*flush_called.lock().unwrap());
}

#[test]
fn drop_after_shutdown_test_with_multiple_providers() {
    // Shared counter: how many times the processor's shutdown has run.
    let shutdown_called = Arc::new(Mutex::new(0));
    let flush_called = Arc::new(Mutex::new(false));

    // A single LoggerProviderInner shared by two provider handles.
    let shared_inner = Arc::new(LoggerProviderInner {
        processors: vec![Box::new(CountingShutdownProcessor::new(
            shutdown_called.clone(),
            flush_called.clone(),
        ))],
        resource: Resource::empty(),
        is_shutdown: AtomicBool::new(false),
    });

    // Scope the provider handles so they drop before the final check.
    {
        let logger_provider1 = LoggerProvider {
            inner: shared_inner.clone(),
        };
        let logger_provider2 = LoggerProvider {
            inner: shared_inner.clone(),
        };

        // An explicit shutdown through the first handle succeeds and reaches
        // the processor exactly once.
        assert!(logger_provider1.shutdown().is_ok());
        assert_eq!(*shutdown_called.lock().unwrap(), 1);

        // The second handle observes the already-shut-down state and reports
        // an error instead of shutting the processors down again.
        assert!(logger_provider2.shutdown().is_err());

        // Both handles drop at the end of this scope.
    }

    // Dropping the handles after an explicit shutdown must not invoke the
    // processor's shutdown a second time.
    assert_eq!(*shutdown_called.lock().unwrap(), 1);
}

#[derive(Debug)]
pub(crate) struct LazyLogProcessor {
shutdown_called: Arc<Mutex<bool>>,
Expand Down Expand Up @@ -645,4 +742,36 @@ mod tests {
Ok(())
}
}

/// Test double that counts `shutdown` invocations and records whether
/// `force_flush` was ever called.
#[derive(Debug)]
struct CountingShutdownProcessor {
    shutdown_count: Arc<Mutex<i32>>,
    flush_called: Arc<Mutex<bool>>,
}

impl CountingShutdownProcessor {
    /// Creates a processor that reports into the supplied shared cells.
    fn new(shutdown_count: Arc<Mutex<i32>>, flush_called: Arc<Mutex<bool>>) -> Self {
        Self {
            shutdown_count,
            flush_called,
        }
    }
}

impl LogProcessor for CountingShutdownProcessor {
    /// Emitting is a no-op for this test double.
    fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {}

    /// Records that a flush was requested.
    fn force_flush(&self) -> LogResult<()> {
        *self.flush_called.lock().unwrap() = true;
        Ok(())
    }

    /// Increments the shared shutdown counter.
    fn shutdown(&self) -> LogResult<()> {
        *self.shutdown_count.lock().unwrap() += 1;
        Ok(())
    }
}
}