diff --git a/opentelemetry-otlp/src/metric.rs b/opentelemetry-otlp/src/metric.rs index 7ce0c02802..caca5a0af7 100644 --- a/opentelemetry-otlp/src/metric.rs +++ b/opentelemetry-otlp/src/metric.rs @@ -145,7 +145,7 @@ impl PushMetricExporter for MetricExporter { self.client.export(metrics).await } - async fn force_flush(&self) -> MetricResult<()> { + async fn force_flush(&self) -> OTelSdkResult { // this component is stateless Ok(()) } diff --git a/opentelemetry-sdk/benches/metric.rs b/opentelemetry-sdk/benches/metric.rs index 77de26dc3c..f4d0f5219a 100644 --- a/opentelemetry-sdk/benches/metric.rs +++ b/opentelemetry-sdk/benches/metric.rs @@ -28,7 +28,7 @@ impl MetricReader for SharedReader { self.0.collect(rm) } - fn force_flush(&self) -> MetricResult<()> { + fn force_flush(&self) -> OTelSdkResult { self.0.force_flush() } diff --git a/opentelemetry-sdk/src/metrics/exporter.rs b/opentelemetry-sdk/src/metrics/exporter.rs index 9e7425b47d..d657c7238b 100644 --- a/opentelemetry-sdk/src/metrics/exporter.rs +++ b/opentelemetry-sdk/src/metrics/exporter.rs @@ -2,7 +2,6 @@ use async_trait::async_trait; use crate::error::OTelSdkResult; -use crate::metrics::MetricResult; use crate::metrics::data::ResourceMetrics; @@ -21,7 +20,7 @@ pub trait PushMetricExporter: Send + Sync + 'static { async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult; /// Flushes any metric data held by an exporter. - async fn force_flush(&self) -> MetricResult<()>; + async fn force_flush(&self) -> OTelSdkResult; /// Releases any held computational resources. /// diff --git a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs index c97add60eb..eeaf640c45 100644 --- a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs @@ -274,7 +274,7 @@ impl PushMetricExporter for InMemoryMetricExporter { .map_err(|_| OTelSdkError::InternalFailure("Failed to lock metrics".to_string())) } - async fn force_flush(&self) -> MetricResult<()> { + async fn force_flush(&self) -> OTelSdkResult { Ok(()) // In this implementation, flush does nothing } diff --git a/opentelemetry-sdk/src/metrics/manual_reader.rs b/opentelemetry-sdk/src/metrics/manual_reader.rs index 11ff4e3684..9a9f8915ae 100644 --- a/opentelemetry-sdk/src/metrics/manual_reader.rs +++ b/opentelemetry-sdk/src/metrics/manual_reader.rs @@ -105,7 +105,7 @@ impl MetricReader for ManualReader { } /// ForceFlush is a no-op, it always returns nil. - fn force_flush(&self) -> MetricResult<()> { + fn force_flush(&self) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/metrics/meter_provider.rs b/opentelemetry-sdk/src/metrics/meter_provider.rs index e03e5fc3ef..a034f377be 100644 --- a/opentelemetry-sdk/src/metrics/meter_provider.rs +++ b/opentelemetry-sdk/src/metrics/meter_provider.rs @@ -12,11 +12,8 @@ use opentelemetry::{ otel_debug, otel_error, otel_info, InstrumentationScope, }; +use crate::error::OTelSdkResult; use crate::Resource; -use crate::{ - error::OTelSdkResult, - metrics::{MetricError, MetricResult}, -}; use super::{ exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines, @@ -96,7 +93,7 @@ impl SdkMeterProvider { /// Ok(()) /// } /// ``` - pub fn force_flush(&self) -> MetricResult<()> { + pub fn force_flush(&self) -> OTelSdkResult { self.inner.force_flush() } @@ -122,14 +119,12 @@ impl SdkMeterProvider { } impl SdkMeterProviderInner { - fn force_flush(&self) -> MetricResult<()> { + fn force_flush(&self) -> OTelSdkResult { if self .shutdown_invoked .load(std::sync::atomic::Ordering::Relaxed) { - Err(MetricError::Other( - "Cannot perform flush as MeterProvider shutdown already invoked.".into(), - )) + Err(crate::error::OTelSdkError::AlreadyShutdown) } else { self.pipes.force_flush() } diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 9aeb3cab34..ad0438ecdb 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -443,7 +443,7 @@ impl PeriodicReaderInner { futures_executor::block_on(self.exporter.export(&mut rm)) } - fn force_flush(&self) -> MetricResult<()> { + fn force_flush(&self) -> OTelSdkResult { // TODO: Better message for this scenario. // Flush and Shutdown called from 2 threads Flush check shutdown // flag before shutdown thread sets it. Both threads attempt to send @@ -460,17 +460,17 @@ impl PeriodicReaderInner { let (response_tx, response_rx) = mpsc::channel(); self.message_sender .send(Message::Flush(response_tx)) - .map_err(|e| MetricError::Other(e.to_string()))?; + .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; if let Ok(response) = response_rx.recv() { // TODO: call exporter's force_flush method. if response { Ok(()) } else { - Err(MetricError::Other("Failed to flush".into())) + Err(OTelSdkError::InternalFailure("Failed to flush".into())) } } else { - Err(MetricError::Other("Failed to flush".into())) + Err(OTelSdkError::InternalFailure("Failed to flush".into())) } } @@ -515,7 +515,7 @@ impl MetricReader for PeriodicReader { self.inner.collect(rm) } - fn force_flush(&self) -> MetricResult<()> { + fn force_flush(&self) -> OTelSdkResult { self.inner.force_flush() } @@ -546,7 +546,7 @@ mod tests { error::{OTelSdkError, OTelSdkResult}, metrics::{ data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader, - InMemoryMetricExporter, MetricResult, SdkMeterProvider, Temporality, + InMemoryMetricExporter, SdkMeterProvider, Temporality, }, Resource, }; @@ -592,7 +592,7 @@ mod tests { } } - async fn force_flush(&self) -> MetricResult<()> { + async fn force_flush(&self) -> OTelSdkResult { Ok(()) } @@ -616,7 +616,7 @@ mod tests { Ok(()) } - async fn force_flush(&self) -> MetricResult<()> { + async fn force_flush(&self) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs index 7b7b5eaaaf..9315d38b91 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -216,7 +216,7 @@ struct PeriodicReaderInner { #[derive(Debug)] enum Message { Export, - Flush(oneshot::Sender>), + Flush(oneshot::Sender), Shutdown(oneshot::Sender), } @@ -282,10 +282,7 @@ impl PeriodicReaderWorker { name: "PeriodicReader.ForceFlushCalled", message = "Flush message received.", ); - let res = self - .collect_and_export() - .await - .map_err(|e| MetricError::Other(e.to_string())); + let res = self.collect_and_export().await; if let Err(send_error) = ch.send(res) { otel_debug!( name: "PeriodicReader.Flush.SendResultError", @@ -365,21 +362,24 @@ impl MetricReader for PeriodicReader { Ok(()) } - fn force_flush(&self) -> MetricResult<()> { - let mut inner = self.inner.lock()?; + fn force_flush(&self) -> OTelSdkResult { + let mut inner = self + .inner + .lock() + .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; if inner.is_shutdown { - return Err(MetricError::Other("reader is shut down".into())); + return Err(OTelSdkError::AlreadyShutdown); } let (sender, receiver) = oneshot::channel(); inner .message_sender .try_send(Message::Flush(sender)) - .map_err(|e| MetricError::Other(e.to_string()))?; + .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; drop(inner); // don't hold lock when blocking on future futures_executor::block_on(receiver) - .map_err(|err| MetricError::Other(err.to_string())) + .map_err(|err| OTelSdkError::InternalFailure(err.to_string())) .and_then(|res| res) } diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index 9d1e8c16a9..0c117c8deb 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -8,7 +8,7 @@ use std::{ use opentelemetry::{otel_debug, InstrumentationScope, KeyValue}; use crate::{ - error::OTelSdkResult, + error::{OTelSdkError, OTelSdkResult}, metrics::{ aggregation, data::{Metric, ResourceMetrics, ScopeMetrics}, @@ -90,7 +90,7 @@ impl Pipeline { } /// Send accumulated telemetry - fn force_flush(&self) -> MetricResult<()> { + fn force_flush(&self) -> OTelSdkResult { self.reader.force_flush() } @@ -634,7 +634,7 @@ impl Pipelines { } /// Force flush all pipelines - pub(crate) fn force_flush(&self) -> MetricResult<()> { + pub(crate) fn force_flush(&self) -> OTelSdkResult { let mut errs = vec![]; for pipeline in &self.0 { if let Err(err) = pipeline.force_flush() { @@ -645,7 +645,7 @@ impl Pipelines { if errs.is_empty() { Ok(()) } else { - Err(MetricError::Other(format!("{errs:?}"))) + Err(OTelSdkError::InternalFailure(format!("{errs:?}"))) } } diff --git a/opentelemetry-sdk/src/metrics/reader.rs b/opentelemetry-sdk/src/metrics/reader.rs index 49957169c9..04710bdd41 100644 --- a/opentelemetry-sdk/src/metrics/reader.rs +++ b/opentelemetry-sdk/src/metrics/reader.rs @@ -36,7 +36,7 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static { /// /// There is no guaranteed that all telemetry be flushed or all resources have /// been released on error. - fn force_flush(&self) -> MetricResult<()>; + fn force_flush(&self) -> OTelSdkResult; /// Flushes all metric measurements held in an export pipeline and releases any /// held computational resources. diff --git a/opentelemetry-sdk/src/testing/metrics/metric_reader.rs b/opentelemetry-sdk/src/testing/metrics/metric_reader.rs index cbea10ce56..041eebfddb 100644 --- a/opentelemetry-sdk/src/testing/metrics/metric_reader.rs +++ b/opentelemetry-sdk/src/testing/metrics/metric_reader.rs @@ -38,7 +38,7 @@ impl MetricReader for TestMetricReader { Ok(()) } - fn force_flush(&self) -> MetricResult<()> { + fn force_flush(&self) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-stdout/src/metrics/exporter.rs b/opentelemetry-stdout/src/metrics/exporter.rs index ab4abe4e20..005be41d61 100644 --- a/opentelemetry-stdout/src/metrics/exporter.rs +++ b/opentelemetry-stdout/src/metrics/exporter.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use core::{f64, fmt}; -use opentelemetry_sdk::metrics::{MetricResult, Temporality}; +use opentelemetry_sdk::metrics::Temporality; use opentelemetry_sdk::{ error::OTelSdkResult, metrics::{ @@ -60,7 +60,7 @@ impl PushMetricExporter for MetricExporter { } } - async fn force_flush(&self) -> MetricResult<()> { + async fn force_flush(&self) -> OTelSdkResult { // exporter holds no state, nothing to flush Ok(()) }