Skip to content

Commit dc9a5c8

Browse files
authored
Span Exporter async native (#2685)
1 parent ac69af6 commit dc9a5c8

File tree

19 files changed

+166
-157
lines changed

19 files changed

+166
-157
lines changed

opentelemetry-otlp/src/exporter/http/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ impl HttpExporterBuilder {
223223
OTEL_EXPORTER_OTLP_TRACES_HEADERS,
224224
)?;
225225

226-
Ok(crate::SpanExporter::new(client))
226+
Ok(crate::SpanExporter::from_http(client))
227227
}
228228

229229
/// Create a log exporter with the current configuration

opentelemetry-otlp/src/exporter/http/trace.rs

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::sync::Arc;
22

33
use super::OtlpHttpClient;
4-
use futures_core::future::BoxFuture;
54
use http::{header::CONTENT_TYPE, Method};
65
use opentelemetry::otel_debug;
76
use opentelemetry_sdk::{
@@ -10,7 +9,7 @@ use opentelemetry_sdk::{
109
};
1110

1211
impl SpanExporter for OtlpHttpClient {
13-
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
12+
async fn export(&mut self, batch: Vec<SpanData>) -> OTelSdkResult {
1413
let client = match self
1514
.client
1615
.lock()
@@ -20,16 +19,12 @@ impl SpanExporter for OtlpHttpClient {
2019
_ => Err(OTelSdkError::AlreadyShutdown),
2120
}) {
2221
Ok(client) => client,
23-
Err(err) => return Box::pin(std::future::ready(Err(err))),
22+
Err(err) => return Err(err),
2423
};
2524

2625
let (body, content_type) = match self.build_trace_export_body(batch) {
2726
Ok(body) => body,
28-
Err(e) => {
29-
return Box::pin(std::future::ready(Err(OTelSdkError::InternalFailure(
30-
e.to_string(),
31-
))))
32-
}
27+
Err(e) => return Err(OTelSdkError::InternalFailure(e.to_string())),
3328
};
3429

3530
let mut request = match http::Request::builder()
@@ -39,37 +34,31 @@ impl SpanExporter for OtlpHttpClient {
3934
.body(body.into())
4035
{
4136
Ok(req) => req,
42-
Err(e) => {
43-
return Box::pin(std::future::ready(Err(OTelSdkError::InternalFailure(
44-
e.to_string(),
45-
))))
46-
}
37+
Err(e) => return Err(OTelSdkError::InternalFailure(e.to_string())),
4738
};
4839

4940
for (k, v) in &self.headers {
5041
request.headers_mut().insert(k.clone(), v.clone());
5142
}
5243

53-
Box::pin(async move {
54-
let request_uri = request.uri().to_string();
55-
otel_debug!(name: "HttpTracesClient.CallingExport");
56-
let response = client
57-
.send_bytes(request)
58-
.await
59-
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
44+
let request_uri = request.uri().to_string();
45+
otel_debug!(name: "HttpTracesClient.CallingExport");
46+
let response = client
47+
.send_bytes(request)
48+
.await
49+
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
6050

61-
if !response.status().is_success() {
62-
let error = format!(
63-
"OpenTelemetry trace export failed. Url: {}, Status Code: {}, Response: {:?}",
64-
response.status().as_u16(),
65-
request_uri,
66-
response.body()
67-
);
68-
return Err(OTelSdkError::InternalFailure(error));
69-
}
51+
if !response.status().is_success() {
52+
let error = format!(
53+
"OpenTelemetry trace export failed. Url: {}, Status Code: {}, Response: {:?}",
54+
response.status().as_u16(),
55+
request_uri,
56+
response.body()
57+
);
58+
return Err(OTelSdkError::InternalFailure(error));
59+
}
7060

71-
Ok(())
72-
})
61+
Ok(())
7362
}
7463

7564
fn shutdown(&mut self) -> OTelSdkResult {

opentelemetry-otlp/src/exporter/tonic/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ impl TonicExporterBuilder {
317317

318318
let client = TonicTracesClient::new(channel, interceptor, compression);
319319

320-
Ok(crate::SpanExporter::new(client))
320+
Ok(crate::SpanExporter::from_tonic(client))
321321
}
322322
}
323323

opentelemetry-otlp/src/exporter/tonic/trace.rs

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use core::fmt;
22

3-
use futures_core::future::BoxFuture;
43
use opentelemetry::otel_debug;
54
use opentelemetry_proto::tonic::collector::trace::v1::{
65
trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
@@ -59,39 +58,33 @@ impl TonicTracesClient {
5958
}
6059

6160
impl SpanExporter for TonicTracesClient {
62-
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
61+
async fn export(&mut self, batch: Vec<SpanData>) -> OTelSdkResult {
6362
let (mut client, metadata, extensions) = match &mut self.inner {
6463
Some(inner) => {
6564
let (m, e, _) = match inner.interceptor.call(Request::new(())) {
6665
Ok(res) => res.into_parts(),
67-
Err(e) => {
68-
return Box::pin(std::future::ready(Err(OTelSdkError::InternalFailure(
69-
e.to_string(),
70-
))))
71-
}
66+
Err(e) => return Err(OTelSdkError::InternalFailure(e.to_string())),
7267
};
7368
(inner.client.clone(), m, e)
7469
}
7570
None => {
76-
return Box::pin(std::future::ready(Err(OTelSdkError::AlreadyShutdown)));
71+
return Err(OTelSdkError::AlreadyShutdown);
7772
}
7873
};
7974

8075
let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);
8176

8277
otel_debug!(name: "TonicsTracesClient.CallingExport");
8378

84-
Box::pin(async move {
85-
client
86-
.export(Request::from_parts(
87-
metadata,
88-
extensions,
89-
ExportTraceServiceRequest { resource_spans },
90-
))
91-
.await
92-
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
93-
Ok(())
94-
})
79+
client
80+
.export(Request::from_parts(
81+
metadata,
82+
extensions,
83+
ExportTraceServiceRequest { resource_spans },
84+
))
85+
.await
86+
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
87+
Ok(())
9588
}
9689

9790
fn shutdown(&mut self) -> OTelSdkResult {

opentelemetry-otlp/src/span.rs

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
55
use std::fmt::Debug;
66

7-
use futures_core::future::BoxFuture;
87
use opentelemetry_sdk::error::OTelSdkResult;
98
use opentelemetry_sdk::trace::SpanData;
109

@@ -66,15 +65,15 @@ impl SpanExporterBuilder<TonicExporterBuilderSet> {
6665
pub fn build(self) -> Result<SpanExporter, opentelemetry::trace::TraceError> {
6766
let span_exporter = self.client.0.build_span_exporter()?;
6867
opentelemetry::otel_debug!(name: "SpanExporterBuilt");
69-
Ok(SpanExporter::new(span_exporter))
68+
Ok(span_exporter)
7069
}
7170
}
7271

7372
#[cfg(any(feature = "http-proto", feature = "http-json"))]
7473
impl SpanExporterBuilder<HttpExporterBuilderSet> {
7574
pub fn build(self) -> Result<SpanExporter, opentelemetry::trace::TraceError> {
7675
let span_exporter = self.client.0.build_span_exporter()?;
77-
Ok(SpanExporter::new(span_exporter))
76+
Ok(span_exporter)
7877
}
7978
}
8079

@@ -106,28 +105,57 @@ impl HasHttpConfig for SpanExporterBuilder<HttpExporterBuilderSet> {
106105
}
107106
}
108107

109-
/// OTLP exporter that sends tracing information
108+
/// OTLP exporter that sends tracing data
110109
#[derive(Debug)]
111-
pub struct SpanExporter(Box<dyn opentelemetry_sdk::trace::SpanExporter>);
110+
pub struct SpanExporter {
111+
client: SupportedTransportClient,
112+
}
113+
114+
#[derive(Debug)]
115+
enum SupportedTransportClient {
116+
#[cfg(feature = "grpc-tonic")]
117+
Tonic(crate::exporter::tonic::trace::TonicTracesClient),
118+
#[cfg(any(feature = "http-proto", feature = "http-json"))]
119+
Http(crate::exporter::http::OtlpHttpClient),
120+
}
112121

113122
impl SpanExporter {
114123
/// Obtain a builder to configure a [SpanExporter].
115124
pub fn builder() -> SpanExporterBuilder<NoExporterBuilderSet> {
116125
SpanExporterBuilder::default()
117126
}
118127

119-
/// Build a new span exporter from a client
120-
pub fn new(client: impl opentelemetry_sdk::trace::SpanExporter + 'static) -> Self {
121-
SpanExporter(Box::new(client))
128+
#[cfg(any(feature = "http-proto", feature = "http-json"))]
129+
pub(crate) fn from_http(client: crate::exporter::http::OtlpHttpClient) -> Self {
130+
SpanExporter {
131+
client: SupportedTransportClient::Http(client),
132+
}
133+
}
134+
135+
#[cfg(feature = "grpc-tonic")]
136+
pub(crate) fn from_tonic(client: crate::exporter::tonic::trace::TonicTracesClient) -> Self {
137+
SpanExporter {
138+
client: SupportedTransportClient::Tonic(client),
139+
}
122140
}
123141
}
124142

125143
impl opentelemetry_sdk::trace::SpanExporter for SpanExporter {
126-
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
127-
self.0.export(batch)
144+
async fn export(&mut self, batch: Vec<SpanData>) -> OTelSdkResult {
145+
match &mut self.client {
146+
#[cfg(feature = "grpc-tonic")]
147+
SupportedTransportClient::Tonic(client) => client.export(batch).await,
148+
#[cfg(any(feature = "http-proto", feature = "http-json"))]
149+
SupportedTransportClient::Http(client) => client.export(batch).await,
150+
}
128151
}
129152

130153
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
131-
self.0.set_resource(resource);
154+
match &mut self.client {
155+
#[cfg(feature = "grpc-tonic")]
156+
SupportedTransportClient::Tonic(client) => client.set_resource(resource),
157+
#[cfg(any(feature = "http-proto", feature = "http-json"))]
158+
SupportedTransportClient::Http(client) => client.set_resource(resource),
159+
}
132160
}
133161
}

opentelemetry-sdk/benches/context.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
2-
use futures_util::future::BoxFuture;
32
use opentelemetry::{
43
global::BoxedTracer,
54
trace::{
@@ -137,8 +136,8 @@ fn parent_sampled_tracer(inner_sampler: Sampler) -> (SdkTracerProvider, BoxedTra
137136
struct NoopExporter;
138137

139138
impl SpanExporter for NoopExporter {
140-
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
141-
Box::pin(futures_util::future::ready(Ok(())))
139+
async fn export(&mut self, _spans: Vec<SpanData>) -> OTelSdkResult {
140+
Ok(())
142141
}
143142
}
144143

opentelemetry-sdk/benches/span_builder.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
2-
use futures_util::future::BoxFuture;
32
use opentelemetry::{
43
trace::{Span, Tracer, TracerProvider},
54
KeyValue,
@@ -66,8 +65,8 @@ fn not_sampled_provider() -> (sdktrace::SdkTracerProvider, sdktrace::SdkTracer)
6665
struct NoopExporter;
6766

6867
impl SpanExporter for NoopExporter {
69-
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
70-
Box::pin(futures_util::future::ready(Ok(())))
68+
async fn export(&mut self, _spans: Vec<SpanData>) -> OTelSdkResult {
69+
Ok(())
7170
}
7271
}
7372

opentelemetry-sdk/benches/trace.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use criterion::{criterion_group, criterion_main, Criterion};
2-
use futures_util::future::BoxFuture;
32
use opentelemetry::{
43
trace::{Span, Tracer, TracerProvider},
54
KeyValue,
@@ -60,8 +59,8 @@ fn criterion_benchmark(c: &mut Criterion) {
6059
struct VoidExporter;
6160

6261
impl SpanExporter for VoidExporter {
63-
fn export(&mut self, _spans: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
64-
Box::pin(futures_util::future::ready(Ok(())))
62+
async fn export(&mut self, _spans: Vec<SpanData>) -> OTelSdkResult {
63+
Ok(())
6564
}
6665
}
6766

opentelemetry-sdk/src/testing/trace/span_exporters.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use crate::{
33
trace::{SpanData, SpanExporter},
44
trace::{SpanEvents, SpanLinks},
55
};
6-
use futures_util::future::BoxFuture;
76
pub use opentelemetry::testing::trace::TestSpan;
87
use opentelemetry::{
98
trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
@@ -41,14 +40,12 @@ pub struct TokioSpanExporter {
4140
}
4241

4342
impl SpanExporter for TokioSpanExporter {
44-
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
45-
let result = batch.into_iter().try_for_each(|span_data| {
43+
async fn export(&mut self, batch: Vec<SpanData>) -> OTelSdkResult {
44+
batch.into_iter().try_for_each(|span_data| {
4645
self.tx_export
4746
.send(span_data)
4847
.map_err(|err| OTelSdkError::InternalFailure(format!("Export failed: {:?}", err)))
49-
});
50-
51-
Box::pin(std::future::ready(result))
48+
})
5249
}
5350

5451
fn shutdown(&mut self) -> OTelSdkResult {
@@ -112,7 +109,7 @@ impl NoopSpanExporter {
112109
}
113110

114111
impl SpanExporter for NoopSpanExporter {
115-
fn export(&mut self, _: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
116-
Box::pin(std::future::ready(Ok(())))
112+
async fn export(&mut self, _: Vec<SpanData>) -> OTelSdkResult {
113+
Ok(())
117114
}
118115
}

opentelemetry-sdk/src/trace/export.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//! Trace exporters
22
use crate::error::OTelSdkResult;
33
use crate::Resource;
4-
use futures_util::future::BoxFuture;
54
use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
65
use opentelemetry::{InstrumentationScope, KeyValue};
76
use std::borrow::Cow;
@@ -28,7 +27,10 @@ pub trait SpanExporter: Send + Sync + Debug {
2827
///
2928
/// Any retry logic that is required by the exporter is the responsibility
3029
/// of the exporter.
31-
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult>;
30+
fn export(
31+
&mut self,
32+
batch: Vec<SpanData>,
33+
) -> impl std::future::Future<Output = OTelSdkResult> + Send;
3234

3335
/// Shuts down the exporter. Called when SDK is shut down. This is an
3436
/// opportunity for exporter to do any cleanup required.

0 commit comments

Comments
 (0)