Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/metrics-basic/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use opentelemetry::{global, KeyValue};
use opentelemetry_sdk::error::ShutdownError;
use opentelemetry_sdk::error::OTelSdkError;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::Resource;
use std::error::Error;
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-otlp/src/exporter/http/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use async_trait::async_trait;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::otel_debug;
use opentelemetry_sdk::error::{ShutdownError, ShutdownResult};
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::{MetricError, MetricResult};

Expand Down Expand Up @@ -44,10 +44,10 @@
Ok(())
}

fn shutdown(&self) -> ShutdownResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 47 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L47

Added line #L47 was not covered by tests
self.client
.lock()
.map_err(|e| ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {}", e)))?

Check warning on line 50 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L50

Added line #L50 was not covered by tests
.take();

Ok(())
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-otlp/src/exporter/tonic/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use opentelemetry_proto::tonic::collector::metrics::v1::{
metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
};
use opentelemetry_sdk::error::{ShutdownError, ShutdownResult};
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::{MetricError, MetricResult};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
Expand Down Expand Up @@ -90,10 +90,10 @@
Ok(())
}

fn shutdown(&self) -> ShutdownResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 93 in opentelemetry-otlp/src/exporter/tonic/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/metrics.rs#L93

Added line #L93 was not covered by tests
self.inner
.lock()
.map_err(|e| ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {}", e)))?

Check warning on line 96 in opentelemetry-otlp/src/exporter/tonic/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/metrics.rs#L96

Added line #L96 was not covered by tests
.take();

Ok(())
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use async_trait::async_trait;
use core::fmt;
use opentelemetry_sdk::error::ShutdownResult;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::metrics::MetricResult;

use opentelemetry_sdk::metrics::{
Expand Down Expand Up @@ -124,7 +124,7 @@
#[async_trait]
pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;
fn shutdown(&self) -> ShutdownResult;
fn shutdown(&self) -> OTelSdkResult;
}

/// Export metrics in OTEL format.
Expand All @@ -150,7 +150,7 @@
Ok(())
}

fn shutdown(&self) -> ShutdownResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 153 in opentelemetry-otlp/src/metric.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/metric.rs#L153

