diff --git a/opentelemetry-otlp/src/exporter/http/metrics.rs b/opentelemetry-otlp/src/exporter/http/metrics.rs index 9ac0ba5455..eb375ccbb8 100644 --- a/opentelemetry-otlp/src/exporter/http/metrics.rs +++ b/opentelemetry-otlp/src/exporter/http/metrics.rs @@ -1,35 +1,39 @@ use std::sync::Arc; +use crate::metric::MetricsClient; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; -use opentelemetry_sdk::error::{ShutdownError, ShutdownResult}; +use opentelemetry_sdk::error::{ExportErrorMetric, ExportResult, ShutdownError, ShutdownResult}; use opentelemetry_sdk::metrics::data::ResourceMetrics; -use opentelemetry_sdk::metrics::{MetricError, MetricResult}; - -use crate::{metric::MetricsClient, Error}; use super::OtlpHttpClient; #[async_trait] impl MetricsClient for OtlpHttpClient { - async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> { + async fn export(&self, metrics: &mut ResourceMetrics) -> ExportResult { let client = self .client .lock() - .map_err(Into::into) + .map_err(|e| { + ExportErrorMetric::InternalFailure(format!("Failed to acquire lock: {e:?}")) + }) .and_then(|g| match &*g { Some(client) => Ok(Arc::clone(client)), - _ => Err(MetricError::Other("exporter is already shut down".into())), + _ => Err(ExportErrorMetric::InternalFailure( + "exporter is already shut down".into(), + )), })?; - let (body, content_type) = self.build_metrics_export_body(metrics)?; + let (body, content_type) = self.build_metrics_export_body(metrics).map_err(|e| { + ExportErrorMetric::InternalFailure(format!("Failed to serialize metrics: {e:?}")) + })?; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) .header(CONTENT_TYPE, content_type) .body(body.into()) - .map_err(|e| crate::Error::RequestFailed(Box::new(e)))?; + .map_err(|e| ExportErrorMetric::InternalFailure(format!("{e:?}")))?; for (k, v) in &self.headers { request.headers_mut().insert(k.clone(), v.clone()); @@ -39,7 +43,7 @@ impl MetricsClient for OtlpHttpClient { client .send_bytes(request) .await - .map_err(|e| MetricError::ExportErr(Box::new(Error::RequestFailed(e))))?; + .map_err(|e| ExportErrorMetric::InternalFailure(format!("{e:?}")))?; Ok(()) } diff --git a/opentelemetry-otlp/src/exporter/tonic/metrics.rs b/opentelemetry-otlp/src/exporter/tonic/metrics.rs index a413839a65..0f66bfc0a6 100644 --- a/opentelemetry-otlp/src/exporter/tonic/metrics.rs +++ b/opentelemetry-otlp/src/exporter/tonic/metrics.rs @@ -6,9 +6,8 @@ use opentelemetry::otel_debug; use opentelemetry_proto::tonic::collector::metrics::v1::{ metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest, }; -use opentelemetry_sdk::error::{ShutdownError, ShutdownResult}; +use opentelemetry_sdk::error::{ExportErrorMetric, ExportResult, ShutdownError, ShutdownResult}; use opentelemetry_sdk::metrics::data::ResourceMetrics; -use opentelemetry_sdk::metrics::{MetricError, MetricResult}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use super::BoxInterceptor; @@ -55,26 +54,30 @@ impl TonicMetricsClient { #[async_trait] impl MetricsClient for TonicMetricsClient { - async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> { - let (mut client, metadata, extensions) = - self.inner - .lock() - .map_err(Into::into) - .and_then(|mut inner| match &mut *inner { - Some(inner) => { - let (m, e, _) = inner - .interceptor - .call(Request::new(())) - .map_err(|e| { - MetricError::Other(format!( - "unexpected status while exporting {e:?}" - )) - })? - .into_parts(); - Ok((inner.client.clone(), m, e)) - } - None => Err(MetricError::Other("exporter is already shut down".into())), - })?; + async fn export(&self, metrics: &mut ResourceMetrics) -> ExportResult { + let (mut client, metadata, extensions) = self + .inner + .lock() + .map_err(|e| { + ExportErrorMetric::InternalFailure(format!("Failed to acquire lock: {e:?}")) + }) + .and_then(|mut inner| match &mut *inner { + Some(inner) => { + let (m, e, _) = inner + .interceptor + .call(Request::new(())) + .map_err(|e| { + ExportErrorMetric::InternalFailure(format!( + "unexpected status while exporting {e:?}" + )) + })? + .into_parts(); + Ok((inner.client.clone(), m, e)) + } + None => Err(ExportErrorMetric::InternalFailure( + "Client not found.".into(), + )), + })?; otel_debug!(name: "TonicsMetricsClient.CallingExport"); @@ -85,7 +88,7 @@ impl MetricsClient for TonicMetricsClient { ExportMetricsServiceRequest::from(&*metrics), )) .await - .map_err(crate::Error::from)?; + .map_err(|e| ExportErrorMetric::InternalFailure(format!("{e:?}")))?; Ok(()) } diff --git a/opentelemetry-otlp/src/metric.rs b/opentelemetry-otlp/src/metric.rs index 08642a0dc5..bbeb4912e7 100644 --- a/opentelemetry-otlp/src/metric.rs +++ b/opentelemetry-otlp/src/metric.rs @@ -16,7 +16,7 @@ use crate::NoExporterBuilderSet; use async_trait::async_trait; use core::fmt; -use opentelemetry_sdk::error::ShutdownResult; +use opentelemetry_sdk::error::{ExportResult, ShutdownResult}; use opentelemetry_sdk::metrics::MetricResult; use opentelemetry_sdk::metrics::{ @@ -123,7 +123,7 @@ impl HasHttpConfig for MetricExporterBuilder { /// An interface for OTLP metrics clients #[async_trait] pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static { - async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>; + async fn export(&self, metrics: &mut ResourceMetrics) -> ExportResult; fn shutdown(&self) -> ShutdownResult; } @@ -141,7 +141,7 @@ impl Debug for MetricExporter { #[async_trait] impl PushMetricExporter for MetricExporter { - async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> { + async fn export(&self, metrics: &mut ResourceMetrics) -> ExportResult { self.client.export(metrics).await } diff --git a/opentelemetry-sdk/src/error.rs b/opentelemetry-sdk/src/error.rs index 2ba0df75a5..1813f7456b 100644 --- a/opentelemetry-sdk/src/error.rs +++ b/opentelemetry-sdk/src/error.rs @@ -41,5 +41,30 @@ pub enum ShutdownError { InternalFailure(String), } +#[derive(Error, Debug)] +/// Errors that can occur during export. +/// TODO: This should be ExportError, but that can be done after rest of repo changes are done. +pub enum ExportErrorMetric { + /// Export timed out before completing. + /// + /// This does not necessarily indicate a failure—export may still be + /// complete. If this occurs frequently, consider increasing the timeout + /// duration to allow more time for completion. + #[error("Export timed out after {0:?}")] + Timeout(Duration), + + /// Export failed due to an internal error. + /// + /// The error message is intended for logging purposes only and should not + /// be used to make programmatic decisions. It is implementation-specific + /// and subject to change without notice. Consumers of this error should not + /// rely on its content beyond logging. + #[error("Export failed: {0}")] + InternalFailure(String), +} + /// A specialized `Result` type for Shutdown operations. pub type ShutdownResult = Result<(), ShutdownError>; + +/// A specialized `Result` type for Export operations. +pub type ExportResult = Result<(), ExportErrorMetric>; diff --git a/opentelemetry-sdk/src/metrics/exporter.rs b/opentelemetry-sdk/src/metrics/exporter.rs index 868930883e..3baafcae95 100644 --- a/opentelemetry-sdk/src/metrics/exporter.rs +++ b/opentelemetry-sdk/src/metrics/exporter.rs @@ -1,7 +1,7 @@ //! Interfaces for exporting metrics use async_trait::async_trait; -use crate::error::ShutdownResult; +use crate::error::{ExportResult, ShutdownResult}; use crate::metrics::MetricResult; use crate::metrics::data::ResourceMetrics; @@ -17,9 +17,8 @@ pub trait PushMetricExporter: Send + Sync + 'static { /// /// All retry logic must be contained in this function. The SDK does not /// implement any retry logic. All errors returned by this function are - /// considered unrecoverable and will be reported to a configured error - /// Handler. - async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>; + /// considered unrecoverable and will be logged. + async fn export(&self, metrics: &mut ResourceMetrics) -> ExportResult; /// Flushes any metric data held by an exporter. async fn force_flush(&self) -> MetricResult<()>; diff --git a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs index a87532b460..b9ed7e8de3 100644 --- a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs @@ -1,4 +1,4 @@ -use crate::error::ShutdownResult; +use crate::error::{ExportErrorMetric, ExportResult, ShutdownResult}; use crate::metrics::data::{self, Gauge, Sum}; use crate::metrics::data::{Histogram, Metric, ResourceMetrics, ScopeMetrics}; use crate::metrics::exporter::PushMetricExporter; @@ -265,13 +265,13 @@ impl InMemoryMetricExporter { #[async_trait] impl PushMetricExporter for InMemoryMetricExporter { - async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> { + async fn export(&self, metrics: &mut ResourceMetrics) -> ExportResult { self.metrics .lock() .map(|mut metrics_guard| { metrics_guard.push_back(InMemoryMetricExporter::clone_metrics(metrics)) }) - .map_err(MetricError::from) + .map_err(|_| ExportErrorMetric::InternalFailure("Failed to lock metrics".to_string())) } async fn force_flush(&self) -> MetricResult<()> { diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 84f565c58e..cac9d81512 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -11,7 +11,7 @@ use std::{ use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn}; use crate::{ - error::{ShutdownError, ShutdownResult}, + error::{ExportErrorMetric, ExportResult, ShutdownError, ShutdownResult}, metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult}, Resource, }; @@ -307,7 +307,7 @@ impl PeriodicReader { reader } - fn collect_and_export(&self, timeout: Duration) -> MetricResult<()> { + fn collect_and_export(&self, timeout: Duration) -> ExportResult { self.inner.collect_and_export(timeout) } } @@ -352,7 +352,7 @@ impl PeriodicReaderInner { } } - fn collect_and_export(&self, timeout: Duration) -> MetricResult<()> { + fn collect_and_export(&self, timeout: Duration) -> ExportResult { // TODO: Reuse the internal vectors. Or refactor to avoid needing any // owned data structures to be passed to exporters. let mut rm = ResourceMetrics { @@ -375,7 +375,7 @@ impl PeriodicReaderInner { name: "PeriodReaderCollectError", error = format!("{:?}", e) ); - return Err(e); + return Err(ExportErrorMetric::InternalFailure(e.to_string())); } if rm.scope_metrics.is_empty() { @@ -397,7 +397,7 @@ impl PeriodicReaderInner { name: "PeriodReaderExportError", error = format!("{:?}", e) ); - return Err(e); + return Err(ExportErrorMetric::InternalFailure(e.to_string())); } Ok(()) @@ -503,10 +503,10 @@ impl MetricReader for PeriodicReader { mod tests { use super::PeriodicReader; use crate::{ - error::{ShutdownError, ShutdownResult}, + error::{ExportErrorMetric, ExportResult, ShutdownError, ShutdownResult}, metrics::{ data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader, - InMemoryMetricExporter, MetricError, MetricResult, SdkMeterProvider, Temporality, + InMemoryMetricExporter, MetricResult, SdkMeterProvider, Temporality, }, Resource, }; @@ -544,9 +544,9 @@ mod tests { #[async_trait] impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst { - async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> { + async fn export(&self, _metrics: &mut ResourceMetrics) -> ExportResult { if self.count.fetch_add(1, Ordering::Relaxed) == 0 { - Err(MetricError::Other("export failed".into())) + Err(ExportErrorMetric::InternalFailure("export failed".into())) } else { Ok(()) } @@ -572,7 +572,7 @@ mod tests { #[async_trait] impl PushMetricExporter for MockMetricExporter { - async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> { + async fn export(&self, _metrics: &mut ResourceMetrics) -> ExportResult { Ok(()) } diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs index 8ea0897e18..f94b1fe743 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -13,9 +13,9 @@ use futures_util::{ }; use opentelemetry::{otel_debug, otel_error}; -use crate::runtime::Runtime; +use crate::{error::ExportErrorMetric, runtime::Runtime}; use crate::{ - error::{ShutdownError, ShutdownResult}, + error::{ExportResult, ShutdownError, ShutdownResult}, metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult}, Resource, }; @@ -233,8 +233,10 @@ struct PeriodicReaderWorker { } impl PeriodicReaderWorker { - async fn collect_and_export(&mut self) -> MetricResult<()> { - self.reader.collect(&mut self.rm)?; + async fn collect_and_export(&mut self) -> ExportResult { + self.reader + .collect(&mut self.rm) + .map_err(|e| ExportErrorMetric::InternalFailure(e.to_string()))?; if self.rm.scope_metrics.is_empty() { otel_debug!( name: "PeriodicReaderWorker.NoMetricsToExport", @@ -257,7 +259,7 @@ impl PeriodicReaderWorker { Either::Left((res, _)) => { res // return the status of export. } - Either::Right(_) => Err(MetricError::Other("export timed out".into())), + Either::Right(_) => Err(ExportErrorMetric::Timeout(self.timeout)), } } @@ -280,7 +282,10 @@ impl PeriodicReaderWorker { name: "PeriodicReader.ForceFlushCalled", message = "Flush message received.", ); - let res = self.collect_and_export().await; + let res = self + .collect_and_export() + .await + .map_err(|e| MetricError::Other(e.to_string())); if let Err(send_error) = ch.send(res) { otel_debug!( name: "PeriodicReader.Flush.SendResultError", diff --git a/opentelemetry-stdout/src/metrics/exporter.rs b/opentelemetry-stdout/src/metrics/exporter.rs index 6117916b64..68c9068061 100644 --- a/opentelemetry-stdout/src/metrics/exporter.rs +++ b/opentelemetry-stdout/src/metrics/exporter.rs @@ -1,7 +1,8 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use core::{f64, fmt}; -use opentelemetry_sdk::metrics::{MetricError, MetricResult, Temporality}; +use opentelemetry_sdk::error::ExportResult; +use opentelemetry_sdk::metrics::{MetricResult, Temporality}; use opentelemetry_sdk::{ error::ShutdownResult, metrics::{ @@ -13,11 +14,9 @@ use opentelemetry_sdk::{ }, }; use std::fmt::Debug; -use std::sync::atomic; /// An OpenTelemetry exporter that writes to stdout on export. pub struct MetricExporter { - is_shutdown: atomic::AtomicBool, temporality: Temporality, } @@ -42,22 +41,18 @@ impl fmt::Debug for MetricExporter { #[async_trait] impl PushMetricExporter for MetricExporter { /// Write Metrics to stdout - async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> { - if self.is_shutdown.load(atomic::Ordering::SeqCst) { - Err(MetricError::Other("exporter is shut down".into())) - } else { - println!("Metrics"); - println!("Resource"); - if let Some(schema_url) = metrics.resource.schema_url() { - println!("\tResource SchemaUrl: {:?}", schema_url); - } - - metrics.resource.iter().for_each(|(k, v)| { - println!("\t -> {}={:?}", k, v); - }); - print_metrics(&metrics.scope_metrics); - Ok(()) + async fn export(&self, metrics: &mut ResourceMetrics) -> ExportResult { + println!("Metrics"); + println!("Resource"); + if let Some(schema_url) = metrics.resource.schema_url() { + println!("\tResource SchemaUrl: {:?}", schema_url); } + + metrics.resource.iter().for_each(|(k, v)| { + println!("\t -> {}={:?}", k, v); + }); + print_metrics(&metrics.scope_metrics); + Ok(()) } async fn force_flush(&self) -> MetricResult<()> { @@ -66,7 +61,6 @@ impl PushMetricExporter for MetricExporter { } fn shutdown(&self) -> ShutdownResult { - self.is_shutdown.store(true, atomic::Ordering::SeqCst); Ok(()) } @@ -271,7 +265,6 @@ impl MetricExporterBuilder { pub fn build(self) -> MetricExporter { MetricExporter { temporality: self.temporality.unwrap_or_default(), - is_shutdown: atomic::AtomicBool::new(false), } } }