Skip to content

Commit 1d37e07

Browse files
mohammadVatandoostlalitbcijothomas
authored
feat: add-shutdown-with-timeout-for-log-provider-and-processor (#2941)
Co-authored-by: Lalit Kumar Bhasin <[email protected]> Co-authored-by: Cijo Thomas <[email protected]>
1 parent 4f2de12 commit 1d37e07

File tree

11 files changed

+33
-55
lines changed

11 files changed

+33
-55
lines changed

examples/tracing-http-propagator/src/server.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,6 @@ impl LogProcessor for EnrichWithBaggageLogProcessor {
120120
fn force_flush(&self) -> OTelSdkResult {
121121
Ok(())
122122
}
123-
124-
fn shutdown(&self) -> OTelSdkResult {
125-
Ok(())
126-
}
127123
}
128124

129125
/// A custom span processor that enriches spans with baggage attributes. Baggage

opentelemetry-appender-tracing/benches/log-attributes.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,6 @@ impl LogProcessor for NoopProcessor {
4343
fn force_flush(&self) -> OTelSdkResult {
4444
Ok(())
4545
}
46-
47-
fn shutdown(&self) -> OTelSdkResult {
48-
Ok(())
49-
}
5046
}
5147

5248
/// Creates a single benchmark for a specific number of attributes

opentelemetry-appender-tracing/benches/logs.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,6 @@ impl LogProcessor for NoopProcessor {
5454
Ok(())
5555
}
5656

57-
fn shutdown(&self) -> OTelSdkResult {
58-
Ok(())
59-
}
60-
6157
fn event_enabled(
6258
&self,
6359
_level: opentelemetry::logs::Severity,

opentelemetry-appender-tracing/src/layer.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -877,10 +877,6 @@ mod tests {
877877
fn force_flush(&self) -> OTelSdkResult {
878878
Ok(())
879879
}
880-
881-
fn shutdown(&self) -> OTelSdkResult {
882-
Ok(())
883-
}
884880
}
885881

886882
#[cfg(feature = "spec_unstable_logs_enabled")]

opentelemetry-proto/src/transform/logs.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,10 +239,6 @@ mod tests {
239239
fn force_flush(&self) -> OTelSdkResult {
240240
Ok(())
241241
}
242-
243-
fn shutdown(&self) -> OTelSdkResult {
244-
Ok(())
245-
}
246242
}
247243

248244
fn create_test_log_data(

opentelemetry-sdk/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ also modified to suppress telemetry before invoking exporters.
2626
instead of the string `"true"`.
2727
[#2878](https://github.com/open-telemetry/opentelemetry-rust/issues/2878)
2828
- The `shutdown_with_timeout` method is added to LogExporter trait.
29+
- The `shutdown_with_timeout` method is added to LogProvider and LogProcessor trait.
2930
- *Breaking* `MetricError`, `MetricResult` no longer public (except when
3031
`spec_unstable_metrics_views` feature flag is enabled). `OTelSdkResult` should
3132
be used instead, wherever applicable. [#2906](https://github.com/open-telemetry/opentelemetry-rust/pull/2906)

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ pub struct BatchLogProcessor {
132132
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
133133
handle: Mutex<Option<thread::JoinHandle<()>>>,
134134
forceflush_timeout: Duration,
135-
shutdown_timeout: Duration,
136135
export_log_message_sent: Arc<AtomicBool>,
137136
current_batch_size: Arc<AtomicUsize>,
138137
max_export_batch_size: usize,
@@ -256,7 +255,7 @@ impl LogProcessor for BatchLogProcessor {
256255
}
257256
}
258257

259-
fn shutdown(&self) -> OTelSdkResult {
258+
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
260259
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
261260
let max_queue_size = self.max_queue_size;
262261
if dropped_logs > 0 {
@@ -272,7 +271,7 @@ impl LogProcessor for BatchLogProcessor {
272271
match self.message_sender.try_send(BatchMessage::Shutdown(sender)) {
273272
Ok(_) => {
274273
receiver
275-
.recv_timeout(self.shutdown_timeout)
274+
.recv_timeout(timeout)
276275
.map(|_| {
277276
// join the background thread after receiving back the
278277
// shutdown signal
@@ -287,7 +286,7 @@ impl LogProcessor for BatchLogProcessor {
287286
name: "BatchLogProcessor.Shutdown.Timeout",
288287
message = "BatchLogProcessor shutdown timing out."
289288
);
290-
OTelSdkError::Timeout(self.shutdown_timeout)
289+
OTelSdkError::Timeout(timeout)
291290
}
292291
_ => {
293292
otel_error!(
@@ -489,7 +488,6 @@ impl BatchLogProcessor {
489488
message_sender,
490489
handle: Mutex::new(Some(handle)),
491490
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
492-
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
493491
dropped_logs_count: AtomicUsize::new(0),
494492
max_queue_size,
495493
export_log_message_sent: Arc::new(AtomicBool::new(false)),

opentelemetry-sdk/src/logs/concurrent_log_processor.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use opentelemetry::{otel_info, InstrumentationScope};
2+
use std::time::Duration;
23

34
use crate::{error::OTelSdkResult, Resource};
45

@@ -43,8 +44,8 @@ impl<T: LogExporter> LogProcessor for SimpleConcurrentLogProcessor<T> {
4344
Ok(())
4445
}
4546

46-
fn shutdown(&self) -> OTelSdkResult {
47-
self.exporter.shutdown()
47+
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
48+
self.exporter.shutdown_with_timeout(timeout)
4849
}
4950

5051
#[cfg(feature = "spec_unstable_logs_enabled")]

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use opentelemetry::logs::Severity;
3434
use opentelemetry::InstrumentationScope;
3535

3636
use std::fmt::Debug;
37+
use std::time::Duration;
3738

3839
/// The interface for plugging into a [`SdkLogger`].
3940
///
@@ -56,7 +57,13 @@ pub trait LogProcessor: Send + Sync + Debug {
5657
/// Shuts down the processor.
5758
/// After shutdown returns the log processor should stop processing any logs.
5859
/// It's up to the implementation on when to drop the LogProcessor.
59-
fn shutdown(&self) -> OTelSdkResult;
60+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
61+
Ok(())
62+
}
63+
/// Shuts down the processor with default timeout.
64+
fn shutdown(&self) -> OTelSdkResult {
65+
self.shutdown_with_timeout(Duration::from_secs(5))
66+
}
6067
#[cfg(feature = "spec_unstable_logs_enabled")]
6168
/// Check if logging is enabled
6269
fn event_enabled(&self, _level: Severity, _target: &str, _name: Option<&str>) -> bool {
@@ -133,10 +140,6 @@ pub(crate) mod tests {
133140
fn force_flush(&self) -> OTelSdkResult {
134141
Ok(())
135142
}
136-
137-
fn shutdown(&self) -> OTelSdkResult {
138-
Ok(())
139-
}
140143
}
141144

142145
#[derive(Debug)]
@@ -163,10 +166,6 @@ pub(crate) mod tests {
163166
fn force_flush(&self) -> OTelSdkResult {
164167
Ok(())
165168
}
166-
167-
fn shutdown(&self) -> OTelSdkResult {
168-
Ok(())
169-
}
170169
}
171170

172171
#[test]

opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -628,10 +628,6 @@ mod tests {
628628
fn force_flush(&self) -> OTelSdkResult {
629629
Ok(())
630630
}
631-
632-
fn shutdown(&self) -> OTelSdkResult {
633-
Ok(())
634-
}
635631
}
636632

637633
#[derive(Debug)]
@@ -658,10 +654,6 @@ mod tests {
658654
fn force_flush(&self) -> OTelSdkResult {
659655
Ok(())
660656
}
661-
662-
fn shutdown(&self) -> OTelSdkResult {
663-
Ok(())
664-
}
665657
}
666658
#[test]
667659
fn test_log_data_modification_by_multiple_processors() {

0 commit comments

Comments
 (0)