Skip to content

Commit 4cdc64f

Browse files
authored
Metric export to use common OTelSdkResult (#2604)
1 parent f5b44a5 commit 4cdc64f

File tree

8 files changed

+58
-53
lines changed

8 files changed

+58
-53
lines changed

opentelemetry-otlp/src/exporter/http/metrics.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,35 @@
11
use std::sync::Arc;
22

3+
use crate::metric::MetricsClient;
34
use async_trait::async_trait;
45
use http::{header::CONTENT_TYPE, Method};
56
use opentelemetry::otel_debug;
67
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
78
use opentelemetry_sdk::metrics::data::ResourceMetrics;
8-
use opentelemetry_sdk::metrics::{MetricError, MetricResult};
9-
10-
use crate::{metric::MetricsClient, Error};
119

1210
use super::OtlpHttpClient;
1311

1412
#[async_trait]
1513
impl MetricsClient for OtlpHttpClient {
16-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
14+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
1715
let client = self
1816
.client
1917
.lock()
20-
.map_err(Into::into)
18+
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
2119
.and_then(|g| match &*g {
2220
Some(client) => Ok(Arc::clone(client)),
23-
_ => Err(MetricError::Other("exporter is already shut down".into())),
21+
_ => Err(OTelSdkError::AlreadyShutdown),
2422
})?;
2523

26-
let (body, content_type) = self.build_metrics_export_body(metrics)?;
24+
let (body, content_type) = self.build_metrics_export_body(metrics).map_err(|e| {
25+
OTelSdkError::InternalFailure(format!("Failed to serialize metrics: {e:?}"))
26+
})?;
2727
let mut request = http::Request::builder()
2828
.method(Method::POST)
2929
.uri(&self.collector_endpoint)
3030
.header(CONTENT_TYPE, content_type)
3131
.body(body.into())
32-
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;
32+
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
3333

3434
for (k, v) in &self.headers {
3535
request.headers_mut().insert(k.clone(), v.clone());
@@ -39,7 +39,7 @@ impl MetricsClient for OtlpHttpClient {
3939
client
4040
.send_bytes(request)
4141
.await
42-
.map_err(|e| MetricError::ExportErr(Box::new(Error::RequestFailed(e))))?;
42+
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
4343

4444
Ok(())
4545
}

opentelemetry-otlp/src/exporter/tonic/metrics.rs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use opentelemetry_proto::tonic::collector::metrics::v1::{
88
};
99
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
1010
use opentelemetry_sdk::metrics::data::ResourceMetrics;
11-
use opentelemetry_sdk::metrics::{MetricError, MetricResult};
1211
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
1312

1413
use super::BoxInterceptor;
@@ -55,26 +54,28 @@ impl TonicMetricsClient {
5554

5655
#[async_trait]
5756
impl MetricsClient for TonicMetricsClient {
58-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
59-
let (mut client, metadata, extensions) =
60-
self.inner
61-
.lock()
62-
.map_err(Into::into)
63-
.and_then(|mut inner| match &mut *inner {
64-
Some(inner) => {
65-
let (m, e, _) = inner
66-
.interceptor
67-
.call(Request::new(()))
68-
.map_err(|e| {
69-
MetricError::Other(format!(
70-
"unexpected status while exporting {e:?}"
71-
))
72-
})?
73-
.into_parts();
74-
Ok((inner.client.clone(), m, e))
75-
}
76-
None => Err(MetricError::Other("exporter is already shut down".into())),
77-
})?;
57+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
58+
let (mut client, metadata, extensions) = self
59+
.inner
60+
.lock()
61+
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
62+
.and_then(|mut inner| match &mut *inner {
63+
Some(inner) => {
64+
let (m, e, _) = inner
65+
.interceptor
66+
.call(Request::new(()))
67+
.map_err(|e| {
68+
OTelSdkError::InternalFailure(format!(
69+
"unexpected status while exporting {e:?}"
70+
))
71+
})?
72+
.into_parts();
73+
Ok((inner.client.clone(), m, e))
74+
}
75+
None => Err(OTelSdkError::InternalFailure(
76+
"exporter is already shut down".into(),
77+
)),
78+
})?;
7879

7980
otel_debug!(name: "TonicsMetricsClient.CallingExport");
8081

@@ -85,7 +86,7 @@ impl MetricsClient for TonicMetricsClient {
8586
ExportMetricsServiceRequest::from(&*metrics),
8687
))
8788
.await
88-
.map_err(crate::Error::from)?;
89+
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
8990

9091
Ok(())
9192
}

opentelemetry-otlp/src/metric.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl HasHttpConfig for MetricExporterBuilder<HttpExporterBuilderSet> {
123123
/// An interface for OTLP metrics clients
124124
#[async_trait]
125125
pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static {
126-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;
126+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult;
127127
fn shutdown(&self) -> OTelSdkResult;
128128
}
129129

@@ -141,7 +141,7 @@ impl Debug for MetricExporter {
141141

142142
#[async_trait]
143143
impl PushMetricExporter for MetricExporter {
144-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
144+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
145145
self.client.export(metrics).await
146146
}
147147

opentelemetry-sdk/src/metrics/exporter.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ pub trait PushMetricExporter: Send + Sync + 'static {
1717
///
1818
/// All retry logic must be contained in this function. The SDK does not
1919
/// implement any retry logic. All errors returned by this function are
20-
/// considered unrecoverable and will be reported to a configured error
21-
/// Handler.
22-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;
20+
/// considered unrecoverable and will be logged.
21+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult;
2322

2423
/// Flushes any metric data held by an exporter.
2524
async fn force_flush(&self) -> MetricResult<()>;

opentelemetry-sdk/src/metrics/in_memory_exporter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::error::OTelSdkResult;
1+
use crate::error::{OTelSdkError, OTelSdkResult};
22
use crate::metrics::data::{self, Gauge, Sum};
33
use crate::metrics::data::{Histogram, Metric, ResourceMetrics, ScopeMetrics};
44
use crate::metrics::exporter::PushMetricExporter;
@@ -265,13 +265,13 @@ impl InMemoryMetricExporter {
265265

266266
#[async_trait]
267267
impl PushMetricExporter for InMemoryMetricExporter {
268-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
268+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
269269
self.metrics
270270
.lock()
271271
.map(|mut metrics_guard| {
272272
metrics_guard.push_back(InMemoryMetricExporter::clone_metrics(metrics))
273273
})
274-
.map_err(MetricError::from)
274+
.map_err(|_| OTelSdkError::InternalFailure("Failed to lock metrics".to_string()))
275275
}
276276

277277
async fn force_flush(&self) -> MetricResult<()> {

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ impl PeriodicReader {
357357
reader
358358
}
359359

360-
fn collect_and_export(&self, timeout: Duration) -> MetricResult<()> {
360+
fn collect_and_export(&self, timeout: Duration) -> OTelSdkResult {
361361
self.inner.collect_and_export(timeout)
362362
}
363363
}
@@ -402,7 +402,7 @@ impl PeriodicReaderInner {
402402
}
403403
}
404404

405-
fn collect_and_export(&self, timeout: Duration) -> MetricResult<()> {
405+
fn collect_and_export(&self, timeout: Duration) -> OTelSdkResult {
406406
// TODO: Reuse the internal vectors. Or refactor to avoid needing any
407407
// owned data structures to be passed to exporters.
408408
let mut rm = ResourceMetrics {
@@ -425,7 +425,7 @@ impl PeriodicReaderInner {
425425
name: "PeriodReaderCollectError",
426426
error = format!("{:?}", e)
427427
);
428-
return Err(e);
428+
return Err(OTelSdkError::InternalFailure(e.to_string()));
429429
}
430430

431431
if rm.scope_metrics.is_empty() {
@@ -546,7 +546,7 @@ mod tests {
546546
error::{OTelSdkError, OTelSdkResult},
547547
metrics::{
548548
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
549-
InMemoryMetricExporter, MetricError, MetricResult, SdkMeterProvider, Temporality,
549+
InMemoryMetricExporter, MetricResult, SdkMeterProvider, Temporality,
550550
},
551551
Resource,
552552
};
@@ -584,9 +584,9 @@ mod tests {
584584

585585
#[async_trait]
586586
impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst {
587-
async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> {
587+
async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult {
588588
if self.count.fetch_add(1, Ordering::Relaxed) == 0 {
589-
Err(MetricError::Other("export failed".into()))
589+
Err(OTelSdkError::InternalFailure("export failed".into()))
590590
} else {
591591
Ok(())
592592
}
@@ -612,7 +612,7 @@ mod tests {
612612

613613
#[async_trait]
614614
impl PushMetricExporter for MockMetricExporter {
615-
async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> {
615+
async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult {
616616
Ok(())
617617
}
618618

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,10 @@ struct PeriodicReaderWorker<RT: Runtime> {
233233
}
234234

235235
impl<RT: Runtime> PeriodicReaderWorker<RT> {
236-
async fn collect_and_export(&mut self) -> MetricResult<()> {
237-
self.reader.collect(&mut self.rm)?;
236+
async fn collect_and_export(&mut self) -> OTelSdkResult {
237+
self.reader
238+
.collect(&mut self.rm)
239+
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
238240
if self.rm.scope_metrics.is_empty() {
239241
otel_debug!(
240242
name: "PeriodicReaderWorker.NoMetricsToExport",
@@ -257,7 +259,7 @@ impl<RT: Runtime> PeriodicReaderWorker<RT> {
257259
Either::Left((res, _)) => {
258260
res // return the status of export.
259261
}
260-
Either::Right(_) => Err(MetricError::Other("export timed out".into())),
262+
Either::Right(_) => Err(OTelSdkError::Timeout(self.timeout)),
261263
}
262264
}
263265

@@ -280,7 +282,10 @@ impl<RT: Runtime> PeriodicReaderWorker<RT> {
280282
name: "PeriodicReader.ForceFlushCalled",
281283
message = "Flush message received.",
282284
);
283-
let res = self.collect_and_export().await;
285+
let res = self
286+
.collect_and_export()
287+
.await
288+
.map_err(|e| MetricError::Other(e.to_string()));
284289
if let Err(send_error) = ch.send(res) {
285290
otel_debug!(
286291
name: "PeriodicReader.Flush.SendResultError",

opentelemetry-stdout/src/metrics/exporter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use async_trait::async_trait;
22
use chrono::{DateTime, Utc};
33
use core::{f64, fmt};
4-
use opentelemetry_sdk::metrics::{MetricError, MetricResult, Temporality};
4+
use opentelemetry_sdk::metrics::{MetricResult, Temporality};
55
use opentelemetry_sdk::{
66
error::OTelSdkResult,
77
metrics::{
@@ -42,9 +42,9 @@ impl fmt::Debug for MetricExporter {
4242
#[async_trait]
4343
impl PushMetricExporter for MetricExporter {
4444
/// Write Metrics to stdout
45-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
45+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
4646
if self.is_shutdown.load(atomic::Ordering::SeqCst) {
47-
Err(MetricError::Other("exporter is shut down".into()))
47+
Err(opentelemetry_sdk::error::OTelSdkError::AlreadyShutdown)
4848
} else {
4949
println!("Metrics");
5050
println!("Resource");

0 commit comments

Comments
 (0)