Added line #L153 was not covered by tests
self.client.shutdown()
}

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use opentelemetry::{
Key, KeyValue,
};
use opentelemetry_sdk::{
error::ShutdownResult,
error::OTelSdkResult,
metrics::{
data::ResourceMetrics, new_view, reader::MetricReader, Aggregation, Instrument,
InstrumentKind, ManualReader, MetricResult, Pipeline, SdkMeterProvider, Stream,
Expand All @@ -32,7 +32,7 @@ impl MetricReader for SharedReader {
self.0.force_flush()
}

fn shutdown(&self) -> ShutdownResult {
fn shutdown(&self) -> OTelSdkResult {
self.0.shutdown()
}

Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ pub trait ExportError: std::error::Error + Send + Sync + 'static {
}

#[derive(Error, Debug)]
/// Errors that can occur during shutdown.
pub enum ShutdownError {
/// Errors that can occur during SDK operations export(), force_flush() and shutdown().
pub enum OTelSdkError {
/// Shutdown has already been invoked.
///
/// While shutdown is idempotent and calling it multiple times has no
Expand Down Expand Up @@ -42,4 +42,4 @@ pub enum ShutdownError {
}

/// A specialized `Result` type for Shutdown operations.
pub type ShutdownResult = Result<(), ShutdownError>;
pub type OTelSdkResult = Result<(), OTelSdkError>;
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Interfaces for exporting metrics
use async_trait::async_trait;

use crate::error::ShutdownResult;
use crate::error::OTelSdkResult;
use crate::metrics::MetricResult;

use crate::metrics::data::ResourceMetrics;
Expand All @@ -28,7 +28,7 @@ pub trait PushMetricExporter: Send + Sync + 'static {
///
/// After Shutdown is called, calls to Export will perform no operation and
/// instead will return an error indicating the shutdown state.
fn shutdown(&self) -> ShutdownResult;
fn shutdown(&self) -> OTelSdkResult;

/// Access the [Temporality] of the MetricExporter.
fn temporality(&self) -> Temporality;
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/metrics/in_memory_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error::ShutdownResult;
use crate::error::OTelSdkResult;
use crate::metrics::data::{self, Gauge, Sum};
use crate::metrics::data::{Histogram, Metric, ResourceMetrics, ScopeMetrics};
use crate::metrics::exporter::PushMetricExporter;
Expand Down Expand Up @@ -278,7 +278,7 @@ impl PushMetricExporter for InMemoryMetricExporter {
Ok(()) // In this implementation, flush does nothing
}

fn shutdown(&self) -> ShutdownResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
11 changes: 6 additions & 5 deletions opentelemetry-sdk/src/metrics/manual_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use opentelemetry::otel_debug;

use crate::{
error::{ShutdownError, ShutdownResult},
error::{OTelSdkError, OTelSdkResult},
metrics::{MetricError, MetricResult, Temporality},
};

Expand Down Expand Up @@ -110,10 +110,11 @@
}

/// Closes any connections and frees any resources used by the reader.
fn shutdown(&self) -> ShutdownResult {
let mut inner = self.inner.lock().map_err(|e| {
ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e))
})?;
fn shutdown(&self) -> OTelSdkResult {
let mut inner = self
.inner
.lock()
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {}", e)))?;

Check warning on line 117 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L113-L117

Added lines #L113 - L117 were not covered by tests

// Any future call to collect will now return an error.
inner.sdk_producer = None;
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use opentelemetry::{

use crate::Resource;
use crate::{
error::ShutdownResult,
error::OTelSdkResult,
metrics::{MetricError, MetricResult},
};

Expand Down Expand Up @@ -112,7 +112,7 @@ impl SdkMeterProvider {
///
/// There is no guaranteed that all telemetry be flushed or all resources have
/// been released on error.
pub fn shutdown(&self) -> ShutdownResult {
pub fn shutdown(&self) -> OTelSdkResult {
otel_info!(
name: "MeterProvider.Shutdown",
message = "User initiated shutdown of MeterProvider."
Expand All @@ -135,13 +135,13 @@ impl SdkMeterProviderInner {
}
}

fn shutdown(&self) -> ShutdownResult {
fn shutdown(&self) -> OTelSdkResult {
if self
.shutdown_invoked
.swap(true, std::sync::atomic::Ordering::SeqCst)
{
// If the previous value was true, shutdown was already invoked.
Err(crate::error::ShutdownError::AlreadyShutdown)
Err(crate::error::OTelSdkError::AlreadyShutdown)
} else {
self.pipes.shutdown()
}
Expand Down
24 changes: 12 additions & 12 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn};

use crate::{
error::{ShutdownError, ShutdownResult},
error::{OTelSdkError, OTelSdkResult},
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
Resource,
};
Expand Down Expand Up @@ -474,27 +474,27 @@
}
}

fn shutdown(&self) -> ShutdownResult {
fn shutdown(&self) -> OTelSdkResult {
// TODO: See if this is better to be created upfront.
let (response_tx, response_rx) = mpsc::channel();
self.message_sender
.send(Message::Shutdown(response_tx))
.map_err(|e| ShutdownError::InternalFailure(e.to_string()))?;
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

// TODO: Make this timeout configurable.
match response_rx.recv_timeout(Duration::from_secs(5)) {
Ok(response) => {
if response {
Ok(())
} else {
Err(ShutdownError::InternalFailure("Failed to shutdown".into()))
Err(OTelSdkError::InternalFailure("Failed to shutdown".into()))

Check warning on line 490 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L490

Added line #L490 was not covered by tests
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {
Err(ShutdownError::Timeout(Duration::from_secs(5)))
Err(OTelSdkError::Timeout(Duration::from_secs(5)))

Check warning on line 494 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L494

Added line #L494 was not covered by tests
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
Err(ShutdownError::InternalFailure("Failed to shutdown".into()))
Err(OTelSdkError::InternalFailure("Failed to shutdown".into()))

Check warning on line 497 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L497

Added line #L497 was not covered by tests
}
}
}
Expand Down Expand Up @@ -523,7 +523,7 @@
// completion, and avoid blocking the thread. The default shutdown on drop
// can still use blocking call. If user already explicitly called shutdown,
// drop won't call shutdown again.
fn shutdown(&self) -> ShutdownResult {
fn shutdown(&self) -> OTelSdkResult {
self.inner.shutdown()
}

Expand All @@ -543,7 +543,7 @@
mod tests {
use super::PeriodicReader;
use crate::{
error::{ShutdownError, ShutdownResult},
error::{OTelSdkError, OTelSdkResult},
metrics::{
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
InMemoryMetricExporter, MetricError, MetricResult, SdkMeterProvider, Temporality,
Expand Down Expand Up @@ -596,7 +596,7 @@
Ok(())
}

fn shutdown(&self) -> ShutdownResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

Expand All @@ -620,7 +620,7 @@
Ok(())
}

fn shutdown(&self) -> ShutdownResult {
fn shutdown(&self) -> OTelSdkResult {
self.is_shutdown.store(true, Ordering::Relaxed);
Ok(())
}
Expand Down Expand Up @@ -675,12 +675,12 @@
// calling shutdown again should return Err
let result = meter_provider.shutdown();
assert!(result.is_err());
assert!(matches!(result, Err(ShutdownError::AlreadyShutdown)));
assert!(matches!(result, Err(OTelSdkError::AlreadyShutdown)));

// calling shutdown again should return Err
let result = meter_provider.shutdown();
assert!(result.is_err());
assert!(matches!(result, Err(ShutdownError::AlreadyShutdown)));
assert!(matches!(result, Err(OTelSdkError::AlreadyShutdown)));
}

#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

use crate::runtime::Runtime;
use crate::{
error::{ShutdownError, ShutdownResult},
error::{OTelSdkError, OTelSdkResult},
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
Resource,
};
Expand Down Expand Up @@ -217,7 +217,7 @@
enum Message {
Export,
Flush(oneshot::Sender<MetricResult<()>>),
Shutdown(oneshot::Sender<ShutdownResult>),
Shutdown(oneshot::Sender<OTelSdkResult>),
}

enum ProducerOrWorker {
Expand Down Expand Up @@ -297,7 +297,7 @@
let res = self.collect_and_export().await;
let _ = self.reader.exporter.shutdown();
if let Err(send_error) =
ch.send(res.map_err(|e| ShutdownError::InternalFailure(e.to_string())))
ch.send(res.map_err(|e| OTelSdkError::InternalFailure(e.to_string())))
{
otel_debug!(
name: "PeriodicReader.Shutdown.SendResultError",
Expand Down Expand Up @@ -378,30 +378,30 @@
.and_then(|res| res)
}

fn shutdown(&self) -> ShutdownResult {
fn shutdown(&self) -> OTelSdkResult {
let mut inner = self
.inner
.lock()
.map_err(|e| ShutdownError::InternalFailure(e.to_string()))?;
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
if inner.is_shutdown {
return Err(ShutdownError::AlreadyShutdown);
return Err(OTelSdkError::AlreadyShutdown);

Check warning on line 387 in opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs#L387

Added line #L387 was not covered by tests
}

let (sender, receiver) = oneshot::channel();
inner
.message_sender
.try_send(Message::Shutdown(sender))
.map_err(|e| ShutdownError::InternalFailure(e.to_string()))?;
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
drop(inner); // don't hold lock when blocking on future

let shutdown_result = futures_executor::block_on(receiver)
.map_err(|err| ShutdownError::InternalFailure(err.to_string()))?;
.map_err(|err| OTelSdkError::InternalFailure(err.to_string()))?;

// Acquire the lock again to set the shutdown flag
let mut inner = self
.inner
.lock()
.map_err(|e| ShutdownError::InternalFailure(e.to_string()))?;
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
inner.is_shutdown = true;

shutdown_result
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use opentelemetry::{otel_debug, InstrumentationScope, KeyValue};

use crate::{
error::ShutdownResult,
error::OTelSdkResult,
metrics::{
aggregation,
data::{Metric, ResourceMetrics, ScopeMetrics},
Expand Down Expand Up @@ -95,7 +95,7 @@
}

/// Shut down pipeline
fn shutdown(&self) -> ShutdownResult {
fn shutdown(&self) -> OTelSdkResult {
self.reader.shutdown()
}
}
Expand Down Expand Up @@ -650,7 +650,7 @@
}

/// Shut down all pipelines
pub(crate) fn shutdown(&self) -> ShutdownResult {
pub(crate) fn shutdown(&self) -> OTelSdkResult {
let mut errs = vec![];
for pipeline in &self.0 {
if let Err(err) = pipeline.shutdown() {
Expand All @@ -661,7 +661,7 @@
if errs.is_empty() {
Ok(())
} else {
Err(crate::error::ShutdownError::InternalFailure(format!(
Err(crate::error::OTelSdkError::InternalFailure(format!(

Check warning on line 664 in opentelemetry-sdk/src/metrics/pipeline.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/pipeline.rs#L664

Added line #L664 was not covered by tests
"{errs:?}"
)))
}
Expand Down
Loading