Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions examples/metrics-basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
async fn main() {
// Initialize the MeterProvider with the stdout Exporter.
let meter_provider = init_meter_provider();

Expand Down Expand Up @@ -140,6 +140,32 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Metrics are exported by default every 30 seconds when using stdout exporter,
// however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval.
meter_provider.shutdown()?;
Ok(())
let shutdown_result = meter_provider.shutdown();

// Demonstrate handling the shutdown result.
match shutdown_result {
Ok(_) => println!("MeterProvider shutdown successfully"),
Err(e) => {
match e {
opentelemetry_sdk::error::ShutdownError::Failed(e) => {
// This indicates some failure during shutdown.
// Not much to do here, but log the error.
// So users at least know something went wrong,
// and possibly explain why some metrics were not exported.
println!("MeterProvider shutdown failed: {}", e)
}
opentelemetry_sdk::error::ShutdownError::AlreadyShutdown => {
// This indicates some user code tried to shutdown elsewhere.
// user need to review their code to ensure shutdown is called only once.
println!("MeterProvider already shutdown")
}
opentelemetry_sdk::error::ShutdownError::Timeout(e) => {
// This indicates the shutdown timed out, and a good
// hint to user to increase the timeout or even retry.
// (Shutdown method does not allow custom timeout today, but that is temporary)
println!("MeterProvider shutdown timed out after {:?}", e)
}
}
}
}
}
10 changes: 8 additions & 2 deletions opentelemetry-otlp/src/exporter/http/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use async_trait::async_trait;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::otel_debug;
use opentelemetry_sdk::error::{ShutdownError, ShutdownResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::{MetricError, MetricResult};

Expand Down Expand Up @@ -43,8 +44,13 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
let _ = self.client.lock()?.take();
fn shutdown(&self) -> ShutdownResult {
self.client
.lock()
.map_err(|e| {
ShutdownError::Failed(format!("Internal Error. Failed to acquire lock: {}", e))
})?
.take();

Check warning on line 53 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L47-L53

Added lines #L47 - L53 were not covered by tests

Ok(())
}
Expand Down
10 changes: 8 additions & 2 deletions opentelemetry-otlp/src/exporter/tonic/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use opentelemetry_proto::tonic::collector::metrics::v1::{
metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
};
use opentelemetry_sdk::error::{ShutdownError, ShutdownResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::{MetricError, MetricResult};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
Expand Down Expand Up @@ -89,8 +90,13 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
let _ = self.inner.lock()?.take();
fn shutdown(&self) -> ShutdownResult {
self.inner
.lock()
.map_err(|e| {
ShutdownError::Failed(format!("Internal Error. Failed to acquire lock: {}", e))
})?
.take();

Check warning on line 99 in opentelemetry-otlp/src/exporter/tonic/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/metrics.rs#L93-L99

Added lines #L93 - L99 were not covered by tests

Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use async_trait::async_trait;
use core::fmt;
use opentelemetry_sdk::error::ShutdownResult;
use opentelemetry_sdk::metrics::MetricResult;

use opentelemetry_sdk::metrics::{
Expand Down Expand Up @@ -123,7 +124,7 @@
#[async_trait]
pub trait MetricsClient: fmt::Debug + Send + Sync + 'static {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;
fn shutdown(&self) -> MetricResult<()>;
fn shutdown(&self) -> ShutdownResult;
}

/// Export metrics in OTEL format.
Expand All @@ -149,7 +150,7 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {

Check warning on line 153 in opentelemetry-otlp/src/metric.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/metric.rs#L153

Added line #L153 was not covered by tests
self.client.shutdown()
}

Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use opentelemetry::{
Key, KeyValue,
};
use opentelemetry_sdk::{
error::ShutdownResult,
metrics::{
data::ResourceMetrics, new_view, reader::MetricReader, Aggregation, Instrument,
InstrumentKind, ManualReader, MetricResult, Pipeline, SdkMeterProvider, Stream,
Expand All @@ -31,7 +32,7 @@ impl MetricReader for SharedReader {
self.0.force_flush()
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
self.0.shutdown()
}

Expand Down
24 changes: 24 additions & 0 deletions opentelemetry-sdk/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,31 @@
//! Wrapper for error from trace, logs and metrics part of open telemetry.
use std::{result::Result, time::Duration};

use thiserror::Error;

/// Trait for errors returned by exporters
pub trait ExportError: std::error::Error + Send + Sync + 'static {
/// The name of exporter that returned this error
fn exporter_name(&self) -> &'static str;
}

#[derive(Error, Debug)]
/// Errors that can occur during shutdown.
pub enum ShutdownError {
/// Shutdown already invoked.
#[error("Shutdown already invoked")]
AlreadyShutdown,

/// Shutdown timed out before completing.
#[error("Shutdown timed out after {0:?}")]
Timeout(Duration),

/// Shutdown failed with an error.
/// This error is returned when the shutdown process failed with an error.
#[error("Shutdown failed: {0}")]
Failed(String),
}

/// A specialized `Result` type for Shutdown operations.
pub type ShutdownResult = Result<(), ShutdownError>;
3 changes: 2 additions & 1 deletion opentelemetry-sdk/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Interfaces for exporting metrics
use async_trait::async_trait;

use crate::error::ShutdownResult;
use crate::metrics::MetricResult;

use crate::metrics::data::ResourceMetrics;
Expand All @@ -27,7 +28,7 @@ pub trait PushMetricExporter: Send + Sync + 'static {
///
/// After Shutdown is called, calls to Export will perform no operation and
/// instead will return an error indicating the shutdown state.
fn shutdown(&self) -> MetricResult<()>;
fn shutdown(&self) -> ShutdownResult;

/// Access the [Temporality] of the MetricExporter.
fn temporality(&self) -> Temporality;
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/src/metrics/in_memory_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::error::ShutdownResult;
use crate::metrics::data::{self, Gauge, Sum};
use crate::metrics::data::{Histogram, Metric, ResourceMetrics, ScopeMetrics};
use crate::metrics::exporter::PushMetricExporter;
Expand Down Expand Up @@ -277,7 +278,7 @@ impl PushMetricExporter for InMemoryMetricExporter {
Ok(()) // In this implementation, flush does nothing
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
Ok(())
}

Expand Down
11 changes: 8 additions & 3 deletions opentelemetry-sdk/src/metrics/manual_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

use opentelemetry::otel_debug;

use crate::metrics::{MetricError, MetricResult, Temporality};
use crate::{
error::{ShutdownError, ShutdownResult},
metrics::{MetricError, MetricResult, Temporality},
};

use super::{
data::ResourceMetrics,
Expand Down Expand Up @@ -107,8 +110,10 @@
}

/// Closes any connections and frees any resources used by the reader.
fn shutdown(&self) -> MetricResult<()> {
let mut inner = self.inner.lock()?;
fn shutdown(&self) -> ShutdownResult {
let mut inner = self.inner.lock().map_err(|e| {
ShutdownError::Failed(format!("Internal Error. Failed to acquire lock: {}", e))
})?;

Check warning on line 116 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L113-L116

Added lines #L113 - L116 were not covered by tests

// Any future call to collect will now return an error.
inner.sdk_producer = None;
Expand Down
13 changes: 7 additions & 6 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ use opentelemetry::{
otel_debug, otel_error, otel_info, InstrumentationScope,
};

use crate::metrics::{MetricError, MetricResult};
use crate::Resource;
use crate::{
error::ShutdownResult,
metrics::{MetricError, MetricResult},
};

use super::{
meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines, reader::MetricReader, view::View,
Expand Down Expand Up @@ -108,7 +111,7 @@ impl SdkMeterProvider {
///
/// There is no guaranteed that all telemetry be flushed or all resources have
/// been released on error.
pub fn shutdown(&self) -> MetricResult<()> {
pub fn shutdown(&self) -> ShutdownResult {
otel_info!(
name: "MeterProvider.Shutdown",
message = "User initiated shutdown of MeterProvider."
Expand All @@ -131,15 +134,13 @@ impl SdkMeterProviderInner {
}
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
if self
.shutdown_invoked
.swap(true, std::sync::atomic::Ordering::SeqCst)
{
// If the previous value was true, shutdown was already invoked.
Err(MetricError::Other(
"MeterProvider shutdown already invoked.".into(),
))
Err(crate::error::ShutdownError::AlreadyShutdown)
} else {
self.pipes.shutdown()
}
Expand Down
27 changes: 14 additions & 13 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn};

use crate::{
error::{ShutdownError, ShutdownResult},
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
Resource,
};
Expand Down Expand Up @@ -399,27 +400,27 @@
}
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
// TODO: See if this is better to be created upfront.
let (response_tx, response_rx) = mpsc::channel();
self.message_sender
.send(Message::Shutdown(response_tx))
.map_err(|e| MetricError::Other(e.to_string()))?;
.map_err(|e| ShutdownError::Failed(e.to_string()))?;

// TODO: Make this timeout configurable.
match response_rx.recv_timeout(Duration::from_secs(5)) {
Ok(response) => {
if response {
Ok(())
} else {
Err(MetricError::Other("Failed to shutdown".into()))
Err(ShutdownError::Failed("Failed to shutdown".into()))

Check warning on line 416 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L416

Added line #L416 was not covered by tests
}
}
Err(mpsc::RecvTimeoutError::Timeout) => Err(MetricError::Other(
"Failed to shutdown due to Timeout".into(),
)),
Err(mpsc::RecvTimeoutError::Timeout) => {
Err(ShutdownError::Timeout(Duration::from_secs(5)))

Check warning on line 420 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L420

Added line #L420 was not covered by tests
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
Err(MetricError::Other("Failed to shutdown".into()))
Err(ShutdownError::Failed("Failed to shutdown".into()))

Check warning on line 423 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L423

Added line #L423 was not covered by tests
}
}
}
Expand Down Expand Up @@ -448,7 +449,7 @@
// completion, and avoid blocking the thread. The default shutdown on drop
// can still use blocking call. If user already explicitly called shutdown,
// drop won't call shutdown again.
fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
self.inner.shutdown()
}

Expand All @@ -468,10 +469,10 @@
mod tests {
use super::PeriodicReader;
use crate::{
metrics::InMemoryMetricExporter,
error::ShutdownResult,
metrics::{
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader, MetricError,
MetricResult, SdkMeterProvider, Temporality,
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
InMemoryMetricExporter, MetricError, MetricResult, SdkMeterProvider, Temporality,
},
Resource,
};
Expand Down Expand Up @@ -521,7 +522,7 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
Ok(())
}

Expand All @@ -545,7 +546,7 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
self.is_shutdown.store(true, Ordering::Relaxed);
Ok(())
}
Expand Down
Loading
Loading