Skip to content

Commit 015252a

Browse files
add timeout duration to shutdown
1 parent b8b97ac commit 015252a

File tree

13 files changed

+56
-39
lines changed

13 files changed

+56
-39
lines changed

opentelemetry-otlp/src/exporter/http/logs.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::time::Duration;
2+
13
use super::OtlpHttpClient;
24
use http::{header::CONTENT_TYPE, Method};
35
use opentelemetry::otel_debug;
@@ -46,7 +48,7 @@ impl LogExporter for OtlpHttpClient {
4648
Ok(())
4749
}
4850

49-
fn shutdown(&mut self) -> OTelSdkResult {
51+
fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult {
5052
let mut client_guard = self.client.lock().map_err(|e| {
5153
OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e))
5254
})?;

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use core::fmt;
2+
use std::time::Duration;
23
use opentelemetry::otel_debug;
34
use opentelemetry_proto::tonic::collector::logs::v1::{
45
logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
@@ -86,7 +87,7 @@ impl LogExporter for TonicLogsClient {
8687
Ok(())
8788
}
8889

89-
fn shutdown(&mut self) -> OTelSdkResult {
90+
fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult {
9091
match self.inner.take() {
9192
Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown.
9293
None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down.

opentelemetry-sdk/benches/log.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ RAM: 64.0 GB
1717

1818
use opentelemetry::time::now;
1919
use std::collections::HashMap;
20+
use std::time::Duration;
2021

2122
use criterion::{criterion_group, criterion_main, Criterion};
2223

@@ -38,7 +39,7 @@ impl LogProcessor for NoopProcessor {
3839
Ok(())
3940
}
4041

41-
fn shutdown(&self) -> OTelSdkResult {
42+
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
4243
Ok(())
4344
}
4445
}

opentelemetry-sdk/benches/log_exporter.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
use opentelemetry::time::now;
1414
use opentelemetry_sdk::error::OTelSdkResult;
1515
use std::sync::Mutex;
16+
use std::time::Duration;
1617

1718
use criterion::{criterion_group, criterion_main, Criterion};
1819

@@ -73,7 +74,7 @@ impl LogProcessor for ExportingProcessorWithFuture {
7374
Ok(())
7475
}
7576

76-
fn shutdown(&self) -> OTelSdkResult {
77+
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
7778
Ok(())
7879
}
7980
}
@@ -104,7 +105,7 @@ impl LogProcessor for ExportingProcessorWithoutFuture {
104105
Ok(())
105106
}
106107

107-
fn shutdown(&self) -> OTelSdkResult {
108+
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
108109
Ok(())
109110
}
110111
}

opentelemetry-sdk/benches/log_processor.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
use opentelemetry::time::now;
1515
use std::{
1616
sync::{Arc, Mutex},
17-
thread::sleep,
17+
thread::sleep, time::Duration,
1818
};
1919

2020
use criterion::{criterion_group, criterion_main, Criterion};
@@ -54,7 +54,7 @@ impl LogProcessor for NoopProcessor {
5454
Ok(())
5555
}
5656

57-
fn shutdown(&self) -> OTelSdkResult {
57+
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
5858
Ok(())
5959
}
6060
}
@@ -71,7 +71,7 @@ impl LogProcessor for CloningProcessor {
7171
Ok(())
7272
}
7373

74-
fn shutdown(&self) -> OTelSdkResult {
74+
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
7575
Ok(())
7676
}
7777
}
@@ -117,7 +117,7 @@ impl LogProcessor for SendToChannelProcessor {
117117
Ok(())
118118
}
119119

120-
fn shutdown(&self) -> OTelSdkResult {
120+
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
121121
Ok(())
122122
}
123123
}

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ pub struct BatchLogProcessor {
132132
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
133133
handle: Mutex<Option<thread::JoinHandle<()>>>,
134134
forceflush_timeout: Duration,
135-
shutdown_timeout: Duration,
136135
export_log_message_sent: Arc<AtomicBool>,
137136
current_batch_size: Arc<AtomicUsize>,
138137
max_export_batch_size: usize,
@@ -256,7 +255,7 @@ impl LogProcessor for BatchLogProcessor {
256255
}
257256
}
258257

