Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions opentelemetry-otlp/src/exporter/http/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,35 @@
use std::sync::Arc;

use crate::metric::MetricsClient;
use async_trait::async_trait;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::otel_debug;
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::{MetricError, MetricResult};

use crate::{metric::MetricsClient, Error};

use super::OtlpHttpClient;

#[async_trait]
impl MetricsClient for OtlpHttpClient {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {

Check warning on line 14 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L14

Added line #L14 was not covered by tests
let client = self
.client
.lock()
.map_err(Into::into)
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))

Check warning on line 18 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L18

Added line #L18 was not covered by tests
.and_then(|g| match &*g {
Some(client) => Ok(Arc::clone(client)),
_ => Err(MetricError::Other("exporter is already shut down".into())),
_ => Err(OTelSdkError::AlreadyShutdown),

Check warning on line 21 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L21

Added line #L21 was not covered by tests
})?;

let (body, content_type) = self.build_metrics_export_body(metrics)?;
let (body, content_type) = self.build_metrics_export_body(metrics).map_err(|e| {
OTelSdkError::InternalFailure(format!("Failed to serialize metrics: {e:?}"))
})?;

Check warning on line 26 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L24-L26

Added lines #L24 - L26 were not covered by tests
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body.into())
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;

Check warning on line 32 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L32

Added line #L32 was not covered by tests

for (k, v) in &self.headers {
request.headers_mut().insert(k.clone(), v.clone());
Expand All @@ -39,7 +39,7 @@
client
.send_bytes(request)
.await
.map_err(|e| MetricError::ExportErr(Box::new(Error::RequestFailed(e))))?;
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;

Check warning on line 42 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L42

Added line #L42 was not covered by tests

Ok(())
}
Expand Down
45 changes: 23 additions & 22 deletions opentelemetry-otlp/src/exporter/tonic/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
};
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::{MetricError, MetricResult};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use super::BoxInterceptor;
Expand Down Expand Up @@ -55,26 +54,28 @@

#[async_trait]
impl MetricsClient for TonicMetricsClient {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
let (mut client, metadata, extensions) =
self.inner
.lock()
.map_err(Into::into)
.and_then(|mut inner| match &mut *inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.call(Request::new(()))
.map_err(|e| {
MetricError::Other(format!(
"unexpected status while exporting {e:?}"
))
})?
.into_parts();
Ok((inner.client.clone(), m, e))
}
None => Err(MetricError::Other("exporter is already shut down".into())),
})?;
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
let (mut client, metadata, extensions) = self
.inner
.lock()
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
.and_then(|mut inner| match &mut *inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.call(Request::new(()))
.map_err(|e| {
OTelSdkError::InternalFailure(format!(
"unexpected status while exporting {e:?}"
))
})?
.into_parts();
Ok((inner.client.clone(), m, e))

Check warning on line 73 in opentelemetry-otlp/src/exporter/tonic/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/metrics.rs#L57-L73

Added lines #L57 - L73 were not covered by tests
}
None => Err(OTelSdkError::InternalFailure(
"exporter is already shut down".into(),
)),
})?;

Check warning on line 78 in opentelemetry-otlp/src/exporter/tonic/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/metrics.rs#L75-L78

Added lines #L75 - L78 were not covered by tests

otel_debug!(name: "TonicsMetricsClient.CallingExport");

Expand All @@ -85,7 +86,7 @@
ExportMetricsServiceRequest::from(&*metrics),
))
.await
.map_err(crate::Error::from)?;
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;

Check warning on line 89 in opentelemetry-otlp/src/exporter/tonic/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/metrics.rs#L89

Added line #L89 was not covered by tests

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
/// An interface for OTLP metrics clients
#[async_trait]
pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult;
fn shutdown(&self) -> OTelSdkResult;
}

Expand All @@ -141,7 +141,7 @@

