Skip to content

Commit b0ad930

Browse files
authored
Merge branch 'main' into fix-json-deser-bytes
2 parents 7d6ea07 + 5bebbcc commit b0ad930

File tree

26 files changed

+232
-215
lines changed

26 files changed

+232
-215
lines changed

examples/metrics-basic/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use opentelemetry::{global, KeyValue};
2-
use opentelemetry_sdk::error::ShutdownError;
2+
use opentelemetry_sdk::error::OTelSdkError;
33
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
44
use opentelemetry_sdk::Resource;
55
use std::error::Error;

opentelemetry-otlp/src/exporter/http/metrics.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,35 @@
11
use std::sync::Arc;
22

3+
use crate::metric::MetricsClient;
34
use async_trait::async_trait;
45
use http::{header::CONTENT_TYPE, Method};
56
use opentelemetry::otel_debug;
6-
use opentelemetry_sdk::error::{ShutdownError, ShutdownResult};
7+
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
78
use opentelemetry_sdk::metrics::data::ResourceMetrics;
8-
use opentelemetry_sdk::metrics::{MetricError, MetricResult};
9-
10-
use crate::{metric::MetricsClient, Error};
119

1210
use super::OtlpHttpClient;
1311

1412
#[async_trait]
1513
impl MetricsClient for OtlpHttpClient {
16-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
14+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
1715
let client = self
1816
.client
1917
.lock()
20-
.map_err(Into::into)
18+
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
2119
.and_then(|g| match &*g {
2220
Some(client) => Ok(Arc::clone(client)),
23-
_ => Err(MetricError::Other("exporter is already shut down".into())),
21+
_ => Err(OTelSdkError::AlreadyShutdown),
2422
})?;
2523

26-
let (body, content_type) = self.build_metrics_export_body(metrics)?;
24+
let (body, content_type) = self.build_metrics_export_body(metrics).map_err(|e| {
25+
OTelSdkError::InternalFailure(format!("Failed to serialize metrics: {e:?}"))
26+
})?;
2727
let mut request = http::Request::builder()
2828
.method(Method::POST)
2929
.uri(&self.collector_endpoint)
3030
.header(CONTENT_TYPE, content_type)
3131
.body(body.into())
32-
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;
32+
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
3333

3434
for (k, v) in &self.headers {
3535
request.headers_mut().insert(k.clone(), v.clone());
@@ -39,15 +39,15 @@ impl MetricsClient for OtlpHttpClient {
3939
client
4040
.send_bytes(request)
4141
.await
42-
.map_err(|e| MetricError::ExportErr(Box::new(Error::RequestFailed(e))))?;
42+
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
4343

4444
Ok(())
4545
}
4646

47-
fn shutdown(&self) -> ShutdownResult {
47+
fn shutdown(&self) -> OTelSdkResult {
4848
self.client
4949
.lock()
50-
.map_err(|e| ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
50+
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
5151
.take();
5252

5353
Ok(())

opentelemetry-otlp/src/exporter/tonic/metrics.rs

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@ use opentelemetry::otel_debug;
66
use opentelemetry_proto::tonic::collector::metrics::v1::{
77
metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
88
};
9-
use opentelemetry_sdk::error::{ShutdownError, ShutdownResult};
9+
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
1010
use opentelemetry_sdk::metrics::data::ResourceMetrics;
11-
use opentelemetry_sdk::metrics::{MetricError, MetricResult};
1211
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
1312

1413
use super::BoxInterceptor;
@@ -55,26 +54,28 @@ impl TonicMetricsClient {
5554

5655
#[async_trait]
5756
impl MetricsClient for TonicMetricsClient {
58-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
59-
let (mut client, metadata, extensions) =
60-
self.inner
61-
.lock()
62-
.map_err(Into::into)
63-
.and_then(|mut inner| match &mut *inner {
64-
Some(inner) => {
65-
let (m, e, _) = inner
66-
.interceptor
67-
.call(Request::new(()))
68-
.map_err(|e| {
69-
MetricError::Other(format!(
70-
"unexpected status while exporting {e:?}"
71-
))
72-
})?
73-
.into_parts();
74-
Ok((inner.client.clone(), m, e))
75-
}
76-
None => Err(MetricError::Other("exporter is already shut down".into())),
77-
})?;
57+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
58+
let (mut client, metadata, extensions) = self
59+
.inner
60+
.lock()
61+
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
62+
.and_then(|mut inner| match &mut *inner {
63+
Some(inner) => {
64+
let (m, e, _) = inner
65+
.interceptor
66+
.call(Request::new(()))
67+
.map_err(|e| {
68+
OTelSdkError::InternalFailure(format!(
69+
"unexpected status while exporting {e:?}"
70+
))
71+
})?
72+
.into_parts();
73+
Ok((inner.client.clone(), m, e))
74+
}
75+
None => Err(OTelSdkError::InternalFailure(
76+
"exporter is already shut down".into(),
77+
)),
78+
})?;
7879

7980
otel_debug!(name: "TonicsMetricsClient.CallingExport");
8081

@@ -85,15 +86,15 @@ impl MetricsClient for TonicMetricsClient {
8586
ExportMetricsServiceRequest::from(&*metrics),
8687
))
8788
.await
88-
.map_err(crate::Error::from)?;
89+
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
8990

9091
Ok(())
9192
}
9293

93-
fn shutdown(&self) -> ShutdownResult {
94+
fn shutdown(&self) -> OTelSdkResult {
9495
self.inner
9596
.lock()
96-
.map_err(|e| ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
97+
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
9798
.take();
9899

99100
Ok(())

opentelemetry-otlp/src/metric.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::NoExporterBuilderSet;
1616

1717
use async_trait::async_trait;
1818
use core::fmt;
19-
use opentelemetry_sdk::error::ShutdownResult;
19+
use opentelemetry_sdk::error::OTelSdkResult;
2020
use opentelemetry_sdk::metrics::MetricResult;
2121

2222
use opentelemetry_sdk::metrics::{
@@ -123,8 +123,8 @@ impl HasHttpConfig for MetricExporterBuilder<HttpExporterBuilderSet> {
123123
/// An interface for OTLP metrics clients
124124
#[async_trait]
125125
pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static {
126-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;
127-
fn shutdown(&self) -> ShutdownResult;
126+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult;
127+
fn shutdown(&self) -> OTelSdkResult;
128128
}
129129

130130
/// Export metrics in OTEL format.
@@ -141,16 +141,16 @@ impl Debug for MetricExporter {
141141

142142
#[async_trait]
143143
impl PushMetricExporter for MetricExporter {
144-
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
144+
async fn export(&self, metrics: &mut ResourceMetrics) -> OTelSdkResult {
145145
self.client.export(metrics).await
146146
}
147147

148-
async fn force_flush(&self) -> MetricResult<()> {
148+
async fn force_flush(&self) -> OTelSdkResult {
149149
// this component is stateless
150150
Ok(())
151151
}
152152

153-
fn shutdown(&self) -> ShutdownResult {
153+
fn shutdown(&self) -> OTelSdkResult {
154154
self.client.shutdown()
155155
}
156156

opentelemetry-otlp/tests/integration_test/tests/logs.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,11 @@ fn init_logs(is_simple: bool) -> Result<sdklogs::LoggerProvider> {
4646
Ok(logger_provider)
4747
}
4848

49-
async fn logs_tokio_helper(is_simple: bool, log_send_outside_rt: bool) -> Result<()> {
49+
async fn logs_tokio_helper(
50+
is_simple: bool,
51+
log_send_outside_rt: bool,
52+
current_thread: bool,
53+
) -> Result<()> {
5054
use crate::{assert_logs_results_contains, init_logs};
5155
test_utils::start_collector_container().await?;
5256

@@ -76,7 +80,14 @@ async fn logs_tokio_helper(is_simple: bool, log_send_outside_rt: bool) -> Result
7680
info!(target: "my-target", uuid = expected_uuid.as_str(), "hello from {}. My price is {}.", "banana", 2.99);
7781
}
7882
}
79-
let _ = logger_provider.shutdown();
83+
if current_thread {
84+
let _res = tokio::runtime::Handle::current()
85+
.spawn_blocking(move || logger_provider.shutdown())
86+
.await
87+
.unwrap();
88+
} else {
89+
let _ = logger_provider.shutdown();
90+
}
8091
tokio::time::sleep(Duration::from_secs(5)).await;
8192
assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?;
8293
Ok(())
@@ -175,7 +186,7 @@ mod logtests {
175186
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
176187
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
177188
pub async fn logs_batch_tokio_multi_thread() -> Result<()> {
178-
logs_tokio_helper(false, false).await
189+
logs_tokio_helper(false, false, false).await
179190
}
180191

181192
// logger initialization - Inside RT
@@ -185,7 +196,7 @@ mod logtests {
185196
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
186197
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
187198
pub async fn logs_batch_tokio_multi_with_one_worker() -> Result<()> {
188-
logs_tokio_helper(false, false).await
199+
logs_tokio_helper(false, false, false).await
189200
}
190201

191202
// logger initialization - Inside RT
@@ -195,7 +206,7 @@ mod logtests {
195206
#[tokio::test(flavor = "current_thread")]
196207
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
197208
pub async fn logs_batch_tokio_current() -> Result<()> {
198-
logs_tokio_helper(false, false).await
209+
logs_tokio_helper(false, false, true).await
199210
}
200211

201212
// logger initialization - Inside RT
@@ -205,7 +216,7 @@ mod logtests {
205216
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
206217
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
207218
pub async fn logs_batch_tokio_log_outside_rt_multi_thread() -> Result<()> {
208-
logs_tokio_helper(false, true).await
219+
logs_tokio_helper(false, true, false).await
209220
}
210221

211222
// logger initialization - Inside RT
@@ -215,7 +226,7 @@ mod logtests {
215226
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
216227
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
217228
pub async fn logs_batch_tokio_log_outside_rt_multi_with_one_worker() -> Result<()> {
218-
logs_tokio_helper(false, true).await
229+
logs_tokio_helper(false, true, false).await
219230
}
220231

221232
// logger initialization - Inside RT
@@ -225,7 +236,7 @@ mod logtests {
225236
#[tokio::test(flavor = "current_thread")]
226237
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
227238
pub async fn logs_batch_tokio_log_outside_rt_current_thread() -> Result<()> {
228-
logs_tokio_helper(false, true).await
239+
logs_tokio_helper(false, true, true).await
229240
}
230241

231242
// logger initialization - Inside RT
@@ -310,7 +321,7 @@ mod logtests {
310321
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
311322
#[cfg(feature = "reqwest-blocking-client")]
312323
pub async fn logs_simple_tokio_multi_thread() -> Result<()> {
313-
logs_tokio_helper(true, false).await
324+
logs_tokio_helper(true, false, false).await
314325
}
315326

316327
// logger initialization - Inside RT
@@ -324,7 +335,7 @@ mod logtests {
324335
feature = "hyper-client"
325336
))]
326337
pub async fn logs_simple_tokio_multi_thread() -> Result<()> {
327-
logs_tokio_helper(true, false).await
338+
logs_tokio_helper(true, false, false).await
328339
}
329340

330341
// logger initialization - Inside RT
@@ -338,7 +349,7 @@ mod logtests {
338349
feature = "hyper-client"
339350
))]
340351
pub async fn logs_simple_tokio_multi_with_one_worker() -> Result<()> {
341-
logs_tokio_helper(true, false).await
352+
logs_tokio_helper(true, false, false).await
342353
}
343354

344355
// logger initialization - Inside RT
@@ -353,7 +364,7 @@ mod logtests {
353364
feature = "hyper-client"
354365
))]
355366
pub async fn logs_simple_tokio_current() -> Result<()> {
356-
logs_tokio_helper(true, false).await
367+
logs_tokio_helper(true, false, false).await
357368
}
358369
}
359370
///

opentelemetry-otlp/tests/integration_test/tests/metrics.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,11 @@ mod metrictests {
6969
);
7070

7171
// In tokio::current_thread flavor, shutdown must be done in a separate thread
72-
let _res = Handle::current()
72+
let shutdown_resut = Handle::current()
7373
.spawn_blocking(move || meter_provider.shutdown())
7474
.await
7575
.unwrap();
76+
assert!(shutdown_resut.is_ok());
7677
// We still need to sleep, to give otel-collector a chance to flush to disk
7778
std::thread::sleep(SLEEP_DURATION);
7879

@@ -101,7 +102,8 @@ mod metrictests {
101102
],
102103
);
103104

104-
meter_provider.shutdown()?;
105+
let shutdown_resut = meter_provider.shutdown();
106+
assert!(shutdown_resut.is_ok());
105107
// We still need to sleep, to give otel-collector a chance to flush to disk
106108
std::thread::sleep(SLEEP_DURATION);
107109

@@ -127,7 +129,8 @@ mod metrictests {
127129
],
128130
);
129131

130-
meter_provider.shutdown()?;
132+
let shutdown_resut = meter_provider.shutdown();
133+
assert!(shutdown_resut.is_ok());
131134
// We still need to sleep, to give otel-collector a chance to flush to disk
132135
std::thread::sleep(SLEEP_DURATION);
133136

opentelemetry-proto/src/transform/logs.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,11 +223,11 @@ mod tests {
223223
use opentelemetry::logs::LogRecord as _;
224224
use opentelemetry::logs::Logger as _;
225225
use opentelemetry::logs::LoggerProvider as _;
226+
use opentelemetry::time::now;
226227
use opentelemetry::InstrumentationScope;
227228
use opentelemetry_sdk::logs::LogProcessor;
228229
use opentelemetry_sdk::logs::{LogResult, LoggerProvider};
229230
use opentelemetry_sdk::{logs::LogBatch, logs::LogRecord, Resource};
230-
use std::time::SystemTime;
231231

232232
#[derive(Debug)]
233233
struct MockProcessor;
@@ -254,8 +254,8 @@ mod tests {
254254
.build()
255255
.logger("test");
256256
let mut logrecord = logger.create_log_record();
257-
logrecord.set_timestamp(SystemTime::now());
258-
logrecord.set_observed_timestamp(SystemTime::now());
257+
logrecord.set_timestamp(now());
258+
logrecord.set_observed_timestamp(now());
259259
let instrumentation =
260260
InstrumentationScope::builder(instrumentation_name.to_string()).build();
261261
(logrecord, instrumentation)

opentelemetry-proto/src/transform/trace.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ pub mod tonic {
193193
mod tests {
194194
use crate::tonic::common::v1::any_value::Value;
195195
use crate::transform::common::tonic::ResourceAttributesWithSchema;
196+
use opentelemetry::time::now;
196197
use opentelemetry::trace::{
197198
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
198199
};
@@ -202,7 +203,7 @@ mod tests {
202203
use opentelemetry_sdk::trace::SpanData;
203204
use opentelemetry_sdk::trace::{SpanEvents, SpanLinks};
204205
use std::borrow::Cow;
205-
use std::time::{Duration, SystemTime};
206+
use std::time::Duration;
206207

207208
fn create_test_span_data(instrumentation_name: &'static str) -> SpanData {
208209
let span_context = SpanContext::new(
@@ -218,8 +219,8 @@ mod tests {
218219
parent_span_id: SpanId::from_u64(0),
219220
span_kind: SpanKind::Internal,
220221
name: Cow::Borrowed("test_span"),
221-
start_time: SystemTime::now(),
222-
end_time: SystemTime::now() + Duration::from_secs(1),
222+
start_time: now(),
223+
end_time: now() + Duration::from_secs(1),
223224
attributes: vec![KeyValue::new("key", "value")],
224225
dropped_attributes_count: 0,
225226
events: SpanEvents::default(),

0 commit comments

Comments
 (0)