Skip to content

Commit c5f9718

Browse files
mohammadVatandoostlalitbcijothomas
authored
feat: add shutdown with timeout for traces (#2956)
Co-authored-by: Lalit Kumar Bhasin <[email protected]> Co-authored-by: Cijo Thomas <[email protected]>
1 parent 8f4fe23 commit c5f9718

File tree

9 files changed

+46
-25
lines changed

9 files changed

+46
-25
lines changed

examples/tracing-http-propagator/src/server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use opentelemetry_sdk::{
1919
};
2020
use opentelemetry_semantic_conventions::trace;
2121
use opentelemetry_stdout::{LogExporter, SpanExporter};
22+
use std::time::Duration;
2223
use std::{convert::Infallible, net::SocketAddr, sync::OnceLock};
2324
use tokio::net::TcpListener;
2425
use tracing::info;
@@ -131,7 +132,7 @@ impl SpanProcessor for EnrichWithBaggageSpanProcessor {
131132
Ok(())
132133
}
133134

134-
fn shutdown(&self) -> OTelSdkResult {
135+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
135136
Ok(())
136137
}
137138

opentelemetry-sdk/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ also modified to suppress telemetry before invoking exporters.
2525
- Fixed the overflow attribute to correctly use the boolean value `true`
2626
instead of the string `"true"`.
2727
[#2878](https://github.com/open-telemetry/opentelemetry-rust/issues/2878)
28+
- The `shutdown_with_timeout` method is added to SpanProcessor, SpanExporter trait and TracerProvider.
2829
- The `shutdown_with_timeout` method is added to LogExporter trait.
2930
- The `shutdown_with_timeout` method is added to LogProvider and LogProcessor trait.
3031
- *Breaking* `MetricError`, `MetricResult` no longer public (except when

opentelemetry-sdk/src/trace/export.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
55
use opentelemetry::{InstrumentationScope, KeyValue};
66
use std::borrow::Cow;
77
use std::fmt::Debug;
8-
use std::time::SystemTime;
8+
use std::time::{Duration, SystemTime};
99

1010
/// `SpanExporter` defines the interface that protocol-specific exporters must
1111
/// implement so that they can be plugged into OpenTelemetry SDK and support
@@ -43,9 +43,13 @@ pub trait SpanExporter: Send + Sync + Debug {
4343
/// flush the data and the destination is unavailable). SDK authors
4444
/// can decide if they want to make the shutdown timeout
4545
/// configurable.
46-
fn shutdown(&mut self) -> OTelSdkResult {
46+
fn shutdown_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult {
4747
Ok(())
4848
}
49+
/// Shuts down the exporter with default timeout.
50+
fn shutdown(&mut self) -> OTelSdkResult {
51+
self.shutdown_with_timeout(Duration::from_nanos(5))
52+
}
4953

5054
/// This is a hint to ensure that the export of any Spans the exporter
5155
/// has received prior to the call to this function SHOULD be completed

opentelemetry-sdk/src/trace/in_memory_exporter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::resource::Resource;
33
use crate::trace::{SpanData, SpanExporter};
44
use crate::InMemoryExporterError;
55
use std::sync::{Arc, Mutex};
6+
use std::time::Duration;
67

78
/// An in-memory span exporter that stores span data in memory.
89
///
@@ -140,7 +141,7 @@ impl SpanExporter for InMemorySpanExporter {
140141
result
141142
}
142143

143-
fn shutdown(&mut self) -> OTelSdkResult {
144+
fn shutdown_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult {
144145
self.reset();
145146
Ok(())
146147
}

opentelemetry-sdk/src/trace/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ mod runtime_tests;
5858

5959
#[cfg(all(test, feature = "testing"))]
6060
mod tests {
61-
6261
use super::*;
62+
use crate::error::OTelSdkResult;
6363
use crate::{
6464
trace::span_limit::{DEFAULT_MAX_EVENT_PER_SPAN, DEFAULT_MAX_LINKS_PER_SPAN},
6565
trace::{InMemorySpanExporter, InMemorySpanExporterBuilder},
@@ -76,6 +76,7 @@ mod tests {
7676
},
7777
Context, KeyValue,
7878
};
79+
use std::time::Duration;
7980

8081
#[test]
8182
fn span_modification_via_context() {
@@ -146,7 +147,7 @@ mod tests {
146147
Ok(())
147148
}
148149

149-
fn shutdown(&self) -> crate::error::OTelSdkResult {
150+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
150151
Ok(())
151152
}
152153
}

opentelemetry-sdk/src/trace/provider.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use super::IdGenerator;
12
use crate::error::{OTelSdkError, OTelSdkResult};
23
/// # Trace Provider SDK
34
///
@@ -74,8 +75,7 @@ use opentelemetry::{otel_info, InstrumentationScope};
7475
use std::borrow::Cow;
7576
use std::sync::atomic::{AtomicBool, Ordering};
7677
use std::sync::{Arc, OnceLock};
77-
78-
use super::IdGenerator;
78+
use std::time::Duration;
7979

8080
static PROVIDER_RESOURCE: OnceLock<Resource> = OnceLock::new();
8181

@@ -112,10 +112,10 @@ pub(crate) struct TracerProviderInner {
112112
impl TracerProviderInner {
113113
/// Crate-private shutdown method to be called both from explicit shutdown
114114
/// and from Drop when the last reference is released.
115-
pub(crate) fn shutdown(&self) -> Vec<OTelSdkResult> {
115+
pub(crate) fn shutdown_with_timeout(&self, timeout: Duration) -> Vec<OTelSdkResult> {
116116
let mut results = vec![];
117117
for processor in &self.processors {
118-
let result = processor.shutdown();
118+
let result = processor.shutdown_with_timeout(timeout);
119119
if let Err(err) = &result {
120120
// Log at debug level because:
121121
// - The error is also returned to the user for handling (if applicable)
@@ -128,6 +128,10 @@ impl TracerProviderInner {
128128
}
129129
results
130130
}
131+
/// shutdown with default timeout
132+
pub(crate) fn shutdown(&self) -> Vec<OTelSdkResult> {
133+
self.shutdown_with_timeout(Duration::from_secs(5))
134+
}
131135
}
132136

133137
impl Drop for TracerProviderInner {
@@ -239,15 +243,15 @@ impl SdkTracerProvider {
239243
/// Shuts down the current `TracerProvider`.
240244
///
241245
/// Note that shut down doesn't means the TracerProvider has dropped
242-
pub fn shutdown(&self) -> OTelSdkResult {
246+
pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
243247
if self
244248
.inner
245249
.is_shutdown
246250
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
247251
.is_ok()
248252
{
249253
// propagate the shutdown signal to processors
250-
let results = self.inner.shutdown();
254+
let results = self.inner.shutdown_with_timeout(timeout);
251255

252256
if results.iter().all(|res| res.is_ok()) {
253257
Ok(())
@@ -264,6 +268,11 @@ impl SdkTracerProvider {
264268
Err(OTelSdkError::AlreadyShutdown)
265269
}
266270
}
271+
272+
/// shutdown with default timeout
273+
pub fn shutdown(&self) -> OTelSdkResult {
274+
self.shutdown_with_timeout(Duration::from_secs(5))
275+
}
267276
}
268277

269278
impl opentelemetry::trace::TracerProvider for SdkTracerProvider {
@@ -471,6 +480,7 @@ mod tests {
471480
use std::env;
472481
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
473482
use std::sync::Arc;
483+
use std::time::Duration;
474484

475485
// fields below is wrapped with Arc so we can assert it
476486
#[derive(Default, Debug)]
@@ -528,7 +538,7 @@ mod tests {
528538
}
529539
}
530540

531-
fn shutdown(&self) -> OTelSdkResult {
541+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
532542
if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) {
533543
Ok(())
534544
} else {
@@ -787,7 +797,7 @@ mod tests {
787797
Ok(())
788798
}
789799

790-
fn shutdown(&self) -> OTelSdkResult {
800+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
791801
self.shutdown_count.fetch_add(1, Ordering::SeqCst);
792802
Ok(())
793803
}

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,11 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
9090
/// opportunity for processors to do any cleanup required.
9191
///
9292
/// Implementation should make sure shutdown can be called multiple times.
93-
fn shutdown(&self) -> OTelSdkResult;
93+
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult;
94+
/// shutdown the processor with a default timeout.
95+
fn shutdown(&self) -> OTelSdkResult {
96+
self.shutdown_with_timeout(Duration::from_secs(5))
97+
}
9498
/// Set the resource for the span processor.
9599
fn set_resource(&mut self, _resource: &Resource) {}
96100
}
@@ -154,9 +158,9 @@ impl<T: SpanExporter> SpanProcessor for SimpleSpanProcessor<T> {
154158
Ok(())
155159
}
156160

157-
fn shutdown(&self) -> OTelSdkResult {
161+
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
158162
if let Ok(mut exporter) = self.exporter.lock() {
159-
exporter.shutdown()
163+
exporter.shutdown_with_timeout(timeout)
160164
} else {
161165
Err(OTelSdkError::InternalFailure(
162166
"SimpleSpanProcessor mutex poison at shutdown".into(),
@@ -285,7 +289,6 @@ pub struct BatchSpanProcessor {
285289
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages.
286290
handle: Mutex<Option<thread::JoinHandle<()>>>,
287291
forceflush_timeout: Duration,
288-
shutdown_timeout: Duration,
289292
is_shutdown: AtomicBool,
290293
dropped_span_count: Arc<AtomicUsize>,
291294
export_span_message_sent: Arc<AtomicBool>,
@@ -424,7 +427,6 @@ impl BatchSpanProcessor {
424427
message_sender,
425428
handle: Mutex::new(Some(handle)),
426429
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
427-
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
428430
is_shutdown: AtomicBool::new(false),
429431
dropped_span_count: Arc::new(AtomicUsize::new(0)),
430432
max_queue_size,
@@ -580,7 +582,7 @@ impl SpanProcessor for BatchSpanProcessor {
580582
}
581583

582584
/// Shuts down the processor.
583-
fn shutdown(&self) -> OTelSdkResult {
585+
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
584586
if self.is_shutdown.swap(true, Ordering::Relaxed) {
585587
return Err(OTelSdkError::AlreadyShutdown);
586588
}
@@ -601,8 +603,8 @@ impl SpanProcessor for BatchSpanProcessor {
601603
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
602604

603605
let result = receiver
604-
.recv_timeout(self.shutdown_timeout)
605-
.map_err(|_| OTelSdkError::Timeout(self.shutdown_timeout))?;
606+
.recv_timeout(timeout)
607+
.map_err(|_| OTelSdkError::Timeout(timeout))?;
606608
if let Some(handle) = self.handle.lock().unwrap().take() {
607609
if let Err(err) = handle.join() {
608610
return Err(OTelSdkError::InternalFailure(format!(

opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use opentelemetry::{otel_debug, otel_error, otel_warn};
1818
use std::fmt;
1919
use std::sync::atomic::{AtomicUsize, Ordering};
2020
use std::sync::Arc;
21+
use std::time::Duration;
2122

2223
/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
2324
/// them at a preconfigured interval.
@@ -133,7 +134,7 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
133134
})?
134135
}
135136

136-
fn shutdown(&self) -> OTelSdkResult {
137+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
137138
let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
138139
let max_queue_size = self.max_queue_size;
139140
if dropped_spans > 0 {

stress/src/traces.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
99
~10.6 M /sec
1010
*/
11-
1211
use lazy_static::lazy_static;
1312
use opentelemetry::{
1413
trace::{Span, SpanBuilder, Tracer, TracerProvider},
@@ -18,6 +17,7 @@ use opentelemetry_sdk::{
1817
error::OTelSdkResult,
1918
trace::{self as sdktrace, SpanData, SpanProcessor},
2019
};
20+
use std::time::Duration;
2121

2222
mod throughput;
2323

@@ -45,7 +45,7 @@ impl SpanProcessor for NoOpSpanProcessor {
4545
Ok(())
4646
}
4747

48-
fn shutdown(&self) -> OTelSdkResult {
48+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
4949
Ok(())
5050
}
5151
}

0 commit comments

Comments
 (0)