#[async_trait]
impl PushMetricExporter for MetricExporter {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {

Check warning on line 144 in opentelemetry-otlp/src/metric.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/metric.rs#L144

Added line #L144 was not covered by tests
self.client.export(metrics).await
}

Expand Down
5 changes: 2 additions & 3 deletions opentelemetry-sdk/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ pub trait PushMetricExporter: Send + Sync + 'static {
///
/// All retry logic must be contained in this function. The SDK does not
/// implement any retry logic. All errors returned by this function are
/// considered unrecoverable and will be reported to a configured error
/// Handler.
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;
/// considered unrecoverable and will be logged.
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult;

/// Flushes any metric data held by an exporter.
async fn force_flush(&self) -> MetricResult<()>;
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/src/metrics/in_memory_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error::OTelSdkResult;
use crate::error::{OTelSdkError, OTelSdkResult};
use crate::metrics::data::{self, Gauge, Sum};
use crate::metrics::data::{Histogram, Metric, ResourceMetrics, ScopeMetrics};
use crate::metrics::exporter::PushMetricExporter;
Expand Down Expand Up @@ -265,13 +265,13 @@ impl InMemoryMetricExporter {

#[async_trait]
impl PushMetricExporter for InMemoryMetricExporter {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
self.metrics
.lock()
.map(|mut metrics_guard| {
metrics_guard.push_back(InMemoryMetricExporter::clone_metrics(metrics))
})
.map_err(MetricError::from)
.map_err(|_| OTelSdkError::InternalFailure("Failed to lock metrics".to_string()))
}

async fn force_flush(&self) -> MetricResult<()> {
Expand Down
14 changes: 7 additions & 7 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl PeriodicReader {
reader
}

fn collect_and_export(&self, timeout: Duration) -> MetricResult<()> {
fn collect_and_export(&self, timeout: Duration) -> OTelSdkResult {
self.inner.collect_and_export(timeout)
}
}
Expand Down Expand Up @@ -402,7 +402,7 @@ impl PeriodicReaderInner {
}
}

fn collect_and_export(&self, timeout: Duration) -> MetricResult<()> {
fn collect_and_export(&self, timeout: Duration) -> OTelSdkResult {
// TODO: Reuse the internal vectors. Or refactor to avoid needing any
// owned data structures to be passed to exporters.
let mut rm = ResourceMetrics {
Expand All @@ -425,7 +425,7 @@ impl PeriodicReaderInner {
name: "PeriodReaderCollectError",
error = format!("{:?}", e)
);
return Err(e);
return Err(OTelSdkError::InternalFailure(e.to_string()));
}

if rm.scope_metrics.is_empty() {
Expand Down Expand Up @@ -546,7 +546,7 @@ mod tests {
error::{OTelSdkError, OTelSdkResult},
metrics::{
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
InMemoryMetricExporter, MetricError, MetricResult, SdkMeterProvider, Temporality,
InMemoryMetricExporter, MetricResult, SdkMeterProvider, Temporality,
},
Resource,
};
Expand Down Expand Up @@ -584,9 +584,9 @@ mod tests {

#[async_trait]
impl PushMetricExporter for MetricExporterThatFailsOnlyOnFirst {
async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> {
async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult {
if self.count.fetch_add(1, Ordering::Relaxed) == 0 {
Err(MetricError::Other("export failed".into()))
Err(OTelSdkError::InternalFailure("export failed".into()))
} else {
Ok(())
}
Expand All @@ -612,7 +612,7 @@ mod tests {

#[async_trait]
impl PushMetricExporter for MockMetricExporter {
async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> {
async fn export(&self, _metrics: &mut ResourceMetrics) -> OTelSdkResult {
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,10 @@
}

impl<RT: Runtime> PeriodicReaderWorker<RT> {
async fn collect_and_export(&mut self) -> MetricResult<()> {
self.reader.collect(&mut self.rm)?;
async fn collect_and_export(&mut self) -> OTelSdkResult {
self.reader
.collect(&mut self.rm)
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
if self.rm.scope_metrics.is_empty() {
otel_debug!(
name: "PeriodicReaderWorker.NoMetricsToExport",
Expand All @@ -257,7 +259,7 @@
Either::Left((res, _)) => {
res // return the status of export.
}
Either::Right(_) => Err(MetricError::Other("export timed out".into())),
Either::Right(_) => Err(OTelSdkError::Timeout(self.timeout)),

Check warning on line 262 in opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs#L262

Added line #L262 was not covered by tests
}
}

Expand All @@ -280,7 +282,10 @@
name: "PeriodicReader.ForceFlushCalled",
message = "Flush message received.",
);
let res = self.collect_and_export().await;
let res = self
.collect_and_export()
.await
.map_err(|e| MetricError::Other(e.to_string()));

Check warning on line 288 in opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs#L285-L288

Added lines #L285 - L288 were not covered by tests
if let Err(send_error) = ch.send(res) {
otel_debug!(
name: "PeriodicReader.Flush.SendResultError",
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-stdout/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use core::{f64, fmt};
use opentelemetry_sdk::metrics::{MetricError, MetricResult, Temporality};
use opentelemetry_sdk::metrics::{MetricResult, Temporality};
use opentelemetry_sdk::{
error::OTelSdkResult,
metrics::{
Expand Down Expand Up @@ -42,9 +42,9 @@
#[async_trait]
impl PushMetricExporter for MetricExporter {
/// Write Metrics to stdout
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {

Check warning on line 45 in opentelemetry-stdout/src/metrics/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/metrics/exporter.rs#L45

Added line #L45 was not covered by tests
if self.is_shutdown.load(atomic::Ordering::SeqCst) {
Err(MetricError::Other("exporter is shut down".into()))
Err(opentelemetry_sdk::error::OTelSdkError::AlreadyShutdown)

Check warning on line 47 in opentelemetry-stdout/src/metrics/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/metrics/exporter.rs#L47

Added line #L47 was not covered by tests
} else {
println!("Metrics");
println!("Resource");
Expand Down