Skip to content

Commit 2664e0f

Browse files
committed
.
1 parent 9a18717 commit 2664e0f

File tree

6 files changed

+43
-38
lines changed

6 files changed

+43
-38
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
//! ```
3333
3434
use crate::{
35-
error::ShutdownResult, logs::{ExportResult, LogBatch, LogError, LogExporter, LogRecord, LogResult}, Resource
35+
error::{ShutdownError, ShutdownResult},
36+
logs::{ExportResult, LogBatch, LogError, LogExporter, LogRecord, LogResult},
37+
Resource,
3638
};
3739
use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
3840

@@ -92,7 +94,7 @@ pub trait LogProcessor: Send + Sync + Debug {
9294
/// After shutdown returns the log processor should stop processing any logs.
9395
/// It's up to the implementation on when to drop the LogProcessor.
9496
fn shutdown(&self) -> ShutdownResult;
95-
97+
9698
#[cfg(feature = "spec_unstable_logs_enabled")]
9799
/// Check if logging is enabled
98100
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
@@ -191,14 +193,11 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
191193
fn shutdown(&self) -> ShutdownResult {
192194
self.is_shutdown
193195
.store(true, std::sync::atomic::Ordering::Relaxed);
194-
if let Ok(mut exporter) = self.exporter.lock() {
195-
exporter
196-
.shutdown()
197-
.map_err(|e| LogError:(Box::new(e)))?;
198-
Ok(())
199-
} else {
200-
Err(LogError::MutexPoisoned("SimpleLogProcessor".into()))
201-
}
196+
self.exporter
197+
.lock()
198+
.map_err(|e| ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
199+
.shutdown()?;
200+
Ok(())
202201
}
203202

204203
fn set_resource(&self, resource: &Resource) {
@@ -412,7 +411,7 @@ impl LogProcessor for BatchLogProcessor {
412411
}
413412
}
414413

415-
fn shutdown(&self) -> LogResult<()> {
414+
fn shutdown(&self) -> ShutdownResult {
416415
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
417416
let max_queue_size = self.max_queue_size;
418417
if dropped_logs > 0 {
@@ -435,22 +434,22 @@ impl LogProcessor for BatchLogProcessor {
435434
if let Some(handle) = self.handle.lock().unwrap().take() {
436435
handle.join().unwrap();
437436
}
438-
LogResult::Ok(())
437+
Ok(())
439438
})
440439
.map_err(|err| match err {
441440
RecvTimeoutError::Timeout => {
442441
otel_error!(
443442
name: "BatchLogProcessor.Shutdown.Timeout",
444443
message = "BatchLogProcessor shutdown timing out."
445444
);
446-
LogError::ExportTimedOut(self.shutdown_timeout)
445+
ShutdownError::Timeout(self.shutdown_timeout)
447446
}
448447
_ => {
449448
otel_error!(
450449
name: "BatchLogProcessor.Shutdown.Error",
451450
error = format!("{}", err)
452451
);
453-
LogError::Other(err.into())
452+
ShutdownError::InternalFailure(err.to_string())
454453
}
455454
})?
456455
}
@@ -460,7 +459,7 @@ impl LogProcessor for BatchLogProcessor {
460459
name: "BatchLogProcessor.Shutdown.ControlChannelFull",
461460
message = "Control message to shutdown the worker thread could not be sent as the control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call."
462461
);
463-
LogResult::Err(LogError::Other("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call.".into()))
462+
Err(ShutdownError::InternalFailure("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call.".into()))
464463
}
465464
Err(mpsc::TrySendError::Disconnected(_)) => {
466465
// Given background thread is the only receiver, and it's
@@ -470,7 +469,7 @@ impl LogProcessor for BatchLogProcessor {
470469
message = "Shutdown is being invoked more than once. This is noop, but indicates a potential issue in the application's lifecycle management."
471470
);
472471

473-
LogResult::Err(LogError::Other(
472+
Err(ShutdownError::InternalFailure(
474473
"BatchLogProcessor is already shutdown".into(),
475474
))
476475
}
@@ -1237,7 +1236,7 @@ mod tests {
12371236
Ok(())
12381237
}
12391238

1240-
fn shutdown(&self) -> LogResult<()> {
1239+
fn shutdown(&self) -> ShutdownResult {
12411240
Ok(())
12421241
}
12431242
}
@@ -1267,7 +1266,7 @@ mod tests {
12671266
Ok(())
12681267
}
12691268

1270-
fn shutdown(&self) -> LogResult<()> {
1269+
fn shutdown(&self) -> ShutdownResult {
12711270
Ok(())
12721271
}
12731272
}

opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::{
2+
error::{ShutdownError, ShutdownResult},
23
logs::{ExportResult, LogBatch, LogError, LogExporter, LogRecord, LogResult},
34
Resource,
45
};
@@ -86,7 +87,7 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
8687
.and_then(std::convert::identity)
8788
}
8889

89-
fn shutdown(&self) -> LogResult<()> {
90+
fn shutdown(&self) -> ShutdownResult {
9091
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
9192
let max_queue_size = self.max_queue_size;
9293
if dropped_logs > 0 {
@@ -100,10 +101,10 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
100101
let (res_sender, res_receiver) = oneshot::channel();
101102
self.message_sender
102103
.try_send(BatchMessage::Shutdown(res_sender))
103-
.map_err(|err| LogError::Other(err.into()))?;
104+
.map_err(|err| ShutdownError::InternalFailure(err.to_string()))?;
104105

105106
futures_executor::block_on(res_receiver)
106-
.map_err(|err| LogError::Other(err.into()))
107+
.map_err(|err| ShutdownError::InternalFailure(err.to_string()))
107108
.and_then(std::convert::identity)
108109
}
109110

@@ -655,7 +656,7 @@ mod tests {
655656
Ok(())
656657
}
657658

658-
fn shutdown(&self) -> LogResult<()> {
659+
fn shutdown(&self) -> ShutdownResult {
659660
Ok(())
660661
}
661662
}
@@ -685,7 +686,7 @@ mod tests {
685686
Ok(())
686687
}
687688

688-
fn shutdown(&self) -> LogResult<()> {
689+
fn shutdown(&self) -> ShutdownResult {
689690
Ok(())
690691
}
691692
}

opentelemetry-sdk/src/logs/logger_provider.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
2+
use crate::error::ShutdownError;
23
use crate::logs::{LogError, LogExporter, LogResult};
34
use crate::Resource;
45
use opentelemetry::{otel_debug, otel_info, trace::TraceContextExt, Context, InstrumentationScope};
@@ -131,7 +132,7 @@ struct LoggerProviderInner {
131132

132133
impl LoggerProviderInner {
133134
/// Shuts down the `LoggerProviderInner` and returns any errors.
134-
pub(crate) fn shutdown(&self) -> Vec<LogError> {
135+
pub(crate) fn shutdown(&self) -> Vec<ShutdownError> {
135136
let mut errs = vec![];
136137
for processor in &self.processors {
137138
if let Err(err) = processor.shutdown() {
@@ -141,9 +142,15 @@ impl LoggerProviderInner {
141142
// which is non-actionable by the user
142143
match err {
143144
// specific handling for mutex poisoning
144-
LogError::MutexPoisoned(_) => {
145+
ShutdownError::AlreadyShutdown => {
145146
otel_debug!(
146-
name: "LoggerProvider.Drop.ShutdownMutexPoisoned",
147+
name: "LoggerProvider.Drop.AlreadyShutdown",
148+
);
149+
}
150+
ShutdownError::Timeout(t) => {
151+
otel_debug!(
152+
name: "LoggerProvider.Drop.ExportTimedOut",
153+
error = format!("timed out shutting down; t={:?}", t)
147154
);
148155
}
149156
_ => {
@@ -333,6 +340,7 @@ impl opentelemetry::logs::Logger for Logger {
333340
#[cfg(test)]
334341
mod tests {
335342
use crate::{
343+
error::ShutdownResult,
336344
logs::InMemoryLogExporter,
337345
resource::{
338346
SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
@@ -387,7 +395,7 @@ mod tests {
387395
Ok(())
388396
}
389397

390-
fn shutdown(&self) -> LogResult<()> {
398+
fn shutdown(&self) -> ShutdownResult {
391399
self.is_shutdown
392400
.lock()
393401
.map(|mut is_shutdown| *is_shutdown = true)
@@ -800,7 +808,7 @@ mod tests {
800808
Ok(())
801809
}
802810

803-
fn shutdown(&self) -> LogResult<()> {
811+
fn shutdown(&self) -> ShutdownResult {
804812
*self.shutdown_called.lock().unwrap() = true;
805813
Ok(())
806814
}
@@ -831,7 +839,7 @@ mod tests {
831839
Ok(())
832840
}
833841

834-
fn shutdown(&self) -> LogResult<()> {
842+
fn shutdown(&self) -> ShutdownResult {
835843
let mut count = self.shutdown_count.lock().unwrap();
836844
*count += 1;
837845
Ok(())

opentelemetry-sdk/src/testing/trace/span_exporters.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use crate::{
2-
trace::{ExportResult, SpanData, SpanExporter},
3-
trace::{SpanEvents, SpanLinks},
4-
};
1+
use crate::trace::{ExportResult, SpanData, SpanEvents, SpanExporter, SpanLinks};
52
use futures_util::future::BoxFuture;
63
pub use opentelemetry::testing::trace::TestSpan;
74
use opentelemetry::{

opentelemetry-sdk/src/trace/export.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
//! Trace exporters
2-
use crate::error::ShutdownResult;
32
use crate::Resource;
43
use futures_util::future::BoxFuture;
54
use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status, TraceError};
@@ -44,9 +43,7 @@ pub trait SpanExporter: Send + Sync + Debug {
4443
/// flush the data and the destination is unavailable). SDK authors
4544
/// can decide if they want to make the shutdown timeout
4645
/// configurable.
47-
fn shutdown(&mut self) -> ShutdownResult {
48-
Ok(())
49-
}
46+
fn shutdown(&mut self) {}
5047

5148
/// This is a hint to ensure that the export of any Spans the exporter
5249
/// has received prior to the call to this function SHOULD be completed

stress/src/logs.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@
99
~40 M /sec
1010
*/
1111

12+
use std::ascii::AsciiExt;
13+
1214
use opentelemetry::InstrumentationScope;
1315
use opentelemetry_appender_tracing::layer;
16+
use opentelemetry_sdk::error::ShutdownResult;
1417
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
1518
use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LogResult, LoggerProvider};
1619

@@ -46,7 +49,7 @@ impl LogProcessor for MockLogProcessor {
4649
Ok(())
4750
}
4851

49-
fn shutdown(&self) -> LogResult<()> {
52+
fn shutdown(&self) -> ShutdownResult {
5053
Ok(())
5154
}
5255
}

0 commit comments

Comments
 (0)