259-
fn shutdown(&self) -> OTelSdkResult {
258+
fn shutdown(&self, timeout: Duration) -> OTelSdkResult {
260259
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
261260
let max_queue_size = self.max_queue_size;
262261
if dropped_logs > 0 {
@@ -272,7 +271,7 @@ impl LogProcessor for BatchLogProcessor {
272271
match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
273272
Ok(_) => {
274273
receiver
275-
.recv_timeout(self.shutdown_timeout)
274+
.recv_timeout(timeout)
276275
.map(|_| {
277276
// join the background thread after receiving back the
278277
// shutdown signal
@@ -287,7 +286,7 @@ impl LogProcessor for BatchLogProcessor {
287286
name: "BatchLogProcessor.Shutdown.Timeout",
288287
message = "BatchLogProcessor shutdown timing out."
289288
);
290-
OTelSdkError::Timeout(self.shutdown_timeout)
289+
OTelSdkError::Timeout(timeout)
291290
}
292291
_ => {
293292
otel_error!(
@@ -436,7 +435,7 @@ impl BatchLogProcessor {
436435
&current_batch_size,
437436
&config,
438437
);
439-
let _ = exporter.shutdown();
438+
let _ = exporter.shutdown(Duration::from_secs(5));
440439
let _ = sender.send(result);
441440

442441
otel_debug!(
@@ -488,7 +487,6 @@ impl BatchLogProcessor {
488487
message_sender,
489488
handle: Mutex::new(Some(handle)),
490489
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
491-
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
492490
dropped_logs_count: AtomicUsize::new(0),
493491
max_queue_size,
494492
export_log_message_sent: Arc::new(AtomicBool::new(false)),
@@ -922,7 +920,7 @@ mod tests {
922920

923921
processor.emit(&mut record, &instrumentation);
924922
processor.force_flush().unwrap();
925-
processor.shutdown().unwrap();
923+
processor.shutdown(Duration::from_secs(5)).unwrap();
926924
// todo: expect to see errors here. How should we assert this?
927925
processor.emit(&mut record, &instrumentation);
928926
assert_eq!(1, exporter.get_emitted_logs().unwrap().len());
@@ -934,27 +932,27 @@ mod tests {
934932
let exporter = InMemoryLogExporterBuilder::default().build();
935933
let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
936934

937-
processor.shutdown().unwrap();
935+
processor.shutdown(Duration::from_secs(5)).unwrap();
938936
}
939937

940938
#[tokio::test(flavor = "current_thread")]
941939
async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
942940
let exporter = InMemoryLogExporterBuilder::default().build();
943941
let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
944-
processor.shutdown().unwrap();
942+
processor.shutdown(Duration::from_secs(5)).unwrap();
945943
}
946944

947945
#[tokio::test(flavor = "multi_thread")]
948946
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() {
949947
let exporter = InMemoryLogExporterBuilder::default().build();
950948
let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
951-
processor.shutdown().unwrap();
949+
processor.shutdown(Duration::from_secs(5)).unwrap();
952950
}
953951

954952
#[tokio::test(flavor = "multi_thread")]
955953
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
956954
let exporter = InMemoryLogExporterBuilder::default().build();
957955
let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
958-
processor.shutdown().unwrap();
956+
processor.shutdown(Duration::from_secs(5)).unwrap();
959957
}
960958
}

opentelemetry-sdk/src/logs/export.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::Resource;
66
use opentelemetry::logs::Severity;
77
use opentelemetry::InstrumentationScope;
88
use std::fmt::Debug;
9+
use std::time::Duration;
910

1011
/// A batch of log records to be exported by a `LogExporter`.
1112
///
@@ -136,7 +137,7 @@ pub trait LogExporter: Send + Sync + Debug {
136137
) -> impl std::future::Future<Output = OTelSdkResult> + Send;
137138

138139
/// Shuts down the exporter.
139-
fn shutdown(&mut self) -> OTelSdkResult {
140+
fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult {
140141
Ok(())
141142
}
142143
#[cfg(feature = "spec_unstable_logs_enabled")]

opentelemetry-sdk/src/logs/in_memory_exporter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use opentelemetry::InstrumentationScope;
66
use std::borrow::Cow;
77
use std::sync::atomic::AtomicBool;
88
use std::sync::{Arc, Mutex};
9+
use std::time::Duration;
910

1011
type LogResult<T> = Result<T, OTelSdkError>;
1112

@@ -211,7 +212,7 @@ impl LogExporter for InMemoryLogExporter {
211212
Ok(())
212213
}
213214

214-
fn shutdown(&mut self) -> OTelSdkResult {
215+
fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult {
215216
self.shutdown_called
216217
.store(true, std::sync::atomic::Ordering::Relaxed);
217218
if self.should_reset_on_shutdown {

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use opentelemetry::logs::Severity;
3434
use opentelemetry::InstrumentationScope;
3535

3636
use std::fmt::Debug;
37+
use std::time::Duration;
3738

3839
/// The interface for plugging into a [`SdkLogger`].
3940
///
@@ -56,7 +57,7 @@ pub trait LogProcessor: Send + Sync + Debug {
5657
/// Shuts down the processor.
5758
/// After shutdown returns the log processor should stop processing any logs.
5859
/// It's up to the implementation on when to drop the LogProcessor.
59-
fn shutdown(&self) -> OTelSdkResult;
60+
fn shutdown(&self, timeout: Duration) -> OTelSdkResult;
6061
#[cfg(feature = "spec_unstable_logs_enabled")]
6162
/// Check if logging is enabled
6263
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
@@ -81,6 +82,7 @@ pub(crate) mod tests {
8182
use opentelemetry::logs::{Logger, LoggerProvider};
8283
use opentelemetry::{InstrumentationScope, Key};
8384
use std::sync::{Arc, Mutex};
85+
use std::time::Duration;
8486

8587
#[derive(Debug, Clone)]
8688
pub(crate) struct MockLogExporter {
@@ -92,7 +94,7 @@ pub(crate) mod tests {
9294
Ok(())
9395
}
9496

95-
fn shutdown(&mut self) -> OTelSdkResult {
97+
fn shutdown(&mut self, _timeout: Duration) -> OTelSdkResult {
9698
Ok(())
9799
}
98100

@@ -138,7 +140,7 @@ pub(crate) mod tests {
138140
Ok(())
139141
}
140142

141-
fn shutdown(&self) -> OTelSdkResult {
143+
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
142144
Ok(())
143145
}
144146
}
@@ -168,7 +170,7 @@ pub(crate) mod tests {
168170
Ok(())
169171
}
170172

171-
fn shutdown(&self) -> OTelSdkResult {
173+
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
172174
Ok(())
173175
}
174176
}

opentelemetry-sdk/src/logs/logger_provider.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::error::{OTelSdkError, OTelSdkResult};
33
use crate::logs::LogExporter;
44
use crate::Resource;
55
use opentelemetry::{otel_debug, otel_info, InstrumentationScope};
6+
use std::time::Duration;
67
use std::{
78
borrow::Cow,
89
sync::{
@@ -100,8 +101,13 @@ impl SdkLoggerProvider {
100101
}
101102
}
102103

103-
/// Shuts down this `LoggerProvider`
104+
/// Shuts down this `LoggerProvider` with default timeout.
104105
pub fn shutdown(&self) -> OTelSdkResult {
106+
self.shutdown_with_timeout(Duration::from_secs(5)) // TODO: make this configurable
107+
}
108+
109+
/// Shuts down this `LoggerProvider` with a timeout.
110+
pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
105111
otel_debug!(
106112
name: "LoggerProvider.ShutdownInvokedByUser",
107113
);
@@ -112,7 +118,7 @@ impl SdkLoggerProvider {
112118
.is_ok()
113119
{
114120
// propagate the shutdown signal to processors
115-
let result = self.inner.shutdown();
121+
let result = self.inner.shutdown(timeout);
116122
if result.iter().all(|res| res.is_ok()) {
117123
Ok(())
118124
} else {
@@ -139,10 +145,10 @@ struct LoggerProviderInner {
139145

140146
impl LoggerProviderInner {
141147
/// Shuts down the `LoggerProviderInner` and returns any errors.
142-
pub(crate) fn shutdown(&self) -> Vec<OTelSdkResult> {
148+
pub(crate) fn shutdown(&self, timeout: Duration) -> Vec<OTelSdkResult> {
143149
let mut results = vec![];
144150
for processor in &self.processors {
145-
let result = processor.shutdown();
151+
let result = processor.shutdown(timeout);
146152
if let Err(err) = &result {
147153
// Log at debug level because:
148154
// - The error is also returned to the user for handling (if applicable)
@@ -164,7 +170,7 @@ impl Drop for LoggerProviderInner {
164170
name: "LoggerProvider.Drop",
165171
message = "Last reference of LoggerProvider dropped, initiating shutdown."
166172
);
167-
let _ = self.shutdown(); // errors are handled within shutdown
173+
let _ = self.shutdown(Duration::from_secs(5)); // errors are handled within shutdown
168174
} else {
169175
otel_debug!(
170176
name: "LoggerProvider.Drop.AlreadyShutdown",
@@ -331,7 +337,7 @@ mod tests {
331337
Ok(())
332338
}
333339

334-
fn shutdown(&self) -> OTelSdkResult {
340+
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
335341
self.is_shutdown
336342
.lock()
337343
.map(|mut is_shutdown| *is_shutdown = true)
@@ -776,7 +782,7 @@ mod tests {
776782
Ok(())
777783
}
778784

779-
fn shutdown(&self) -> OTelSdkResult {
785+
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
780786
*self.shutdown_called.lock().unwrap() = true;
781787
Ok(())
782788
}
@@ -807,7 +813,7 @@ mod tests {
807813
Ok(())
808814
}
809815

810-
fn shutdown(&self) -> OTelSdkResult {
816+
fn shutdown(&self, _timeout: Duration) -> OTelSdkResult {
811817
let mut count = self.shutdown_count.lock().unwrap();
812818
*count += 1;
813819
Ok(())

0 commit comments

Comments
 (0)