Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions opentelemetry-otlp/src/exporter/http/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,39 @@
use std::sync::Arc;

use crate::metric::MetricsClient;
use async_trait::async_trait;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::otel_debug;
use opentelemetry_sdk::error::{ShutdownError, ShutdownResult};
use opentelemetry_sdk::error::{ExportErrorMetric, ExportResult, ShutdownError, ShutdownResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::{MetricError, MetricResult};

use crate::{metric::MetricsClient, Error};

use super::OtlpHttpClient;

#[async_trait]
impl MetricsClient for OtlpHttpClient {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
async fn export(&self, metrics: &mut ResourceMetrics) -> ExportResult {
let client = self
.client
.lock()
.map_err(Into::into)
.map_err(|e| {
ExportErrorMetric::InternalFailure(format!("Failed to acquire lock: {e:?}"))
})
.and_then(|g| match &*g {
Some(client) => Ok(Arc::clone(client)),
_ => Err(MetricError::Other("exporter is already shut down".into())),
_ => Err(ExportErrorMetric::InternalFailure(
"exporter is already shut down".into(),
)),
})?;

let (body, content_type) = self.build_metrics_export_body(metrics)?;
let (body, content_type) = self.build_metrics_export_body(metrics).map_err(|e| {
ExportErrorMetric::InternalFailure(format!("Failed to serialize metrics: {e:?}"))
})?;
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body.into())
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;
.map_err(|e| ExportErrorMetric::InternalFailure(format!("{e:?}")))?;

for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
Expand All @@ -39,7 +43,7 @@ impl MetricsClient for OtlpHttpClient {
client
.send_bytes(request)
.await
.map_err(|e| MetricError::ExportErr(Box::new(Error::RequestFailed(e))))?;
.map_err(|e| ExportErrorMetric::InternalFailure(format!("{e:?}")))?;

Ok(())
}
Expand Down
49 changes: 26 additions & 23 deletions opentelemetry-otlp/src/exporter/tonic/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ use opentelemetry::otel_debug;
use opentelemetry_proto::tonic::collector::metrics::v1::{
metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
};
use opentelemetry_sdk::error::{ShutdownError, ShutdownResult};
use opentelemetry_sdk::error::{ExportErrorMetric, ExportResult, ShutdownError, ShutdownResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::{MetricError, MetricResult};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use super::BoxInterceptor;
Expand Down Expand Up @@ -55,26 +54,30 @@ impl TonicMetricsClient {

#[async_trait]
impl MetricsClient for TonicMetricsClient {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
let (mut client, metadata, extensions) =
self.inner
.lock()
.map_err(Into::into)
.and_then(|mut inner| match &mut *inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.call(Request::new(()))
.map_err(|e| {
MetricError::Other(format!(
"unexpected status while exporting {e:?}"
))
})?
.into_parts();
Ok((inner.client.clone(), m, e))
}
None => Err(MetricError::Other("exporter is already shut down".into())),
})?;
async fn export(&self, metrics: &mut ResourceMetrics) -> ExportResult {
let (mut client, metadata, extensions) = self
.inner
.lock()
.map_err(|e| {
ExportErrorMetric::InternalFailure(format!("Failed to acquire lock: {e:?}"))
})
.and_then(|mut inner| match &mut *inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.call(Request::new(()))
.map_err(|e| {
ExportErrorMetric::InternalFailure(format!(
"unexpected status while exporting {e:?}"
))
})?
.into_parts();
Ok((inner.client.clone(), m, e))
}
None => Err(ExportErrorMetric::InternalFailure(
"Client not found.".into(),
)),
})?;

otel_debug!(name: "TonicsMetricsClient.CallingExport");

Expand All @@ -85,7 +88,7 @@ impl MetricsClient for TonicMetricsClient {
ExportMetricsServiceRequest::from(&*metrics),
))
.await
.map_err(crate::Error::from)?;
.map_err(|e| ExportErrorMetric::InternalFailure(format!("{e:?}")))?;

Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::NoExporterBuilderSet;

use async_trait::async_trait;
use core::fmt;
use opentelemetry_sdk::error::ShutdownResult;
use opentelemetry_sdk::error::{ExportResult, ShutdownResult};
use opentelemetry_sdk::metrics::MetricResult;

use opentelemetry_sdk::metrics::{
Expand Down Expand Up @@ -123,7 +123,7 @@ impl HasHttpConfig for MetricExporterBuilder<HttpExporterBuilderSet> {
/// An interface for OTLP metrics clients
#[async_trait]
pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;
async fn export(&self, metrics: &mut ResourceMetrics) -> ExportResult;
fn shutdown(&self) -> ShutdownResult;
}

Expand All @@ -141,7 +141,7 @@ impl Debug for MetricExporter {

#[async_trait]
impl PushMetricExporter for MetricExporter {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
async fn export(&self, metrics: &mut ResourceMetrics) -> ExportResult {
self.client.export(metrics).await
}

Expand Down
25 changes: 25 additions & 0 deletions opentelemetry-sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,30 @@ pub enum ShutdownError {
InternalFailure(String),
}

#[derive(Error, Debug)]
/// Errors that can occur during export.
/// TODO: This should be ExportError, but that can be done after rest of repo changes are done.
pub enum ExportErrorMetric {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After checking more, this enum would also need AlreadyShutDown variant. Which means we should be able to re-use the same enum for Export,Shutdown,ForceFlush.

@scottgerring @lalitb Let me know if you agree. I'll make a small PR, just adding the common enum first, and then divide&conquer using it in all places.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make a small PR, just adding the common enum first, and then divide&conquer using it in all places.

Looks good to me.

/// Export timed out before completing.
///
/// This does not necessarily indicate a failure—export may still be
/// complete. If this occurs frequently, consider increasing the timeout
/// duration to allow more time for completion.
#[error("Export timed out after {0:?}")]
Timeout(Duration),

/// Export failed due to an internal error.
///
/// The error message is intended for logging purposes only and should not
/// be used to make programmatic decisions. It is implementation-specific
/// and subject to change without notice. Consumers of this error should not
/// rely on its content beyond logging.
#[error("Export failed: {0}")]
InternalFailure(String),
}

/// A specialized `Result` type for Shutdown operations.
pub type ShutdownResult = Result<(), ShutdownError>;

/// A specialized `Result` type for Export operations.
pub type ExportResult = Result<(), ExportErrorMetric>;
7 changes: 3 additions & 4 deletions opentelemetry-sdk/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Interfaces for exporting metrics
use async_trait::async_trait;

use crate::error::ShutdownResult;
use crate::error::{ExportResult, ShutdownResult};
use crate::metrics::MetricResult;

use crate::metrics::data::ResourceMetrics;
Expand All @@ -17,9 +17,8 @@ pub trait PushMetricExporter: Send + Sync + 'static {
///
/// All retry logic must be contained in this function. The SDK does not
/// implement any retry logic. All errors returned by this function are
/// considered unrecoverable and will be reported to a configured error
/// Handler.
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;
/// considered unrecoverable and will be logged.
async fn export(&self, metrics: &mut ResourceMetrics) -> ExportResult;

/// Flushes any metric data held by an exporter.
async fn force_flush(&self) -> MetricResult<()>;
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/src/metrics/in_memory_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error::ShutdownResult;
use crate::error::{ExportErrorMetric, ExportResult, ShutdownResult};
use crate::metrics::data::{self, Gauge, Sum};
use crate::metrics::data::{Histogram, Metric, ResourceMetrics, ScopeMetrics};
use crate::metrics::exporter::PushMetricExporter;
Expand Down Expand Up @@ -265,13 +265,13 @@ impl InMemoryMetricExporter {

#[async_trait]
impl PushMetricExporter for InMemoryMetricExporter {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
async fn export(&self, metrics: &mut ResourceMetrics) -> ExportResult {
self.metrics
.lock()
.map(|mut metrics_guard| {
metrics_guard.push_back(InMemoryMetricExporter::clone_metrics(metrics))
})
.map_err(MetricError::from)
.map_err(|_| ExportErrorMetric::InternalFailure("Failed to lock metrics".to_string()))
}

async fn force_flush(&self) -> MetricResult<()> {
Expand Down
20 changes: 10 additions & 10 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn};

use crate::{
error::{ShutdownError, ShutdownResult},
error::{ExportErrorMetric, ExportResult, ShutdownError, ShutdownResult},
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
Resource,
};
Expand Down Expand Up @@ -307,7 +307,7 @@ impl PeriodicReader {
reader
}

fn collect_and_export(&self, timeout: Duration) -> MetricResult<()> {
fn collect_and_export(&self, timeout: Duration) -> ExportResult {
self.inner.collect_and_export(timeout)
}
}
Expand Down Expand Up @@ -352,7 +352,7 @@ impl PeriodicReaderInner {
}
}

fn collect_and_export(&self, timeout: Duration) -> MetricResult<()> {
fn collect_and_export(&self, timeout: Duration) -> ExportResult {
// TODO: Reuse the internal vectors. Or refactor to avoid needing any
// owned data structures to be passed to exporters.
let mut rm = ResourceMetrics {
Expand All @@ -375,7 +375,7 @@ impl PeriodicReaderInner {
name: "PeriodReaderCollectError",
error = format!("{:?}", e)
);
return Err(e);
return Err(ExportErrorMetric::InternalFailure(e.to_string()));
}

if rm.scope_metrics.is_empty() {
Expand All @@ -397,7 +397,7 @@ impl PeriodicReaderInner {
name: "PeriodReaderExportError",
error = format!("{:?}", e)
);
return Err(e);
return Err(ExportErrorMetric::InternalFailure(e.to_string()));
}

Ok(())
Expand Down Expand Up @@ -503,10 +503,10 @@ impl MetricReader for PeriodicReader {
mod tests {
use super::PeriodicReader;
use crate::{
error::{ShutdownError, ShutdownResult},
error::{ExportErrorMetric, ExportResult, ShutdownError, ShutdownResult},
metrics::{
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
InMemoryMetricExporter, MetricError, MetricResult, SdkMeterProvider, Temporality,
InMemoryMetricExporter, MetricResult, SdkMeterProvider, Temporality,
},
Resource,
};
Expand Down Expand Up @@ -544,9 +544,9 @@ mod tests {

#[async_trait]
impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst {
async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> {
async fn export(&self, _metrics: &mut ResourceMetrics) -> ExportResult {
if self.count.fetch_add(1, Ordering::Relaxed) == 0 {
Err(MetricError::Other("export failed".into()))
Err(ExportErrorMetric::InternalFailure("export failed".into()))
} else {
Ok(())
}
Expand All @@ -572,7 +572,7 @@ mod tests {

#[async_trait]
impl PushMetricExporter for MockMetricExporter {
async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> {
async fn export(&self, _metrics: &mut ResourceMetrics) -> ExportResult {
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use futures_util::{
};
use opentelemetry::{otel_debug, otel_error};

use crate::runtime::Runtime;
use crate::{error::ExportErrorMetric, runtime::Runtime};
use crate::{
error::{ShutdownError, ShutdownResult},
error::{ExportResult, ShutdownError, ShutdownResult},
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
Resource,
};
Expand Down Expand Up @@ -233,8 +233,10 @@ struct PeriodicReaderWorker<RT: Runtime> {
}

impl<RT: Runtime> PeriodicReaderWorker<RT> {
async fn collect_and_export(&mut self) -> MetricResult<()> {
self.reader.collect(&mut self.rm)?;
async fn collect_and_export(&mut self) -> ExportResult {
self.reader
.collect(&mut self.rm)
.map_err(|e| ExportErrorMetric::InternalFailure(e.to_string()))?;
if self.rm.scope_metrics.is_empty() {
otel_debug!(
name: "PeriodicReaderWorker.NoMetricsToExport",
Expand All @@ -257,7 +259,7 @@ impl<RT: Runtime> PeriodicReaderWorker<RT> {
Either::Left((res, _)) => {
res // return the status of export.
}
Either::Right(_) => Err(MetricError::Other("export timed out".into())),
Either::Right(_) => Err(ExportErrorMetric::Timeout(self.timeout)),
}
}

Expand All @@ -280,7 +282,10 @@ impl<RT: Runtime> PeriodicReaderWorker<RT> {
name: "PeriodicReader.ForceFlushCalled",
message = "Flush message received.",
);
let res = self.collect_and_export().await;
let res = self
.collect_and_export()
.await
.map_err(|e| MetricError::Other(e.to_string()));
if let Err(send_error) = ch.send(res) {
otel_debug!(
name: "PeriodicReader.Flush.SendResultError",
Expand Down
Loading