Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Released 2025-Mar-21
Custom exporters will need to internally synchronize any mutable state, if applicable.

- **Breaking** The `shutdown_with_timeout` method is added to MetricExporter trait. This is breaking change for custom `MetricExporter` authors.
- **Breaking** The `shutdown_with_timeout` method is added to MetricReader trait. This is breaking change for custom `MetricReader` authors.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be in vnext section

- Bug Fix: `BatchLogProcessor` now correctly calls `shutdown` on the exporter
when its `shutdown` is invoked.

Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use rand::Rng;
use std::sync::{Arc, Weak};

use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use opentelemetry::{
metrics::{Counter, Histogram, MeterProvider as _},
Expand All @@ -15,6 +12,9 @@ use opentelemetry_sdk::{
},
Resource,
};
use rand::Rng;
use std::sync::{Arc, Weak};
use std::time::Duration;

#[derive(Clone, Debug)]
struct SharedReader(Arc<dyn MetricReader>);
Expand All @@ -32,7 +32,7 @@ impl MetricReader for SharedReader {
self.0.force_flush()
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
self.0.shutdown()
}

Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/src/metrics/manual_reader.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use opentelemetry::otel_debug;
use std::time::Duration;
use std::{
fmt,
sync::{Mutex, Weak},
};

use opentelemetry::otel_debug;

use crate::{
error::{OTelSdkError, OTelSdkResult},
metrics::{MetricError, MetricResult, Temporality},
Expand Down Expand Up @@ -110,7 +110,7 @@
}

/// Closes any connections and frees any resources used by the reader.
fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {

Check warning on line 113 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L113

Added line #L113 was not covered by tests
let mut inner = self
.inner
.lock()
Expand Down
23 changes: 16 additions & 7 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use core::fmt;
use opentelemetry::{
metrics::{Meter, MeterProvider},
otel_debug, otel_error, otel_info, InstrumentationScope,
};
use std::time::Duration;
use std::{
collections::HashMap,
sync::{
Expand All @@ -7,11 +12,6 @@ use std::{
},
};

use opentelemetry::{
metrics::{Meter, MeterProvider},
otel_debug, otel_error, otel_info, InstrumentationScope,
};

use crate::error::OTelSdkResult;
use crate::Resource;

Expand Down Expand Up @@ -109,13 +109,18 @@ impl SdkMeterProvider {
///
/// There is no guaranteed that all telemetry be flushed or all resources have
/// been released on error.
pub fn shutdown(&self) -> OTelSdkResult {
pub fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
otel_debug!(
name: "MeterProvider.Shutdown",
message = "User initiated shutdown of MeterProvider."
);
self.inner.shutdown()
}

/// shutdown with default timeout
pub fn shutdown(&self) -> OTelSdkResult {
self.shutdown_with_timeout(Duration::from_secs(5))
}
}

impl SdkMeterProviderInner {
Expand All @@ -130,7 +135,7 @@ impl SdkMeterProviderInner {
}
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
if self
.shutdown_invoked
.swap(true, std::sync::atomic::Ordering::SeqCst)
Expand All @@ -141,6 +146,10 @@ impl SdkMeterProviderInner {
self.pipes.shutdown()
}
}

fn shutdown(&self) -> OTelSdkResult {
self.shutdown_with_timeout(Duration::from_secs(5))
}
}

impl Drop for SdkMeterProviderInner {
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
// completion, and avoid blocking the thread. The default shutdown on drop
// can still use blocking call. If user already explicitly called shutdown,
// drop won't call shutdown again.
fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
self.inner.shutdown()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ impl<E: PushMetricExporter> MetricReader for PeriodicReader<E> {
.and_then(|res| res)
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
let mut inner = self
.inner
.lock()
Expand Down
11 changes: 8 additions & 3 deletions opentelemetry-sdk/src/metrics/reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Interfaces for reading and producing metrics
use std::{fmt, sync::Weak};

use crate::{error::OTelSdkResult, metrics::MetricResult};
use std::time::Duration;
use std::{fmt, sync::Weak};

use super::{data::ResourceMetrics, pipeline::Pipeline, InstrumentKind, Temporality};

Expand Down Expand Up @@ -46,7 +46,12 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static {
///
/// After `shutdown` is called, calls to `collect` will perform no operation and
/// instead will return an error indicating the shutdown state.
fn shutdown(&self) -> OTelSdkResult;
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult;

/// shutdown with default timeout
fn shutdown(&self) -> OTelSdkResult {
self.shutdown_with_timeout(Duration::from_secs(5))
}

/// The output temporality, a function of instrument kind.
/// This SHOULD be obtained from the exporter.
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/src/testing/metrics/metric_reader.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::sync::{Arc, Mutex, Weak};

use crate::error::{OTelSdkError, OTelSdkResult};
use crate::metrics::{
data::ResourceMetrics, pipeline::Pipeline, reader::MetricReader, InstrumentKind,
};
use crate::metrics::{MetricResult, Temporality};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;

#[derive(Debug, Clone)]
pub struct TestMetricReader {
Expand Down Expand Up @@ -42,7 +42,7 @@ impl MetricReader for TestMetricReader {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
let result = self.force_flush();
{
let mut is_shutdown = self.is_shutdown.lock().unwrap();
Expand Down