Skip to content

Commit 8cacf52

Browse files
committed
fix otlp
1 parent e6472bf commit 8cacf52

File tree

12 files changed

+155
-118
lines changed

12 files changed

+155
-118
lines changed

opentelemetry-appender-tracing/benches/logs.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,12 @@ struct NoopExporter {
3434

3535
#[async_trait]
3636
impl LogExporter for NoopExporter {
37-
async fn export(&self, _: LogBatch<'_>) -> LogResult<()> {
38-
LogResult::Ok(())
37+
#[allow(clippy::manual_async_fn)]
38+
fn export<'a>(
39+
&'a self,
40+
_batch: &'a LogBatch<'a>,
41+
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
42+
async { LogResult::Ok(()) }
3943
}
4044

4145
fn event_enabled(&self, _: opentelemetry::logs::Severity, _: &str, _: &str) -> bool {
@@ -44,17 +48,17 @@ impl LogExporter for NoopExporter {
4448
}
4549

4650
#[derive(Debug)]
47-
struct NoopProcessor {
48-
exporter: Box<dyn LogExporter>,
51+
struct NoopProcessor<E: LogExporter> {
52+
exporter: E,
4953
}
5054

51-
impl NoopProcessor {
52-
fn new(exporter: Box<dyn LogExporter>) -> Self {
55+
impl<E: LogExporter> NoopProcessor<E> {
56+
fn new(exporter: E) -> Self {
5357
Self { exporter }
5458
}
5559
}
5660

57-
impl LogProcessor for NoopProcessor {
61+
impl<E: LogExporter> LogProcessor for NoopProcessor<E> {
5862
fn emit(&self, _: &mut LogRecord, _: &InstrumentationScope) {
5963
// no-op
6064
}
@@ -124,7 +128,7 @@ fn benchmark_no_subscriber(c: &mut Criterion) {
124128

125129
fn benchmark_with_ot_layer(c: &mut Criterion, enabled: bool, bench_name: &str) {
126130
let exporter = NoopExporter { enabled };
127-
let processor = NoopProcessor::new(Box::new(exporter));
131+
let processor = NoopProcessor::new(exporter);
128132
let provider = LoggerProvider::builder()
129133
.with_resource(Resource::new(vec![KeyValue::new(
130134
"service.name",

opentelemetry-appender-tracing/src/layer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ mod tests {
247247

248248
#[async_trait]
249249
impl LogExporter for ReentrantLogExporter {
250+
#[allow(clippy::manual_async_fn)]
250251
fn export<'a>(
251252
&'a self,
252253
_batch: &'a LogBatch<'a>,

opentelemetry-otlp/src/exporter/http/logs.rs

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,42 +9,48 @@ use super::OtlpHttpClient;
99

1010
#[async_trait]
1111
impl LogExporter for OtlpHttpClient {
12-
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
13-
let client = self
14-
.client
15-
.lock()
16-
.map_err(|e| LogError::Other(e.to_string().into()))
17-
.and_then(|g| match &*g {
18-
Some(client) => Ok(Arc::clone(client)),
19-
_ => Err(LogError::Other("exporter is already shut down".into())),
20-
})?;
21-
22-
let (body, content_type) = { self.build_logs_export_body(batch)? };
23-
let mut request = http::Request::builder()
24-
.method(Method::POST)
25-
.uri(&self.collector_endpoint)
26-
.header(CONTENT_TYPE, content_type)
27-
.body(body)
28-
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;
29-
30-
for (k, v) in &self.headers {
31-
request.headers_mut().insert(k.clone(), v.clone());
12+
#[allow(clippy::manual_async_fn)]
13+
fn export<'a>(
14+
&'a self,
15+
batch: &'a LogBatch<'a>,
16+
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
17+
async move {
18+
let client = self
19+
.client
20+
.lock()
21+
.map_err(|e| LogError::Other(e.to_string().into()))
22+
.and_then(|g| match &*g {
23+
Some(client) => Ok(Arc::clone(client)),
24+
_ => Err(LogError::Other("exporter is already shut down".into())),
25+
})?;
26+
27+
let (body, content_type) = { self.build_logs_export_body(batch)? };
28+
let mut request = http::Request::builder()
29+
.method(Method::POST)
30+
.uri(&self.collector_endpoint)
31+
.header(CONTENT_TYPE, content_type)
32+
.body(body)
33+
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;
34+
35+
for (k, v) in &self.headers {
36+
request.headers_mut().insert(k.clone(), v.clone());
37+
}
38+
39+
let request_uri = request.uri().to_string();
40+
let response = client.send(request).await?;
41+
42+
if !response.status().is_success() {
43+
let error = format!(
44+
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
45+
response.status().as_u16(),
46+
request_uri,
47+
response.body()
48+
);
49+
return Err(LogError::Other(error.into()));
50+
}
51+
52+
Ok(())
3253
}
33-
34-
let request_uri = request.uri().to_string();
35-
let response = client.send(request).await?;
36-
37-
if !response.status().is_success() {
38-
let error = format!(
39-
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
40-
response.status().as_u16(),
41-
request_uri,
42-
response.body()
43-
);
44-
return Err(LogError::Other(error.into()));
45-
}
46-
47-
Ok(())
4854
}
4955

5056
fn shutdown(&mut self) {

opentelemetry-otlp/src/exporter/http/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::time::Duration;
2828
mod metrics;
2929

3030
#[cfg(feature = "logs")]
31-
mod logs;
31+
pub(crate) mod logs;
3232

3333
#[cfg(feature = "trace")]
3434
mod trace;
@@ -236,7 +236,7 @@ impl HttpExporterBuilder {
236236
OTEL_EXPORTER_OTLP_LOGS_HEADERS,
237237
)?;
238238

239-
Ok(crate::LogExporter::new(client))
239+
Ok(crate::LogExporter::from_http(client))
240240
}
241241

242242
/// Create a metrics exporter with the current configuration
@@ -262,7 +262,7 @@ impl HttpExporterBuilder {
262262
}
263263

264264
#[derive(Debug)]
265-
struct OtlpHttpClient {
265+
pub(crate) struct OtlpHttpClient {
266266
client: Mutex<Option<Arc<dyn HttpClient>>>,
267267
collector_endpoint: Uri,
268268
headers: HashMap<HeaderName, HeaderValue>,
@@ -314,7 +314,7 @@ impl OtlpHttpClient {
314314
#[cfg(feature = "logs")]
315315
fn build_logs_export_body(
316316
&self,
317-
logs: LogBatch<'_>,
317+
logs: &LogBatch<'_>,
318318
) -> opentelemetry_sdk::logs::LogResult<(Vec<u8>, &'static str)> {
319319
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
320320
let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -55,33 +55,39 @@ impl TonicLogsClient {
5555

5656
#[async_trait]
5757
impl LogExporter for TonicLogsClient {
58-
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
59-
let (mut client, metadata, extensions) = match &self.inner {
60-
Some(inner) => {
61-
let (m, e, _) = inner
62-
.interceptor
63-
.lock()
64-
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
65-
.call(Request::new(()))
66-
.map_err(|e| LogError::Other(Box::new(e)))?
67-
.into_parts();
68-
(inner.client.clone(), m, e)
69-
}
70-
None => return Err(LogError::Other("exporter is already shut down".into())),
71-
};
58+
#[allow(clippy::manual_async_fn)]
59+
fn export<'a>(
60+
&'a self,
61+
batch: &'a LogBatch<'a>,
62+
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
63+
async move {
64+
let (mut client, metadata, extensions) = match &self.inner {
65+
Some(inner) => {
66+
let (m, e, _) = inner
67+
.interceptor
68+
.lock()
69+
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
70+
.call(Request::new(()))
71+
.map_err(|e| LogError::Other(Box::new(e)))?
72+
.into_parts();
73+
(inner.client.clone(), m, e)
74+
}
75+
None => return Err(LogError::Other("exporter is already shut down".into())),
76+
};
7277

73-
let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
78+
let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
7479

75-
client
76-
.export(Request::from_parts(
77-
metadata,
78-
extensions,
79-
ExportLogsServiceRequest { resource_logs },
80-
))
81-
.await
82-
.map_err(crate::Error::from)?;
80+
client
81+
.export(Request::from_parts(
82+
metadata,
83+
extensions,
84+
ExportLogsServiceRequest { resource_logs },
85+
))
86+
.await
87+
.map_err(crate::Error::from)?;
8388

84-
Ok(())
89+
Ok(())
90+
}
8591
}
8692

8793
fn shutdown(&mut self) {

opentelemetry-otlp/src/exporter/tonic/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::{
1919
};
2020

2121
#[cfg(feature = "logs")]
22-
mod logs;
22+
pub(crate) mod logs;
2323

2424
#[cfg(feature = "metrics")]
2525
mod metrics;
@@ -266,7 +266,7 @@ impl TonicExporterBuilder {
266266

267267
let client = TonicLogsClient::new(channel, interceptor, compression);
268268

269-
Ok(crate::logs::LogExporter::new(client))
269+
Ok(crate::logs::LogExporter::from_tonic(client))
270270
}
271271

272272
/// Build a new tonic metrics exporter

opentelemetry-otlp/src/logs.rs

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,16 @@ impl HasHttpConfig for LogExporterBuilder<HttpExporterBuilderSet> {
105105
/// OTLP exporter that sends log data
106106
#[derive(Debug)]
107107
pub struct LogExporter {
108-
client: Box<dyn opentelemetry_sdk::export::logs::LogExporter>,
108+
//client: Box<dyn opentelemetry_sdk::export::logs::LogExporter>,
109+
client: LogExporterInner,
110+
}
111+
112+
#[derive(Debug)]
113+
enum LogExporterInner {
114+
#[cfg(feature = "grpc-tonic")]
115+
Tonic(crate::exporter::tonic::logs::TonicLogsClient),
116+
#[cfg(any(feature = "http-proto", feature = "http-json"))]
117+
Http(crate::exporter::http::OtlpHttpClient),
109118
}
110119

111120
impl LogExporter {
@@ -114,21 +123,44 @@ impl LogExporter {
114123
LogExporterBuilder::default()
115124
}
116125

117-
/// Create a new log exporter
118-
pub fn new(client: impl opentelemetry_sdk::export::logs::LogExporter + 'static) -> Self {
126+
#[cfg(any(feature = "http-proto", feature = "http-json"))]
127+
pub(crate) fn from_http(client: crate::exporter::http::OtlpHttpClient) -> Self {
119128
LogExporter {
120-
client: Box::new(client),
129+
client: LogExporterInner::Http(client),
130+
}
131+
}
132+
133+
#[cfg(feature = "grpc-tonic")]
134+
pub(crate) fn from_tonic(client: crate::exporter::tonic::logs::TonicLogsClient) -> Self {
135+
LogExporter {
136+
client: LogExporterInner::Tonic(client),
121137
}
122138
}
123139
}
124140

125141
#[async_trait]
126142
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
127-
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
128-
self.client.export(batch).await
143+
#[allow(clippy::manual_async_fn)]
144+
fn export<'a>(
145+
&'a self,
146+
batch: &'a LogBatch<'a>,
147+
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
148+
async move {
149+
match &self.client {
150+
#[cfg(feature = "grpc-tonic")]
151+
LogExporterInner::Tonic(client) => client.export(batch).await,
152+
#[cfg(any(feature = "http-proto", feature = "http-json"))]
153+
LogExporterInner::Http(client) => client.export(batch).await,
154+
}
155+
}
129156
}
130157

131158
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
132-
self.client.set_resource(resource);
159+
match &mut self.client {
160+
#[cfg(feature = "grpc-tonic")]
161+
LogExporterInner::Tonic(client) => client.set_resource(resource),
162+
#[cfg(any(feature = "http-proto", feature = "http-json"))]
163+
LogExporterInner::Http(client) => client.set_resource(resource),
164+
}
133165
}
134166
}

opentelemetry-proto/src/transform/logs.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ pub mod tonic {
178178
}
179179

180180
pub fn group_logs_by_resource_and_scope(
181-
logs: LogBatch<'_>,
181+
logs: &LogBatch<'_>,
182182
resource: &ResourceAttributesWithSchema,
183183
) -> Vec<ResourceLogs> {
184184
// Group logs by target or instrumentation name
@@ -261,7 +261,7 @@ mod tests {
261261
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
262262

263263
let grouped_logs =
264-
crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
264+
crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource);
265265

266266
assert_eq!(grouped_logs.len(), 1);
267267
let resource_logs = &grouped_logs[0];
@@ -281,7 +281,7 @@ mod tests {
281281
let log_batch = LogBatch::new(&logs);
282282
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
283283
let grouped_logs =
284-
crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
284+
crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource);
285285

286286
assert_eq!(grouped_logs.len(), 1);
287287
let resource_logs = &grouped_logs[0];

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,7 @@ mod tests {
568568
}
569569

570570
impl LogExporter for MockLogExporter {
571+
#[allow(clippy::manual_async_fn)]
571572
fn export<'a>(
572573
&'a self,
573574
_batch: &'a LogBatch<'a>,
@@ -1065,6 +1066,7 @@ mod tests {
10651066
}
10661067

10671068
impl LogExporter for LogExporterThatRequiresTokio {
1069+
#[allow(clippy::manual_async_fn)]
10681070
fn export<'a>(
10691071
&'a self,
10701072
batch: &'a LogBatch<'a>,

opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ impl InMemoryLogExporter {
181181
}
182182

183183
impl LogExporter for InMemoryLogExporter {
184+
#[allow(clippy::manual_async_fn)]
184185
fn export<'a>(
185186
&'a self,
186187
batch: &'a LogBatch<'a>,

0 commit comments

Comments
 (0)