Skip to content

Commit 39f3127

Browse files
committed
.
1 parent 1a62863 commit 39f3127

File tree

6 files changed

+43
-38
lines changed

6 files changed

+43
-38
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
//! ```
3333
3434
use crate::{
35-
error::ShutdownResult, logs::{ExportResult, LogBatch, LogError, LogExporter, LogRecord, LogResult}, Resource
35+
error::{ShutdownError, ShutdownResult},
36+
logs::{ExportResult, LogBatch, LogError, LogExporter, LogRecord, LogResult},
37+
Resource,
3638
};
3739
use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
3840

@@ -90,7 +92,7 @@ pub trait LogProcessor: Send + Sync + Debug {
9092
/// After shutdown returns the log processor should stop processing any logs.
9193
/// It's up to the implementation on when to drop the LogProcessor.
9294
fn shutdown(&self) -> ShutdownResult;
93-
95+
9496
#[cfg(feature = "spec_unstable_logs_enabled")]
9597
/// Check if logging is enabled
9698
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
@@ -189,14 +191,11 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
189191
fn shutdown(&self) -> ShutdownResult {
190192
self.is_shutdown
191193
.store(true, std::sync::atomic::Ordering::Relaxed);
192-
if let Ok(mut exporter) = self.exporter.lock() {
193-
exporter
194-
.shutdown()
195-
.map_err(|e| LogError:(Box::new(e)))?;
196-
Ok(())
197-
} else {
198-
Err(LogError::MutexPoisoned("SimpleLogProcessor".into()))
199-
}
194+
self.exporter
195+
.lock()
196+
.map_err(|e| ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
197+
.shutdown()?;
198+
Ok(())
200199
}
201200

202201
fn set_resource(&self, resource: &Resource) {
@@ -398,7 +397,7 @@ impl LogProcessor for BatchLogProcessor {
398397
}
399398
}
400399

401-
fn shutdown(&self) -> LogResult<()> {
400+
fn shutdown(&self) -> ShutdownResult {
402401
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
403402
let max_queue_size = self.max_queue_size;
404403
if dropped_logs > 0 {
@@ -421,22 +420,22 @@ impl LogProcessor for BatchLogProcessor {
421420
if let Some(handle) = self.handle.lock().unwrap().take() {
422421
handle.join().unwrap();
423422
}
424-
LogResult::Ok(())
423+
Ok(())
425424
})
426425
.map_err(|err| match err {
427426
RecvTimeoutError::Timeout => {
428427
otel_error!(
429428
name: "BatchLogProcessor.Shutdown.Timeout",
430429
message = "BatchLogProcessor shutdown timing out."
431430
);
432-
LogError::ExportTimedOut(self.shutdown_timeout)
431+
ShutdownError::Timeout(self.shutdown_timeout)
433432
}
434433
_ => {
435434
otel_error!(
436435
name: "BatchLogProcessor.Shutdown.Error",
437436
error = format!("{}", err)
438437
);
439-
LogError::Other(err.into())
438+
ShutdownError::InternalFailure(err.to_string())
440439
}
441440
})?
442441
}
@@ -446,7 +445,7 @@ impl LogProcessor for BatchLogProcessor {
446445
name: "BatchLogProcessor.Shutdown.ControlChannelFull",
447446
message = "Control message to shutdown the worker thread could not be sent as the control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call."
448447
);
449-
LogResult::Err(LogError::Other("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call.".into()))
448+
Err(ShutdownError::InternalFailure("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call.".into()))
450449
}
451450
Err(mpsc::TrySendError::Disconnected(_)) => {
452451
// Given background thread is the only receiver, and it's
@@ -456,7 +455,7 @@ impl LogProcessor for BatchLogProcessor {
456455
message = "Shutdown is being invoked more than once. This is noop, but indicates a potential issue in the application's lifecycle management."
457456
);
458457

459-
LogResult::Err(LogError::Other(
458+
Err(ShutdownError::InternalFailure(
460459
"BatchLogProcessor is already shutdown".into(),
461460
))
462461
}
@@ -1217,7 +1216,7 @@ mod tests {
12171216
Ok(())
12181217
}
12191218

1220-
fn shutdown(&self) -> LogResult<()> {
1219+
fn shutdown(&self) -> ShutdownResult {
12211220
Ok(())
12221221
}
12231222
}
@@ -1247,7 +1246,7 @@ mod tests {
12471246
Ok(())
12481247
}
12491248

1250-
fn shutdown(&self) -> LogResult<()> {
1249+
fn shutdown(&self) -> ShutdownResult {
12511250
Ok(())
12521251
}
12531252
}

opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::{
2+
error::{ShutdownError, ShutdownResult},
23
logs::{ExportResult, LogBatch, LogError, LogExporter, LogRecord, LogResult},
34
Resource,
45
};
@@ -86,7 +87,7 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
8687
.and_then(std::convert::identity)
8788
}
8889

89-
fn shutdown(&self) -> LogResult<()> {
90+
fn shutdown(&self) -> ShutdownResult {
9091
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
9192
let max_queue_size = self.max_queue_size;
9293
if dropped_logs > 0 {
@@ -100,10 +101,10 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
100101
let (res_sender, res_receiver) = oneshot::channel();
101102
self.message_sender
102103
.try_send(BatchMessage::Shutdown(res_sender))
103-
.map_err(|err| LogError::Other(err.into()))?;
104+
.map_err(|err| ShutdownError::InternalFailure(err.to_string()))?;
104105

105106
futures_executor::block_on(res_receiver)
106-
.map_err(|err| LogError::Other(err.into()))
107+
.map_err(|err| ShutdownError::InternalFailure(err.to_string()))
107108
.and_then(std::convert::identity)
108109
}
109110

@@ -655,7 +656,7 @@ mod tests {
655656
Ok(())
656657
}
657658

658-
fn shutdown(&self) -> LogResult<()> {
659+
fn shutdown(&self) -> ShutdownResult {
659660
Ok(())
660661
}
661662
}
@@ -685,7 +686,7 @@ mod tests {
685686
Ok(())
686687
}
687688

688-
fn shutdown(&self) -> LogResult<()> {
689+
fn shutdown(&self) -> ShutdownResult {
689690
Ok(())
690691
}
691692
}

opentelemetry-sdk/src/logs/logger_provider.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
2+
use crate::error::ShutdownError;
23
use crate::logs::{LogError, LogExporter, LogResult};
34
use crate::Resource;
45
use opentelemetry::{otel_debug, otel_info, trace::TraceContextExt, Context, InstrumentationScope};
@@ -131,7 +132,7 @@ struct LoggerProviderInner {
131132

132133
impl LoggerProviderInner {
133134
/// Shuts down the `LoggerProviderInner` and returns any errors.
134-
pub(crate) fn shutdown(&self) -> Vec<LogError> {
135+
pub(crate) fn shutdown(&self) -> Vec<ShutdownError> {
135136
let mut errs = vec![];
136137
for processor in &self.processors {
137138
if let Err(err) = processor.shutdown() {
@@ -141,9 +142,15 @@ impl LoggerProviderInner {
141142
// which is non-actionable by the user
142143
match err {
143144
// specific handling for mutex poisoning
144-
LogError::MutexPoisoned(_) => {
145+
ShutdownError::AlreadyShutdown => {
145146
otel_debug!(
146-
name: "LoggerProvider.Drop.ShutdownMutexPoisoned",
147+
name: "LoggerProvider.Drop.AlreadyShutdown",
148+
);
149+
}
150+
ShutdownError::Timeout(t) => {
151+
otel_debug!(
152+
name: "LoggerProvider.Drop.ExportTimedOut",
153+
error = format!("timed out shutting down; t={:?}", t)
147154
);
148155
}
149156
_ => {
@@ -333,6 +340,7 @@ impl opentelemetry::logs::Logger for Logger {
333340
#[cfg(test)]
334341
mod tests {
335342
use crate::{
343+
error::ShutdownResult,
336344
logs::InMemoryLogExporter,
337345
resource::{
338346
SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
@@ -387,7 +395,7 @@ mod tests {
387395
Ok(())
388396
}
389397

390-
fn shutdown(&self) -> LogResult<()> {
398+
fn shutdown(&self) -> ShutdownResult {
391399
self.is_shutdown
392400
.lock()
393401
.map(|mut is_shutdown| *is_shutdown = true)
@@ -800,7 +808,7 @@ mod tests {
800808
Ok(())
801809
}
802810

803-
fn shutdown(&self) -> LogResult<()> {
811+
fn shutdown(&self) -> ShutdownResult {
804812
*self.shutdown_called.lock().unwrap() = true;
805813
Ok(())
806814
}
@@ -831,7 +839,7 @@ mod tests {
831839
Ok(())
832840
}
833841

834-
fn shutdown(&self) -> LogResult<()> {
842+
fn shutdown(&self) -> ShutdownResult {
835843
let mut count = self.shutdown_count.lock().unwrap();
836844
*count += 1;
837845
Ok(())

opentelemetry-sdk/src/testing/trace/span_exporters.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use crate::{
2-
trace::{ExportResult, SpanData, SpanExporter},
3-
trace::{SpanEvents, SpanLinks},
4-
};
1+
use crate::trace::{ExportResult, SpanData, SpanEvents, SpanExporter, SpanLinks};
52
use futures_util::future::BoxFuture;
63
pub use opentelemetry::testing::trace::TestSpan;
74
use opentelemetry::{

opentelemetry-sdk/src/trace/export.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
//! Trace exporters
2-
use crate::error::ShutdownResult;
32
use crate::Resource;
43
use futures_util::future::BoxFuture;
54
use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status, TraceError};
@@ -44,9 +43,7 @@ pub trait SpanExporter: Send + Sync + Debug {
4443
/// flush the data and the destination is unavailable). SDK authors
4544
/// can decide if they want to make the shutdown timeout
4645
/// configurable.
47-
fn shutdown(&mut self) -> ShutdownResult {
48-
Ok(())
49-
}
46+
fn shutdown(&mut self) {}
5047

5148
/// This is a hint to ensure that the export of any Spans the exporter
5249
/// has received prior to the call to this function SHOULD be completed

stress/src/logs.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@
99
~40 M /sec
1010
*/
1111

12+
use std::ascii::AsciiExt;
13+
1214
use opentelemetry::InstrumentationScope;
1315
use opentelemetry_appender_tracing::layer;
16+
use opentelemetry_sdk::error::ShutdownResult;
1417
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
1518
use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LogResult, LoggerProvider};
1619

@@ -46,7 +49,7 @@ impl LogProcessor for MockLogProcessor {
4649
Ok(())
4750
}
4851

49-
fn shutdown(&self) -> LogResult<()> {
52+
fn shutdown(&self) -> ShutdownResult {
5053
Ok(())
5154
}
5255
}

0 commit comments

Comments
 (0)