diff --git a/opentelemetry-otlp/src/exporter/http/metrics.rs b/opentelemetry-otlp/src/exporter/http/metrics.rs index 5b15df824e..f534b69a7f 100644 --- a/opentelemetry-otlp/src/exporter/http/metrics.rs +++ b/opentelemetry-otlp/src/exporter/http/metrics.rs @@ -1,35 +1,35 @@ use std::sync::Arc; +use crate::metric::MetricsClient; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; use opentelemetry_sdk::metrics::data::ResourceMetrics; -use opentelemetry_sdk::metrics::{MetricError, MetricResult}; - -use crate::{metric::MetricsClient, Error}; use super::OtlpHttpClient; #[async_trait] impl MetricsClient for OtlpHttpClient { - async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> { + async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult { let client = self .client .lock() - .map_err(Into::into) + .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}"))) .and_then(|g| match &*g { Some(client) => Ok(Arc::clone(client)), - _ => Err(MetricError::Other("exporter is already shut down".into())), + _ => Err(OTelSdkError::AlreadyShutdown), })?; - let (body, content_type) = self.build_metrics_export_body(metrics)?; + let (body, content_type) = self.build_metrics_export_body(metrics).map_err(|e| { + OTelSdkError::InternalFailure(format!("Failed to serialize metrics: {e:?}")) + })?; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) .header(CONTENT_TYPE, content_type) .body(body.into()) - .map_err(|e| crate::Error::RequestFailed(Box::new(e)))?; + .map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?; for (k, v) in &self.headers { request.headers_mut().insert(k.clone(), v.clone()); @@ -39,7 +39,7 @@ impl MetricsClient for OtlpHttpClient { client .send_bytes(request) .await - .map_err(|e| MetricError::ExportErr(Box::new(Error::RequestFailed(e))))?; + .map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?; Ok(()) } diff --git a/opentelemetry-otlp/src/exporter/tonic/metrics.rs b/opentelemetry-otlp/src/exporter/tonic/metrics.rs index 403f186b5f..c101829a34 100644 --- a/opentelemetry-otlp/src/exporter/tonic/metrics.rs +++ b/opentelemetry-otlp/src/exporter/tonic/metrics.rs @@ -8,7 +8,6 @@ use opentelemetry_proto::tonic::collector::metrics::v1::{ }; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; use opentelemetry_sdk::metrics::data::ResourceMetrics; -use opentelemetry_sdk::metrics::{MetricError, MetricResult}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use super::BoxInterceptor; @@ -55,26 +54,28 @@ impl TonicMetricsClient { #[async_trait] impl MetricsClient for TonicMetricsClient { - async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> { - let (mut client, metadata, extensions) = - self.inner - .lock() - .map_err(Into::into) - .and_then(|mut inner| match &mut *inner { - Some(inner) => { - let (m, e, _) = inner - .interceptor - .call(Request::new(())) - .map_err(|e| { - MetricError::Other(format!( - "unexpected status while exporting {e:?}" - )) - })? - .into_parts(); - Ok((inner.client.clone(), m, e)) - } - None => Err(MetricError::Other("exporter is already shut down".into())), - })?; + async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult { + let (mut client, metadata, extensions) = self + .inner + .lock() + .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}"))) + .and_then(|mut inner| match &mut *inner { + Some(inner) => { + let (m, e, _) = inner + .interceptor + .call(Request::new(())) + .map_err(|e| { + OTelSdkError::InternalFailure(format!( + "unexpected status while exporting {e:?}" + )) + })? + .into_parts(); + Ok((inner.client.clone(), m, e)) + } + None => Err(OTelSdkError::InternalFailure( + "exporter is already shut down".into(), + )), + })?; otel_debug!(name: "TonicsMetricsClient.CallingExport"); @@ -85,7 +86,7 @@ impl MetricsClient for TonicMetricsClient { ExportMetricsServiceRequest::from(&*metrics), )) .await - .map_err(crate::Error::from)?; + .map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?; Ok(()) } diff --git a/opentelemetry-otlp/src/metric.rs b/opentelemetry-otlp/src/metric.rs index ab990690ba..7ce0c02802 100644 --- a/opentelemetry-otlp/src/metric.rs +++ b/opentelemetry-otlp/src/metric.rs @@ -123,7 +123,7 @@ impl HasHttpConfig for MetricExporterBuilder { /// An interface for OTLP metrics clients #[async_trait] pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static { - async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>; + async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult; fn shutdown(&self) -> OTelSdkResult; } @@ -141,7 +141,7 @@ impl Debug for MetricExporter { #[async_trait] impl PushMetricExporter for MetricExporter { - async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> { + async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult { self.client.export(metrics).await } diff --git a/opentelemetry-sdk/src/metrics/exporter.rs b/opentelemetry-sdk/src/metrics/exporter.rs index e3a9b92682..9e7425b47d 100644 --- a/opentelemetry-sdk/src/metrics/exporter.rs +++ b/opentelemetry-sdk/src/metrics/exporter.rs @@ -17,9 +17,8 @@ pub trait PushMetricExporter: Send + Sync + 'static { /// /// All retry logic must be contained in this function. The SDK does not /// implement any retry logic. All errors returned by this function are - /// considered unrecoverable and will be reported to a configured error - /// Handler. - async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>; + /// considered unrecoverable and will be logged. + async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult; /// Flushes any metric data held by an exporter. async fn force_flush(&self) -> MetricResult<()>; diff --git a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs index 203b1eb27e..c97add60eb 100644 --- a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs @@ -1,4 +1,4 @@ -use crate::error::OTelSdkResult; +use crate::error::{OTelSdkError, OTelSdkResult}; use crate::metrics::data::{self, Gauge, Sum}; use crate::metrics::data::{Histogram, Metric, ResourceMetrics, ScopeMetrics}; use crate::metrics::exporter::PushMetricExporter; @@ -265,13 +265,13 @@ impl InMemoryMetricExporter { #[async_trait] impl PushMetricExporter for InMemoryMetricExporter { - async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> { + async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult { self.metrics .lock() .map(|mut metrics_guard| { metrics_guard.push_back(InMemoryMetricExporter::clone_metrics(metrics)) }) - .map_err(MetricError::from) + .map_err(|_| OTelSdkError::InternalFailure("Failed to lock metrics".to_string())) } async fn force_flush(&self) -> MetricResult<()> { diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index b721bd677f..9aeb3cab34 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -357,7 +357,7 @@ impl PeriodicReader { reader } - fn collect_and_export(&self, timeout: Duration) -> MetricResult<()> { + fn collect_and_export(&self, timeout: Duration) -> OTelSdkResult { self.inner.collect_and_export(timeout) } } @@ -402,7 +402,7 @@ impl PeriodicReaderInner { } } - fn collect_and_export(&self, timeout: Duration) -> MetricResult<()> { + fn collect_and_export(&self, timeout: Duration) -> OTelSdkResult { // TODO: Reuse the internal vectors. Or refactor to avoid needing any // owned data structures to be passed to exporters. let mut rm = ResourceMetrics { @@ -425,7 +425,7 @@ impl PeriodicReaderInner { name: "PeriodReaderCollectError", error = format!("{:?}", e) ); - return Err(e); + return Err(OTelSdkError::InternalFailure(e.to_string())); } if rm.scope_metrics.is_empty() { @@ -546,7 +546,7 @@ mod tests { error::{OTelSdkError, OTelSdkResult}, metrics::{ data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader, - InMemoryMetricExporter, MetricError, MetricResult, SdkMeterProvider, Temporality, + InMemoryMetricExporter, MetricResult, SdkMeterProvider, Temporality, }, Resource, }; @@ -584,9 +584,9 @@ mod tests { #[async_trait] impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst { - async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> { + async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult { if self.count.fetch_add(1, Ordering::Relaxed) == 0 { - Err(MetricError::Other("export failed".into())) + Err(OTelSdkError::InternalFailure("export failed".into())) } else { Ok(()) } @@ -612,7 +612,7 @@ mod tests { #[async_trait] impl PushMetricExporter for MockMetricExporter { - async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> { + async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs index 3ebe28435b..7b7b5eaaaf 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -233,8 +233,10 @@ struct PeriodicReaderWorker { } impl PeriodicReaderWorker { - async fn collect_and_export(&mut self) -> MetricResult<()> { - self.reader.collect(&mut self.rm)?; + async fn collect_and_export(&mut self) -> OTelSdkResult { + self.reader + .collect(&mut self.rm) + .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; if self.rm.scope_metrics.is_empty() { otel_debug!( name: "PeriodicReaderWorker.NoMetricsToExport", @@ -257,7 +259,7 @@ impl PeriodicReaderWorker { Either::Left((res, _)) => { res // return the status of export. } - Either::Right(_) => Err(MetricError::Other("export timed out".into())), + Either::Right(_) => Err(OTelSdkError::Timeout(self.timeout)), } } @@ -280,7 +282,10 @@ impl PeriodicReaderWorker { name: "PeriodicReader.ForceFlushCalled", message = "Flush message received.", ); - let res = self.collect_and_export().await; + let res = self + .collect_and_export() + .await + .map_err(|e| MetricError::Other(e.to_string())); if let Err(send_error) = ch.send(res) { otel_debug!( name: "PeriodicReader.Flush.SendResultError", diff --git a/opentelemetry-stdout/src/metrics/exporter.rs b/opentelemetry-stdout/src/metrics/exporter.rs index 653d19fe3d..ab4abe4e20 100644 --- a/opentelemetry-stdout/src/metrics/exporter.rs +++ b/opentelemetry-stdout/src/metrics/exporter.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use core::{f64, fmt}; -use opentelemetry_sdk::metrics::{MetricError, MetricResult, Temporality}; +use opentelemetry_sdk::metrics::{MetricResult, Temporality}; use opentelemetry_sdk::{ error::OTelSdkResult, metrics::{ @@ -42,9 +42,9 @@ impl fmt::Debug for MetricExporter { #[async_trait] impl PushMetricExporter for MetricExporter { /// Write Metrics to stdout - async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> { + async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult { if self.is_shutdown.load(atomic::Ordering::SeqCst) { - Err(MetricError::Other("exporter is shut down".into())) + Err(opentelemetry_sdk::error::OTelSdkError::AlreadyShutdown) } else { println!("Metrics"); println!("Resource");