Skip to content

Commit d01c113

Browse files
authored
Trace subsystem - use OTelSdkResult/OTelSdkError (#2613)
1 parent 0e751b4 commit d01c113

File tree

20 files changed

+246
-175
lines changed

20 files changed

+246
-175
lines changed

opentelemetry-otlp/src/exporter/http/trace.rs

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,35 @@
11
use std::sync::Arc;
22

3+
use super::OtlpHttpClient;
34
use futures_core::future::BoxFuture;
45
use http::{header::CONTENT_TYPE, Method};
5-
use opentelemetry::{otel_debug, trace::TraceError};
6-
use opentelemetry_sdk::trace::{ExportResult, SpanData, SpanExporter};
7-
8-
use super::OtlpHttpClient;
6+
use opentelemetry::otel_debug;
7+
use opentelemetry_sdk::{
8+
error::{OTelSdkError, OTelSdkResult},
9+
trace::{SpanData, SpanExporter},
10+
};
911

1012
impl SpanExporter for OtlpHttpClient {
11-
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
13+
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
1214
let client = match self
1315
.client
1416
.lock()
15-
.map_err(|e| TraceError::Other(e.to_string().into()))
17+
.map_err(|e| OTelSdkError::InternalFailure(format!("Mutex lock failed: {}", e)))
1618
.and_then(|g| match &*g {
1719
Some(client) => Ok(Arc::clone(client)),
18-
_ => Err(TraceError::Other("exporter is already shut down".into())),
20+
_ => Err(OTelSdkError::AlreadyShutdown),
1921
}) {
2022
Ok(client) => client,
2123
Err(err) => return Box::pin(std::future::ready(Err(err))),
2224
};
2325

2426
let (body, content_type) = match self.build_trace_export_body(batch) {
2527
Ok(body) => body,
26-
Err(e) => return Box::pin(std::future::ready(Err(e))),
28+
Err(e) => {
29+
return Box::pin(std::future::ready(Err(OTelSdkError::InternalFailure(
30+
e.to_string(),
31+
))))
32+
}
2733
};
2834

2935
let mut request = match http::Request::builder()
@@ -34,10 +40,9 @@ impl SpanExporter for OtlpHttpClient {
3440
{
3541
Ok(req) => req,
3642
Err(e) => {
37-
return Box::pin(std::future::ready(Err(crate::Error::RequestFailed(
38-
Box::new(e),
39-
)
40-
.into())))
43+
return Box::pin(std::future::ready(Err(OTelSdkError::InternalFailure(
44+
e.to_string(),
45+
))))
4146
}
4247
};
4348

@@ -48,7 +53,10 @@ impl SpanExporter for OtlpHttpClient {
4853
Box::pin(async move {
4954
let request_uri = request.uri().to_string();
5055
otel_debug!(name: "HttpTracesClient.CallingExport");
51-
let response = client.send_bytes(request).await?;
56+
let response = client
57+
.send_bytes(request)
58+
.await
59+
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
5260

5361
if !response.status().is_success() {
5462
let error = format!(
@@ -57,15 +65,23 @@ impl SpanExporter for OtlpHttpClient {
5765
request_uri,
5866
response.body()
5967
);
60-
return Err(TraceError::Other(error.into()));
68+
return Err(OTelSdkError::InternalFailure(error));
6169
}
6270

6371
Ok(())
6472
})
6573
}
6674

67-
fn shutdown(&mut self) {
68-
let _ = self.client.lock().map(|mut c| c.take());
75+
fn shutdown(&mut self) -> OTelSdkResult {
76+
let mut client_guard = self.client.lock().map_err(|e| {
77+
OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e))
78+
})?;
79+
80+
if client_guard.take().is_none() {
81+
return Err(OTelSdkError::AlreadyShutdown);
82+
}
83+
84+
Ok(())
6985
}
7086

7187
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {

opentelemetry-otlp/src/exporter/tonic/trace.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
use core::fmt;
22

33
use futures_core::future::BoxFuture;
4-
use opentelemetry::{otel_debug, trace::TraceError};
4+
use opentelemetry::otel_debug;
55
use opentelemetry_proto::tonic::collector::trace::v1::{
66
trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
77
};
8-
use opentelemetry_sdk::trace::{ExportResult, SpanData, SpanExporter};
9-
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
10-
118
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
9+
use opentelemetry_sdk::error::OTelSdkError;
10+
use opentelemetry_sdk::{
11+
error::OTelSdkResult,
12+
trace::{SpanData, SpanExporter},
13+
};
14+
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
1215

1316
use super::BoxInterceptor;
1417

@@ -56,21 +59,21 @@ impl TonicTracesClient {
5659
}
5760

5861
impl SpanExporter for TonicTracesClient {
59-
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
62+
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
6063
let (mut client, metadata, extensions) = match &mut self.inner {
6164
Some(inner) => {
6265
let (m, e, _) = match inner.interceptor.call(Request::new(())) {
6366
Ok(res) => res.into_parts(),
6467
Err(e) => {
65-
return Box::pin(std::future::ready(Err(TraceError::Other(Box::new(e)))))
68+
return Box::pin(std::future::ready(Err(OTelSdkError::InternalFailure(
69+
e.to_string(),
70+
))))
6671
}
6772
};
6873
(inner.client.clone(), m, e)
6974
}
7075
None => {
71-
return Box::pin(std::future::ready(Err(TraceError::Other(
72-
"exporter is already shut down".into(),
73-
))))
76+
return Box::pin(std::future::ready(Err(OTelSdkError::AlreadyShutdown)));
7477
}
7578
};
7679

@@ -86,14 +89,16 @@ impl SpanExporter for TonicTracesClient {
8689
ExportTraceServiceRequest { resource_spans },
8790
))
8891
.await
89-
.map_err(crate::Error::from)?;
90-
92+
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
9193
Ok(())
9294
})
9395
}
9496

95-
fn shutdown(&mut self) {
96-
let _ = self.inner.take();
97+
fn shutdown(&mut self) -> OTelSdkResult {
98+
match self.inner.take() {
99+
Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown.
100+
None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down.
101+
}
97102
}
98103

99104
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {

opentelemetry-otlp/src/span.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
use std::fmt::Debug;
66

77
use futures_core::future::BoxFuture;
8-
use opentelemetry_sdk::trace::{ExportResult, SpanData};
8+
use opentelemetry_sdk::error::OTelSdkResult;
9+
use opentelemetry_sdk::trace::SpanData;
910

1011
#[cfg(feature = "grpc-tonic")]
1112
use crate::{
@@ -122,7 +123,7 @@ impl SpanExporter {
122123
}
123124

124125
impl opentelemetry_sdk::trace::SpanExporter for SpanExporter {
125-
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
126+
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
126127
self.0.export(batch)
127128
}
128129

opentelemetry-sdk/CHANGELOG.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,24 @@ let processor = BatchSpanProcessor::builder(exporter)
373373
.build();
374374
```
375375

376+
- **Breaking**
377+
- The public API changes in the Tracing:
378+
- Before:
379+
```rust
380+
fn SpanExporter::export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult>;
381+
fn SpanExporter::shutdown(&mut self);
382+
fn SpanExporter::force_flush(&mut self) -> BoxFuture<'static, ExportResult>
383+
fn TraerProvider::shutdown(&self) -> TraceResult<()>
384+
fn TracerProvider::force_flush(&self) -> Vec<TraceResult<()>>
385+
```
386+
- After:
387+
```rust
388+
fn SpanExporter::export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult>;
389+
fn SpanExporter::shutdown(&mut self) -> OTelSdkResult;
390+
fn SpanExporter::force_flush(&mut self) -> BoxFuture<'static, OTelSdkResult>
391+
fn TraerProvider::shutdown(&self) -> OTelSdkResult;
392+
fn TracerProvider::force_flush(&self) -> OTelSdkResult;
393+
```
376394
- **Breaking** Renamed `LoggerProvider` and `Logger` to `SdkLoggerProvider` and
377395
`SdkLogger` respectively to avoid name collision with public API types.
378396
[#2612](https://github.com/open-telemetry/opentelemetry-rust/pull/2612)

opentelemetry-sdk/benches/context.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ use opentelemetry::{
99
Context, ContextGuard,
1010
};
1111
use opentelemetry_sdk::{
12-
trace::{ExportResult, SpanData, SpanExporter},
13-
trace::{Sampler, TracerProvider},
12+
error::OTelSdkResult,
13+
trace::{Sampler, SpanData, SpanExporter, TracerProvider},
1414
};
1515
#[cfg(not(target_os = "windows"))]
1616
use pprof::criterion::{Output, PProfProfiler};
@@ -137,7 +137,7 @@ fn parent_sampled_tracer(inner_sampler: Sampler) -> (TracerProvider, BoxedTracer
137137
struct NoopExporter;
138138

139139
impl SpanExporter for NoopExporter {
140-
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
140+
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
141141
Box::pin(futures_util::future::ready(Ok(())))
142142
}
143143
}

opentelemetry-sdk/benches/span_builder.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ use opentelemetry::{
44
trace::{Span, Tracer, TracerProvider},
55
KeyValue,
66
};
7+
use opentelemetry_sdk::error::OTelSdkResult;
78
use opentelemetry_sdk::{
89
trace as sdktrace,
9-
trace::{ExportResult, SpanData, SpanExporter},
10+
trace::{SpanData, SpanExporter},
1011
};
1112
#[cfg(not(target_os = "windows"))]
1213
use pprof::criterion::{Output, PProfProfiler};
@@ -65,7 +66,7 @@ fn not_sampled_provider() -> (sdktrace::TracerProvider, sdktrace::Tracer) {
6566
struct NoopExporter;
6667

6768
impl SpanExporter for NoopExporter {
68-
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
69+
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
6970
Box::pin(futures_util::future::ready(Ok(())))
7071
}
7172
}

opentelemetry-sdk/benches/trace.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use opentelemetry::{
55
KeyValue,
66
};
77
use opentelemetry_sdk::{
8-
trace as sdktrace,
9-
trace::{ExportResult, SpanData, SpanExporter},
8+
error::OTelSdkResult,
9+
trace::{self as sdktrace, SpanData, SpanExporter},
1010
};
1111
#[cfg(not(target_os = "windows"))]
1212
use pprof::criterion::{Output, PProfProfiler};
@@ -60,7 +60,7 @@ fn criterion_benchmark(c: &mut Criterion) {
6060
struct VoidExporter;
6161

6262
impl SpanExporter for VoidExporter {
63-
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
63+
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
6464
Box::pin(futures_util::future::ready(Ok(())))
6565
}
6666
}

opentelemetry-sdk/src/testing/trace/span_exporters.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
use crate::error::{OTelSdkError, OTelSdkResult};
12
use crate::{
2-
trace::{ExportResult, SpanData, SpanExporter},
3+
trace::{SpanData, SpanExporter},
34
trace::{SpanEvents, SpanLinks},
45
};
56
use futures_util::future::BoxFuture;
@@ -40,21 +41,20 @@ pub struct TokioSpanExporter {
4041
}
4142

4243
impl SpanExporter for TokioSpanExporter {
43-
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
44-
for span_data in batch {
45-
if let Err(err) = self
46-
.tx_export
44+
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
45+
let result = batch.into_iter().try_for_each(|span_data| {
46+
self.tx_export
4747
.send(span_data)
48-
.map_err::<TestExportError, _>(Into::into)
49-
{
50-
return Box::pin(std::future::ready(Err(Into::into(err))));
51-
}
52-
}
53-
Box::pin(std::future::ready(Ok(())))
48+
.map_err(|err| OTelSdkError::InternalFailure(format!("Export failed: {:?}", err)))
49+
});
50+
51+
Box::pin(std::future::ready(result))
5452
}
5553

56-
fn shutdown(&mut self) {
57-
self.tx_shutdown.send(()).unwrap();
54+
fn shutdown(&mut self) -> OTelSdkResult {
55+
self.tx_shutdown.send(()).map_err(|_| {
56+
OTelSdkError::InternalFailure("Failed to send shutdown signal".to_string())
57+
})
5858
}
5959
}
6060

@@ -113,7 +113,7 @@ impl NoopSpanExporter {
113113

114114
#[async_trait::async_trait]
115115
impl SpanExporter for NoopSpanExporter {
116-
fn export(&mut self, _: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
116+
fn export(&mut self, _: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
117117
Box::pin(std::future::ready(Ok(())))
118118
}
119119
}

opentelemetry-sdk/src/trace/export.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
//! Trace exporters
2+
use crate::error::OTelSdkResult;
23
use crate::Resource;
34
use futures_util::future::BoxFuture;
45
use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status, TraceError};
@@ -30,7 +31,7 @@ pub trait SpanExporter: Send + Sync + Debug {
3031
///
3132
/// Any retry logic that is required by the exporter is the responsibility
3233
/// of the exporter.
33-
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult>;
34+
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult>;
3435

3536
/// Shuts down the exporter. Called when SDK is shut down. This is an
3637
/// opportunity for exporter to do any cleanup required.
@@ -43,7 +44,9 @@ pub trait SpanExporter: Send + Sync + Debug {
4344
/// flush the data and the destination is unavailable). SDK authors
4445
/// can decide if they want to make the shutdown timeout
4546
/// configurable.
46-
fn shutdown(&mut self) {}
47+
fn shutdown(&mut self) -> OTelSdkResult {
48+
Ok(())
49+
}
4750

4851
/// This is a hint to ensure that the export of any Spans the exporter
4952
/// has received prior to the call to this function SHOULD be completed
@@ -60,8 +63,8 @@ pub trait SpanExporter: Send + Sync + Debug {
6063
/// implemented as a blocking API or an asynchronous API which notifies the caller via
6164
/// a callback or an event. OpenTelemetry client authors can decide if they want to
6265
/// make the flush timeout configurable.
63-
fn force_flush(&mut self) -> BoxFuture<'static, ExportResult> {
64-
Box::pin(async { Ok(()) })
66+
fn force_flush(&mut self) -> OTelSdkResult {
67+
Ok(())
6568
}
6669

6770
/// Set the resource for the exporter.

opentelemetry-sdk/src/trace/in_memory_exporter.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
use crate::error::{OTelSdkError, OTelSdkResult};
12
use crate::resource::Resource;
2-
use crate::trace::{ExportResult, SpanData, SpanExporter};
3+
use crate::trace::{SpanData, SpanExporter};
34
use futures_util::future::BoxFuture;
45
use opentelemetry::trace::{TraceError, TraceResult};
56
use std::sync::{Arc, Mutex};
@@ -130,20 +131,20 @@ impl InMemorySpanExporter {
130131
}
131132

132133
impl SpanExporter for InMemorySpanExporter {
133-
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
134-
if let Err(err) = self
134+
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
135+
let result = self
135136
.spans
136137
.lock()
137138
.map(|mut spans_guard| spans_guard.append(&mut batch.clone()))
138-
.map_err(TraceError::from)
139-
{
140-
return Box::pin(std::future::ready(Err(Into::into(err))));
141-
}
142-
Box::pin(std::future::ready(Ok(())))
139+
.map_err(|err| {
140+
OTelSdkError::InternalFailure(format!("Failed to lock spans: {:?}", err))
141+
});
142+
Box::pin(std::future::ready(result))
143143
}
144144

145-
fn shutdown(&mut self) {
146-
self.reset()
145+
fn shutdown(&mut self) -> OTelSdkResult {
146+
self.reset();
147+
Ok(())
147148
}
148149

149150
fn set_resource(&mut self, resource: &Resource) {

0 commit comments

Comments
 (0)