Skip to content

Commit e9ae9f9

Browse files
mohammadVatandoostlalitbcijothomas
authored
feat: add shutdown with timeout for metric reader and provider (#2890)
Co-authored-by: Lalit Kumar Bhasin <[email protected]> Co-authored-by: Cijo Thomas <[email protected]>
1 parent 4791aae commit e9ae9f9

File tree

8 files changed

+37
-22
lines changed

8 files changed

+37
-22
lines changed

opentelemetry-sdk/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ Released 2025-Mar-21
7878
Custom exporters will need to internally synchronize any mutable state, if applicable.
7979

8080
- **Breaking** The `shutdown_with_timeout` method is added to MetricExporter trait. This is breaking change for custom `MetricExporter` authors.
81+
- **Breaking** The `shutdown_with_timeout` method is added to MetricReader trait. This is breaking change for custom `MetricReader` authors.
8182
- Bug Fix: `BatchLogProcessor` now correctly calls `shutdown` on the exporter
8283
when its `shutdown` is invoked.
8384

opentelemetry-sdk/benches/metric.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
use rand::Rng;
2-
use std::sync::{Arc, Weak};
3-
41
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
52
use opentelemetry::{
63
metrics::{Counter, Histogram, MeterProvider as _},
@@ -15,6 +12,9 @@ use opentelemetry_sdk::{
1512
},
1613
Resource,
1714
};
15+
use rand::Rng;
16+
use std::sync::{Arc, Weak};
17+
use std::time::Duration;
1818

1919
#[derive(Clone, Debug)]
2020
struct SharedReader(Arc<dyn MetricReader>);
@@ -32,7 +32,7 @@ impl MetricReader for SharedReader {
3232
self.0.force_flush()
3333
}
3434

35-
fn shutdown(&self) -> OTelSdkResult {
35+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
3636
self.0.shutdown()
3737
}
3838

opentelemetry-sdk/src/metrics/manual_reader.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1+
use opentelemetry::otel_debug;
2+
use std::time::Duration;
13
use std::{
24
fmt,
35
sync::{Mutex, Weak},
46
};
57

6-
use opentelemetry::otel_debug;
7-
88
use crate::{
99
error::{OTelSdkError, OTelSdkResult},
1010
metrics::{MetricError, MetricResult, Temporality},
@@ -110,7 +110,7 @@ impl MetricReader for ManualReader {
110110
}
111111

112112
/// Closes any connections and frees any resources used by the reader.
113-
fn shutdown(&self) -> OTelSdkResult {
113+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
114114
let mut inner = self
115115
.inner
116116
.lock()

opentelemetry-sdk/src/metrics/meter_provider.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
use core::fmt;
2+
use opentelemetry::{
3+
metrics::{Meter, MeterProvider},
4+
otel_debug, otel_error, otel_info, InstrumentationScope,
5+
};
6+
use std::time::Duration;
27
use std::{
38
collections::HashMap,
49
sync::{
@@ -7,11 +12,6 @@ use std::{
712
},
813
};
914

10-
use opentelemetry::{
11-
metrics::{Meter, MeterProvider},
12-
otel_debug, otel_error, otel_info, InstrumentationScope,
13-
};
14-
1515
use crate::error::OTelSdkResult;
1616
use crate::Resource;
1717

@@ -109,13 +109,18 @@ impl SdkMeterProvider {
109109
///
110110
/// There is no guaranteed that all telemetry be flushed or all resources have
111111
/// been released on error.
112-
pub fn shutdown(&self) -> OTelSdkResult {
112+
pub fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
113113
otel_debug!(
114114
name: "MeterProvider.Shutdown",
115115
message = "User initiated shutdown of MeterProvider."
116116
);
117117
self.inner.shutdown()
118118
}
119+
120+
/// shutdown with default timeout
121+
pub fn shutdown(&self) -> OTelSdkResult {
122+
self.shutdown_with_timeout(Duration::from_secs(5))
123+
}
119124
}
120125

121126
impl SdkMeterProviderInner {
@@ -130,7 +135,7 @@ impl SdkMeterProviderInner {
130135
}
131136
}
132137

133-
fn shutdown(&self) -> OTelSdkResult {
138+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
134139
if self
135140
.shutdown_invoked
136141
.swap(true, std::sync::atomic::Ordering::SeqCst)
@@ -141,6 +146,10 @@ impl SdkMeterProviderInner {
141146
self.pipes.shutdown()
142147
}
143148
}
149+
150+
fn shutdown(&self) -> OTelSdkResult {
151+
self.shutdown_with_timeout(Duration::from_secs(5))
152+
}
144153
}
145154

146155
impl Drop for SdkMeterProviderInner {

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
491491
// completion, and avoid blocking the thread. The default shutdown on drop
492492
// can still use blocking call. If user already explicitly called shutdown,
493493
// drop won't call shutdown again.
494-
fn shutdown(&self) -> OTelSdkResult {
494+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
495495
self.inner.shutdown()
496496
}
497497

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
390390
.and_then(|res| res)
391391
}
392392

393-
fn shutdown(&self) -> OTelSdkResult {
393+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
394394
let mut inner = self
395395
.inner
396396
.lock()

opentelemetry-sdk/src/metrics/reader.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Interfaces for reading and producing metrics
2-
use std::{fmt, sync::Weak};
3-
42
use crate::{error::OTelSdkResult, metrics::MetricResult};
3+
use std::time::Duration;
4+
use std::{fmt, sync::Weak};
55

66
use super::{data::ResourceMetrics, pipeline::Pipeline, InstrumentKind, Temporality};
77

@@ -46,7 +46,12 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static {
4646
///
4747
/// After `shutdown` is called, calls to `collect` will perform no operation and
4848
/// instead will return an error indicating the shutdown state.
49-
fn shutdown(&self) -> OTelSdkResult;
49+
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult;
50+
51+
/// shutdown with default timeout
52+
fn shutdown(&self) -> OTelSdkResult {
53+
self.shutdown_with_timeout(Duration::from_secs(5))
54+
}
5055

5156
/// The output temporality, a function of instrument kind.
5257
/// This SHOULD be obtained from the exporter.

opentelemetry-sdk/src/testing/metrics/metric_reader.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use std::sync::{Arc, Mutex, Weak};
2-
31
use crate::error::{OTelSdkError, OTelSdkResult};
42
use crate::metrics::{
53
data::ResourceMetrics, pipeline::Pipeline, reader::MetricReader, InstrumentKind,
64
};
75
use crate::metrics::{MetricResult, Temporality};
6+
use std::sync::{Arc, Mutex, Weak};
7+
use std::time::Duration;
88

99
#[derive(Debug, Clone)]
1010
pub struct TestMetricReader {
@@ -42,7 +42,7 @@ impl MetricReader for TestMetricReader {
4242
Ok(())
4343
}
4444

45-
fn shutdown(&self) -> OTelSdkResult {
45+
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
4646
let result = self.force_flush();
4747
{
4848
let mut is_shutdown = self.is_shutdown.lock().unwrap();

0 commit comments

Comments
 (0)