Skip to content

Commit 290a3ef

Browse files
add-shutdown-with-timeout-for-log-provider-and-processor
1 parent 5c60f12 commit 290a3ef

File tree

10 files changed

+32
-55
lines changed

10 files changed

+32
-55
lines changed

examples/tracing-http-propagator/src/server.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,6 @@ impl LogProcessor for EnrichWithBaggageLogProcessor {
120120
fn force_flush(&self) -> OTelSdkResult {
121121
Ok(())
122122
}
123-
124-
fn shutdown(&self) -> OTelSdkResult {
125-
Ok(())
126-
}
127123
}
128124

129125
/// A custom span processor that enriches spans with baggage attributes. Baggage

opentelemetry-appender-tracing/benches/log-attributes.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,6 @@ impl LogProcessor for NoopProcessor {
4343
fn force_flush(&self) -> OTelSdkResult {
4444
Ok(())
4545
}
46-
47-
fn shutdown(&self) -> OTelSdkResult {
48-
Ok(())
49-
}
5046
}
5147

5248
/// Creates a single benchmark for a specific number of attributes

opentelemetry-appender-tracing/benches/logs.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,6 @@ impl LogProcessor for NoopProcessor {
5454
Ok(())
5555
}
5656

57-
fn shutdown(&self) -> OTelSdkResult {
58-
Ok(())
59-
}
60-
6157
fn event_enabled(
6258
&self,
6359
_level: opentelemetry::logs::Severity,

opentelemetry-appender-tracing/src/layer.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -877,10 +877,6 @@ mod tests {
877877
fn force_flush(&self) -> OTelSdkResult {
878878
Ok(())
879879
}
880-
881-
fn shutdown(&self) -> OTelSdkResult {
882-
Ok(())
883-
}
884880
}
885881

886882
#[cfg(feature = "spec_unstable_logs_enabled")]

opentelemetry-proto/src/transform/logs.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,10 +239,6 @@ mod tests {
239239
fn force_flush(&self) -> OTelSdkResult {
240240
Ok(())
241241
}
242-
243-
fn shutdown(&self) -> OTelSdkResult {
244-
Ok(())
245-
}
246242
}
247243

248244
fn create_test_log_data(

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ pub struct BatchLogProcessor {
132132
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
133133
handle: Mutex<Option<thread::JoinHandle<()>>>,
134134
forceflush_timeout: Duration,
135-
shutdown_timeout: Duration,
136135
export_log_message_sent: Arc<AtomicBool>,
137136
current_batch_size: Arc<AtomicUsize>,
138137
max_export_batch_size: usize,
@@ -256,7 +255,7 @@ impl LogProcessor for BatchLogProcessor {
256255
}
257256
}
258257

259-
fn shutdown(&self) -> OTelSdkResult {
258+
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
260259
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
261260
let max_queue_size = self.max_queue_size;
262261
if dropped_logs > 0 {
@@ -272,7 +271,7 @@ impl LogProcessor for BatchLogProcessor {
272271
match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
273272
Ok(_) => {
274273
receiver
275-
.recv_timeout(self.shutdown_timeout)
274+
.recv_timeout(timeout)
276275
.map(|_| {
277276
// join the background thread after receiving back the
278277
// shutdown signal
@@ -287,7 +286,7 @@ impl LogProcessor for BatchLogProcessor {
287286
name: "BatchLogProcessor.Shutdown.Timeout",
288287
message = "BatchLogProcessor shutdown timing out."
289288
);
290-
OTelSdkError::Timeout(self.shutdown_timeout)
289+
OTelSdkError::Timeout(timeout)
291290
}
292291
_ => {
293292
otel_error!(
@@ -489,7 +488,6 @@ impl BatchLogProcessor {
489488
message_sender,
490489
handle: Mutex::new(Some(handle)),
491490
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
492-
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
493491
dropped_logs_count: AtomicUsize::new(0),
494492
max_queue_size,
495493
export_log_message_sent: Arc::new(AtomicBool::new(false)),

opentelemetry-sdk/src/logs/concurrent_log_processor.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use opentelemetry::{otel_info, InstrumentationScope};
2+
use std::time::Duration;
23

34
use crate::{error::OTelSdkResult, Resource};
45

@@ -43,8 +44,8 @@ impl<T: LogExporter> LogProcessor for SimpleConcurrentLogProcessor<T> {
4344
Ok(())
4445
}
4546

46-
fn shutdown(&self) -> OTelSdkResult {
47-
self.exporter.shutdown()
47+
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
48+
self.exporter.shutdown_with_timeout(timeout)
4849
}
4950

5051
#[cfg(feature = "spec_unstable_logs_enabled")]

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use opentelemetry::logs::Severity;
3434
use opentelemetry::InstrumentationScope;
3535

3636
use std::fmt::Debug;
37+
use std::time::Duration;
3738

3839
/// The interface for plugging into a [`SdkLogger`].
3940
///
@@ -56,7 +57,13 @@ pub trait LogProcessor: Send + Sync + Debug {
5657
/// Shuts down the processor.
5758
/// After shutdown returns the log processor should stop processing any logs.
5859
/// It's up to the implementation on when to drop the LogProcessor.
59-
fn shutdown(&self) -> OTelSdkResult;
60+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
61+
Ok(())
62+
}
63+
/// Shuts down the processor with default timeout.
64+
fn shutdown(&self) -> OTelSdkResult {
65+
self.shutdown_with_timeout(Duration::from_secs(5))
66+
}
6067
#[cfg(feature = "spec_unstable_logs_enabled")]
6168
/// Check if logging is enabled
6269
fn event_enabled(&self, _level: Severity, _target: &str, _name: Option<&str>) -> bool {
@@ -133,10 +140,6 @@ pub(crate) mod tests {
133140
fn force_flush(&self) -> OTelSdkResult {
134141
Ok(())
135142
}
136-
137-
fn shutdown(&self) -> OTelSdkResult {
138-
Ok(())
139-
}
140143
}
141144

142145
#[derive(Debug)]
@@ -163,10 +166,6 @@ pub(crate) mod tests {
163166
fn force_flush(&self) -> OTelSdkResult {
164167
Ok(())
165168
}
166-
167-
fn shutdown(&self) -> OTelSdkResult {
168-
Ok(())
169-
}
170169
}
171170

172171
#[test]

opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -628,10 +628,6 @@ mod tests {
628628
fn force_flush(&self) -> OTelSdkResult {
629629
Ok(())
630630
}
631-
632-
fn shutdown(&self) -> OTelSdkResult {
633-
Ok(())
634-
}
635631
}
636632

637633
#[derive(Debug)]
@@ -658,10 +654,6 @@ mod tests {
658654
fn force_flush(&self) -> OTelSdkResult {
659655
Ok(())
660656
}
661-
662-
fn shutdown(&self) -> OTelSdkResult {
663-
Ok(())
664-
}
665657
}
666658
#[test]
667659
fn test_log_data_modification_by_multiple_processors() {

opentelemetry-sdk/src/logs/logger_provider.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::error::{OTelSdkError, OTelSdkResult};
33
use crate::logs::LogExporter;
44
use crate::Resource;
55
use opentelemetry::{otel_debug, otel_info, InstrumentationScope};
6+
use std::time::Duration;
67
use std::{
78
borrow::Cow,
89
sync::{
@@ -96,7 +97,7 @@ impl SdkLoggerProvider {
9697
}
9798

9899
/// Shuts down this `LoggerProvider`
99-
pub fn shutdown(&self) -> OTelSdkResult {
100+
pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
100101
otel_debug!(
101102
name: "LoggerProvider.ShutdownInvokedByUser",
102103
);
@@ -107,7 +108,7 @@ impl SdkLoggerProvider {
107108
.is_ok()
108109
{
109110
// propagate the shutdown signal to processors
110-
let result = self.inner.shutdown();
111+
let result = self.inner.shutdown_with_timeout(timeout);
111112
if result.iter().all(|res| res.is_ok()) {
112113
Ok(())
113114
} else {
@@ -123,6 +124,11 @@ impl SdkLoggerProvider {
123124
Err(OTelSdkError::AlreadyShutdown)
124125
}
125126
}
127+
128+
/// Shuts down this `LoggerProvider` with default timeout
129+
pub fn shutdown(&self) -> OTelSdkResult {
130+
self.shutdown_with_timeout(Duration::from_secs(5))
131+
}
126132
}
127133

128134
#[derive(Debug)]
@@ -133,10 +139,10 @@ struct LoggerProviderInner {
133139

134140
impl LoggerProviderInner {
135141
/// Shuts down the `LoggerProviderInner` and returns any errors.
136-
pub(crate) fn shutdown(&self) -> Vec<OTelSdkResult> {
142+
pub(crate) fn shutdown_with_timeout(&self, timeout: Duration) -> Vec<OTelSdkResult> {
137143
let mut results = vec![];
138144
for processor in &self.processors {
139-
let result = processor.shutdown();
145+
let result = processor.shutdown_with_timeout(timeout);
140146
if let Err(err) = &result {
141147
// Log at debug level because:
142148
// - The error is also returned to the user for handling (if applicable)
@@ -149,6 +155,11 @@ impl LoggerProviderInner {
149155
}
150156
results
151157
}
158+
159+
/// Shuts down the `LoggerProviderInner` with default timeout and returns any errors.
160+
pub(crate) fn shutdown(&self) -> Vec<OTelSdkResult> {
161+
self.shutdown_with_timeout(Duration::from_secs(5))
162+
}
152163
}
153164

154165
impl Drop for LoggerProviderInner {
@@ -330,7 +341,7 @@ mod tests {
330341
Ok(())
331342
}
332343

333-
fn shutdown(&self) -> OTelSdkResult {
344+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
334345
self.is_shutdown
335346
.lock()
336347
.map(|mut is_shutdown| *is_shutdown = true)
@@ -383,10 +394,6 @@ mod tests {
383394
Ok(())
384395
}
385396

386-
fn shutdown(&self) -> OTelSdkResult {
387-
Ok(())
388-
}
389-
390397
fn set_resource(&mut self, resource: &Resource) {
391398
let mut res = self.resource.lock().unwrap();
392399
*res = resource.clone();
@@ -903,7 +910,7 @@ mod tests {
903910
Ok(())
904911
}
905912

906-
fn shutdown(&self) -> OTelSdkResult {
913+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
907914
*self.shutdown_called.lock().unwrap() = true;
908915
Ok(())
909916
}
@@ -934,7 +941,7 @@ mod tests {
934941
Ok(())
935942
}
936943

937-
fn shutdown(&self) -> OTelSdkResult {
944+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
938945
let mut count = self.shutdown_count.lock().unwrap();
939946
*count += 1;
940947
Ok(())

0 commit comments

Comments
 (0)