Skip to content

Commit 655b815

Browse files
authored
Merge branch 'main' into anujnegi/disable-name-check
2 parents 8cf3ac0 + 4cdc64f commit 655b815

File tree

17 files changed

+141
-124
lines changed

17 files changed

+141
-124
lines changed

examples/metrics-basic/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use opentelemetry::{global, KeyValue};
2-
use opentelemetry_sdk::error::ShutdownError;
2+
use opentelemetry_sdk::error::OTelSdkError;
33
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
44
use opentelemetry_sdk::Resource;
55
use std::error::Error;

opentelemetry-otlp/src/exporter/http/metrics.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,35 @@
11
use std::sync::Arc;
22

3+
use crate::metric::MetricsClient;
34
use async_trait::async_trait;
45
use http::{header::CONTENT_TYPE, Method};
56
use opentelemetry::otel_debug;
6-
use opentelemetry_sdk::error::{ShutdownError, ShutdownResult};
7+
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
78
use opentelemetry_sdk::metrics::data::ResourceMetrics;
8-
use opentelemetry_sdk::metrics::{MetricError, MetricResult};
9-
10-
use crate::{metric::MetricsClient, Error};
119

1210
use super::OtlpHttpClient;
1311

1412
#[async_trait]
1513
impl MetricsClient for OtlpHttpClient {
16-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
14+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
1715
let client = self
1816
.client
1917
.lock()
20-
.map_err(Into::into)
18+
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
2119
.and_then(|g| match &*g {
2220
Some(client) => Ok(Arc::clone(client)),
23-
_ => Err(MetricError::Other("exporter is already shut down".into())),
21+
_ => Err(OTelSdkError::AlreadyShutdown),
2422
})?;
2523

26-
let (body, content_type) = self.build_metrics_export_body(metrics)?;
24+
let (body, content_type) = self.build_metrics_export_body(metrics).map_err(|e| {
25+
OTelSdkError::InternalFailure(format!("Failed to serialize metrics: {e:?}"))
26+
})?;
2727
let mut request = http::Request::builder()
2828
.method(Method::POST)
2929
.uri(&self.collector_endpoint)
3030
.header(CONTENT_TYPE, content_type)
3131
.body(body.into())
32-
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;
32+
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
3333

3434
for (k, v) in &self.headers {
3535
request.headers_mut().insert(k.clone(), v.clone());
@@ -39,15 +39,15 @@ impl MetricsClient for OtlpHttpClient {
3939
client
4040
.send_bytes(request)
4141
.await
42-
.map_err(|e| MetricError::ExportErr(Box::new(Error::RequestFailed(e))))?;
42+
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
4343

4444
Ok(())
4545
}
4646

47-
fn shutdown(&self) -> ShutdownResult {
47+
fn shutdown(&self) -> OTelSdkResult {
4848
self.client
4949
.lock()
50-
.map_err(|e| ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
50+
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
5151
.take();
5252

5353
Ok(())

opentelemetry-otlp/src/exporter/tonic/metrics.rs

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@ use opentelemetry::otel_debug;
66
use opentelemetry_proto::tonic::collector::metrics::v1::{
77
metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
88
};
9-
use opentelemetry_sdk::error::{ShutdownError, ShutdownResult};
9+
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
1010
use opentelemetry_sdk::metrics::data::ResourceMetrics;
11-
use opentelemetry_sdk::metrics::{MetricError, MetricResult};
1211
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
1312

1413
use super::BoxInterceptor;
@@ -55,26 +54,28 @@ impl TonicMetricsClient {
5554

5655
#[async_trait]
5756
impl MetricsClient for TonicMetricsClient {
58-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
59-
let (mut client, metadata, extensions) =
60-
self.inner
61-
.lock()
62-
.map_err(Into::into)
63-
.and_then(|mut inner| match &mut *inner {
64-
Some(inner) => {
65-
let (m, e, _) = inner
66-
.interceptor
67-
.call(Request::new(()))
68-
.map_err(|e| {
69-
MetricError::Other(format!(
70-
"unexpected status while exporting {e:?}"
71-
))
72-
})?
73-
.into_parts();
74-
Ok((inner.client.clone(), m, e))
75-
}
76-
None => Err(MetricError::Other("exporter is already shut down".into())),
77-
})?;
57+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
58+
let (mut client, metadata, extensions) = self
59+
.inner
60+
.lock()
61+
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
62+
.and_then(|mut inner| match &mut *inner {
63+
Some(inner) => {
64+
let (m, e, _) = inner
65+
.interceptor
66+
.call(Request::new(()))
67+
.map_err(|e| {
68+
OTelSdkError::InternalFailure(format!(
69+
"unexpected status while exporting {e:?}"
70+
))
71+
})?
72+
.into_parts();
73+
Ok((inner.client.clone(), m, e))
74+
}
75+
None => Err(OTelSdkError::InternalFailure(
76+
"exporter is already shut down".into(),
77+
)),
78+
})?;
7879

7980
otel_debug!(name: "TonicsMetricsClient.CallingExport");
8081

@@ -85,15 +86,15 @@ impl MetricsClient for TonicMetricsClient {
8586
ExportMetricsServiceRequest::from(&*metrics),
8687
))
8788
.await
88-
.map_err(crate::Error::from)?;
89+
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
8990

9091
Ok(())
9192
}
9293

93-
fn shutdown(&self) -> ShutdownResult {
94+
fn shutdown(&self) -> OTelSdkResult {
9495
self.inner
9596
.lock()
96-
.map_err(|e| ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
97+
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
9798
.take();
9899

99100
Ok(())

opentelemetry-otlp/src/metric.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::NoExporterBuilderSet;
1616

1717
use async_trait::async_trait;
1818
use core::fmt;
19-
use opentelemetry_sdk::error::ShutdownResult;
19+
use opentelemetry_sdk::error::OTelSdkResult;
2020
use opentelemetry_sdk::metrics::MetricResult;
2121

2222
use opentelemetry_sdk::metrics::{
@@ -123,8 +123,8 @@ impl HasHttpConfig for MetricExporterBuilder<HttpExporterBuilderSet> {
123123
/// An interface for OTLP metrics clients
124124
#[async_trait]
125125
pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static {
126-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;
127-
fn shutdown(&self) -> ShutdownResult;
126+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult;
127+
fn shutdown(&self) -> OTelSdkResult;
128128
}
129129

130130
/// Export metrics in OTEL format.
@@ -141,7 +141,7 @@ impl Debug for MetricExporter {
141141

142142
#[async_trait]
143143
impl PushMetricExporter for MetricExporter {
144-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
144+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
145145
self.client.export(metrics).await
146146
}
147147

@@ -150,7 +150,7 @@ impl PushMetricExporter for MetricExporter {
150150
Ok(())
151151
}
152152

153-
fn shutdown(&self) -> ShutdownResult {
153+
fn shutdown(&self) -> OTelSdkResult {
154154
self.client.shutdown()
155155
}
156156

opentelemetry-otlp/tests/integration_test/tests/logs.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,11 @@ fn init_logs(is_simple: bool) -> Result<sdklogs::LoggerProvider> {
4646
Ok(logger_provider)
4747
}
4848

49-
async fn logs_tokio_helper(is_simple: bool, log_send_outside_rt: bool) -> Result<()> {
49+
async fn logs_tokio_helper(
50+
is_simple: bool,
51+
log_send_outside_rt: bool,
52+
current_thread: bool,
53+
) -> Result<()> {
5054
use crate::{assert_logs_results_contains, init_logs};
5155
test_utils::start_collector_container().await?;
5256

@@ -76,7 +80,14 @@ async fn logs_tokio_helper(is_simple: bool, log_send_outside_rt: bool) -> Result
7680
info!(target: "my-target", uuid = expected_uuid.as_str(), "hello from {}. My price is {}.", "banana", 2.99);
7781
}
7882
}
79-
let _ = logger_provider.shutdown();
83+
if current_thread {
84+
let _res = tokio::runtime::Handle::current()
85+
.spawn_blocking(move || logger_provider.shutdown())
86+
.await
87+
.unwrap();
88+
} else {
89+
let _ = logger_provider.shutdown();
90+
}
8091
tokio::time::sleep(Duration::from_secs(5)).await;
8192
assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?;
8293
Ok(())
@@ -175,7 +186,7 @@ mod logtests {
175186
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
176187
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
177188
pub async fn logs_batch_tokio_multi_thread() -> Result<()> {
178-
logs_tokio_helper(false, false).await
189+
logs_tokio_helper(false, false, false).await
179190
}
180191

181192
// logger initialization - Inside RT
@@ -185,7 +196,7 @@ mod logtests {
185196
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
186197
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
187198
pub async fn logs_batch_tokio_multi_with_one_worker() -> Result<()> {
188-
logs_tokio_helper(false, false).await
199+
logs_tokio_helper(false, false, false).await
189200
}
190201

191202
// logger initialization - Inside RT
@@ -195,7 +206,7 @@ mod logtests {
195206
#[tokio::test(flavor = "current_thread")]
196207
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
197208
pub async fn logs_batch_tokio_current() -> Result<()> {
198-
logs_tokio_helper(false, false).await
209+
logs_tokio_helper(false, false, true).await
199210
}
200211

201212
// logger initialization - Inside RT
@@ -205,7 +216,7 @@ mod logtests {
205216
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
206217
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
207218
pub async fn logs_batch_tokio_log_outside_rt_multi_thread() -> Result<()> {
208-
logs_tokio_helper(false, true).await
219+
logs_tokio_helper(false, true, false).await
209220
}
210221

211222
// logger initialization - Inside RT
@@ -215,7 +226,7 @@ mod logtests {
215226
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
216227
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
217228
pub async fn logs_batch_tokio_log_outside_rt_multi_with_one_worker() -> Result<()> {
218-
logs_tokio_helper(false, true).await
229+
logs_tokio_helper(false, true, false).await
219230
}
220231

221232
// logger initialization - Inside RT
@@ -225,7 +236,7 @@ mod logtests {
225236
#[tokio::test(flavor = "current_thread")]
226237
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
227238
pub async fn logs_batch_tokio_log_outside_rt_current_thread() -> Result<()> {
228-
logs_tokio_helper(false, true).await
239+
logs_tokio_helper(false, true, true).await
229240
}
230241

231242
// logger initialization - Inside RT
@@ -310,7 +321,7 @@ mod logtests {
310321
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
311322
#[cfg(feature = "reqwest-blocking-client")]
312323
pub async fn logs_simple_tokio_multi_thread() -> Result<()> {
313-
logs_tokio_helper(true, false).await
324+
logs_tokio_helper(true, false, false).await
314325
}
315326

316327
// logger initialization - Inside RT
@@ -324,7 +335,7 @@ mod logtests {
324335
feature = "hyper-client"
325336
))]
326337
pub async fn logs_simple_tokio_multi_thread() -> Result<()> {
327-
logs_tokio_helper(true, false).await
338+
logs_tokio_helper(true, false, false).await
328339
}
329340

330341
// logger initialization - Inside RT
@@ -338,7 +349,7 @@ mod logtests {
338349
feature = "hyper-client"
339350
))]
340351
pub async fn logs_simple_tokio_multi_with_one_worker() -> Result<()> {
341-
logs_tokio_helper(true, false).await
352+
logs_tokio_helper(true, false, false).await
342353
}
343354

344355
// logger initialization - Inside RT
@@ -353,7 +364,7 @@ mod logtests {
353364
feature = "hyper-client"
354365
))]
355366
pub async fn logs_simple_tokio_current() -> Result<()> {
356-
logs_tokio_helper(true, false).await
367+
logs_tokio_helper(true, false, false).await
357368
}
358369
}
359370
///

opentelemetry-sdk/benches/metric.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use opentelemetry::{
77
Key, KeyValue,
88
};
99
use opentelemetry_sdk::{
10-
error::ShutdownResult,
10+
error::OTelSdkResult,
1111
metrics::{
1212
data::ResourceMetrics, new_view, reader::MetricReader, Aggregation, Instrument,
1313
InstrumentKind, ManualReader, MetricResult, Pipeline, SdkMeterProvider, Stream,
@@ -32,7 +32,7 @@ impl MetricReader for SharedReader {
3232
self.0.force_flush()
3333
}
3434

35-
fn shutdown(&self) -> ShutdownResult {
35+
fn shutdown(&self) -> OTelSdkResult {
3636
self.0.shutdown()
3737
}
3838

opentelemetry-sdk/src/error.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ pub trait ExportError: std::error::Error + Send + Sync + 'static {
1111
}
1212

1313
#[derive(Error, Debug)]
14-
/// Errors that can occur during shutdown.
15-
pub enum ShutdownError {
14+
/// Errors that can occur during SDK operations export(), force_flush() and shutdown().
15+
pub enum OTelSdkError {
1616
/// Shutdown has already been invoked.
1717
///
1818
/// While shutdown is idempotent and calling it multiple times has no
@@ -42,4 +42,4 @@ pub enum ShutdownError {
4242
}
4343

4444
/// A specialized `Result` type for Shutdown operations.
45-
pub type ShutdownResult = Result<(), ShutdownError>;
45+
pub type OTelSdkResult = Result<(), OTelSdkError>;

opentelemetry-sdk/src/metrics/exporter.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Interfaces for exporting metrics
22
use async_trait::async_trait;
33

4-
use crate::error::ShutdownResult;
4+
use crate::error::OTelSdkResult;
55
use crate::metrics::MetricResult;
66

77
use crate::metrics::data::ResourceMetrics;
@@ -17,9 +17,8 @@ pub trait PushMetricExporter: Send + Sync + 'static {
1717
///
1818
/// All retry logic must be contained in this function. The SDK does not
1919
/// implement any retry logic. All errors returned by this function are
20-
/// considered unrecoverable and will be reported to a configured error
21-
/// Handler.
22-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;
20+
/// considered unrecoverable and will be logged.
21+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult;
2322

2423
/// Flushes any metric data held by an exporter.
2524
async fn force_flush(&self) -> MetricResult<()>;
@@ -28,7 +27,7 @@ pub trait PushMetricExporter: Send + Sync + 'static {
2827
///
2928
/// After Shutdown is called, calls to Export will perform no operation and
3029
/// instead will return an error indicating the shutdown state.
31-
fn shutdown(&self) -> ShutdownResult;
30+
fn shutdown(&self) -> OTelSdkResult;
3231

3332
/// Access the [Temporality] of the MetricExporter.
3433
fn temporality(&self) -> Temporality;

0 commit comments

Comments
 (0)