Skip to content

Commit b1fff95

Browse files
committed
Merge main
2 parents a460184 + 6209c06 commit b1fff95

File tree

23 files changed

+1156
-556
lines changed

23 files changed

+1156
-556
lines changed

opentelemetry-appender-tracing/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ opentelemetry-stdout = { path = "../opentelemetry-stdout", features = ["logs"] }
2424
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["logs", "testing"] }
2525
tracing-subscriber = { workspace = true, features = ["registry", "std", "env-filter"] }
2626
tracing-log = "0.2"
27-
async-trait = { workspace = true }
2827
criterion = { workspace = true }
2928
tokio = { workspace = true, features = ["full"]}
3029

opentelemetry-appender-tracing/benches/logs.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
| ot_layer_enabled | 196 ns |
1414
*/
1515

16-
use async_trait::async_trait;
1716
use criterion::{criterion_group, criterion_main, Criterion};
1817
use opentelemetry::InstrumentationScope;
1918
use opentelemetry_appender_tracing::layer as tracing_layer;
@@ -32,10 +31,13 @@ struct NoopExporter {
3231
enabled: bool,
3332
}
3433

35-
#[async_trait]
3634
impl LogExporter for NoopExporter {
37-
async fn export(&self, _: LogBatch<'_>) -> LogResult<()> {
38-
LogResult::Ok(())
35+
#[allow(clippy::manual_async_fn)]
36+
fn export(
37+
&self,
38+
_batch: LogBatch<'_>,
39+
) -> impl std::future::Future<Output = LogResult<()>> + Send {
40+
async { LogResult::Ok(()) }
3941
}
4042

4143
fn event_enabled(&self, _: opentelemetry::logs::Severity, _: &str, _: &str) -> bool {
@@ -44,17 +46,17 @@ impl LogExporter for NoopExporter {
4446
}
4547

4648
#[derive(Debug)]
47-
struct NoopProcessor {
48-
exporter: Box<dyn LogExporter>,
49+
struct NoopProcessor<E: LogExporter> {
50+
exporter: E,
4951
}
5052

51-
impl NoopProcessor {
52-
fn new(exporter: Box<dyn LogExporter>) -> Self {
53+
impl<E: LogExporter> NoopProcessor<E> {
54+
fn new(exporter: E) -> Self {
5355
Self { exporter }
5456
}
5557
}
5658

57-
impl LogProcessor for NoopProcessor {
59+
impl<E: LogExporter> LogProcessor for NoopProcessor<E> {
5860
fn emit(&self, _: &mut LogRecord, _: &InstrumentationScope) {
5961
// no-op
6062
}
@@ -124,7 +126,7 @@ fn benchmark_no_subscriber(c: &mut Criterion) {
124126

125127
fn benchmark_with_ot_layer(c: &mut Criterion, enabled: bool, bench_name: &str) {
126128
let exporter = NoopExporter { enabled };
127-
let processor = NoopProcessor::new(Box::new(exporter));
129+
let processor = NoopProcessor::new(exporter);
128130
let provider = LoggerProvider::builder()
129131
.with_resource(
130132
Resource::builder_empty()

opentelemetry-appender-tracing/src/layer.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,6 @@ const fn severity_of_level(level: &Level) -> Severity {
209209
#[cfg(test)]
210210
mod tests {
211211
use crate::layer;
212-
use async_trait::async_trait;
213212
use opentelemetry::logs::Severity;
214213
use opentelemetry::trace::TracerProvider as _;
215214
use opentelemetry::trace::{TraceContextExt, TraceFlags, Tracer};
@@ -245,13 +244,18 @@ mod tests {
245244
#[derive(Clone, Debug, Default)]
246245
struct ReentrantLogExporter;
247246

248-
#[async_trait]
249247
impl LogExporter for ReentrantLogExporter {
250-
async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> {
251-
// This will cause a deadlock as the export itself creates a log
252-
// while still within the lock of the SimpleLogProcessor.
253-
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
254-
Ok(())
248+
#[allow(clippy::manual_async_fn)]
249+
fn export(
250+
&self,
251+
_batch: LogBatch<'_>,
252+
) -> impl std::future::Future<Output = LogResult<()>> + Send {
253+
async {
254+
// This will cause a deadlock as the export itself creates a log
255+
// while still within the lock of the SimpleLogProcessor.
256+
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
257+
Ok(())
258+
}
255259
}
256260
}
257261

opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,9 @@ license = "Apache-2.0"
66
publish = false
77

88
[features]
9-
default = ["reqwest", "experimental_metrics_periodicreader_with_async_runtime"]
10-
reqwest = ["opentelemetry-otlp/reqwest-client"]
9+
default = ["reqwest-blocking"]
10+
reqwest-blocking = ["opentelemetry-otlp/reqwest-blocking-client"]
1111
hyper = ["opentelemetry-otlp/hyper-client"]
12-
experimental_metrics_periodicreader_with_async_runtime = ["opentelemetry_sdk/experimental_metrics_periodicreader_with_async_runtime"]
13-
1412

1513
[dependencies]
1614
once_cell = { workspace = true }

opentelemetry-otlp/examples/basic-otlp-http/README.md

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,9 @@ applications, these filters should be adjusted appropriately.
1313

1414
The example employs a `BatchExporter` for logs and traces, which is the
1515
recommended approach when using OTLP exporters. While it can be modified to use
16-
a `SimpleExporter`, this requires enabling feature flag `reqwest-blocking-client` and
17-
making the `main()` a normal main and *not* `tokio::main`
16+
a `SimpleExporter`, this requires making the main function a regular main and
17+
*not* tokio main.
1818

19-
// TODO: Metrics does not work with non tokio main when using `reqwest-blocking-client` today, fix that when switching
20-
// default to use own thread.
2119
// TODO: Document `hyper` feature flag when using SimpleProcessor.
2220

2321
## Usage

opentelemetry-otlp/examples/basic-otlp-http/src/main.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use opentelemetry_otlp::{LogExporter, MetricExporter, Protocol, SpanExporter};
1111
use opentelemetry_sdk::{
1212
logs::LoggerProvider,
1313
metrics::{MetricError, SdkMeterProvider},
14-
runtime,
1514
trace::{self as sdktrace, TracerProvider},
1615
};
1716
use opentelemetry_sdk::{
@@ -50,7 +49,9 @@ fn init_traces() -> Result<sdktrace::TracerProvider, TraceError> {
5049
.build()?;
5150

5251
Ok(TracerProvider::builder()
53-
.with_batch_exporter(exporter, runtime::Tokio)
52+
// TODO: Enable BatchExporter after
53+
// https://github.com/open-telemetry/opentelemetry-rust/pull/2456
54+
.with_simple_exporter(exporter)
5455
.with_resource(RESOURCE.clone())
5556
.build())
5657
}
@@ -62,15 +63,6 @@ fn init_metrics() -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, Metric
6263
.with_endpoint("http://localhost:4318/v1/metrics")
6364
.build()?;
6465

65-
#[cfg(feature = "experimental_metrics_periodicreader_with_async_runtime")]
66-
let reader =
67-
opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader::builder(
68-
exporter,
69-
runtime::Tokio,
70-
)
71-
.build();
72-
// TODO: This does not work today. See https://github.com/open-telemetry/opentelemetry-rust/issues/2400
73-
#[cfg(not(feature = "experimental_metrics_periodicreader_with_async_runtime"))]
7466
let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter).build();
7567

7668
Ok(SdkMeterProvider::builder()
@@ -79,8 +71,10 @@ fn init_metrics() -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, Metric
7971
.build())
8072
}
8173

82-
#[tokio::main]
83-
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
74+
// #[tokio::main]
75+
// TODO: Re-enable tokio::main, if needed, after
76+
// https://github.com/open-telemetry/opentelemetry-rust/pull/2456
77+
fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
8478
let logger_provider = init_logs()?;
8579

8680
// Create a new OpenTelemetryTracingBridge using the above LoggerProvider.

opentelemetry-otlp/src/exporter/http/logs.rs

Lines changed: 41 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,56 @@
11
use std::sync::Arc;
22

3-
use async_trait::async_trait;
43
use http::{header::CONTENT_TYPE, Method};
54
use opentelemetry::otel_debug;
65
use opentelemetry_sdk::export::logs::{LogBatch, LogExporter, ShutdownResult};
76
use opentelemetry_sdk::logs::{LogError, LogResult};
87

98
use super::OtlpHttpClient;
109

11-
#[async_trait]
1210
impl LogExporter for OtlpHttpClient {
13-
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
14-
let client = self
15-
.client
16-
.lock()
17-
.map_err(|e| LogError::Other(e.to_string().into()))
18-
.and_then(|g| match &*g {
19-
Some(client) => Ok(Arc::clone(client)),
20-
_ => Err(LogError::Other("exporter is already shut down".into())),
21-
})?;
22-
23-
let (body, content_type) = { self.build_logs_export_body(batch)? };
24-
let mut request = http::Request::builder()
25-
.method(Method::POST)
26-
.uri(&self.collector_endpoint)
27-
.header(CONTENT_TYPE, content_type)
28-
.body(body)
29-
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;
30-
31-
for (k, v) in &self.headers {
32-
request.headers_mut().insert(k.clone(), v.clone());
33-
}
11+
#[allow(clippy::manual_async_fn)]
12+
fn export(
13+
&self,
14+
batch: LogBatch<'_>,
15+
) -> impl std::future::Future<Output = LogResult<()>> + Send {
16+
async move {
17+
let client = self
18+
.client
19+
.lock()
20+
.map_err(|e| LogError::Other(e.to_string().into()))
21+
.and_then(|g| match &*g {
22+
Some(client) => Ok(Arc::clone(client)),
23+
_ => Err(LogError::Other("exporter is already shut down".into())),
24+
})?;
3425

35-
let request_uri = request.uri().to_string();
36-
otel_debug!(name: "HttpLogsClient.CallingExport");
37-
let response = client.send(request).await?;
38-
39-
if !response.status().is_success() {
40-
let error = format!(
41-
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
42-
response.status().as_u16(),
43-
request_uri,
44-
response.body()
45-
);
46-
return Err(LogError::Other(error.into()));
47-
}
26+
let (body, content_type) = { self.build_logs_export_body(batch)? };
27+
let mut request = http::Request::builder()
28+
.method(Method::POST)
29+
.uri(&self.collector_endpoint)
30+
.header(CONTENT_TYPE, content_type)
31+
.body(body)
32+
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;
4833

49-
Ok(())
34+
for (k, v) in &self.headers {
35+
request.headers_mut().insert(k.clone(), v.clone());
36+
}
37+
38+
let request_uri = request.uri().to_string();
39+
otel_debug!(name: "HttpLogsClient.CallingExport");
40+
let response = client.send(request).await?;
41+
42+
if !response.status().is_success() {
43+
let error = format!(
44+
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
45+
response.status().as_u16(),
46+
request_uri,
47+
response.body()
48+
);
49+
return Err(LogError::Other(error.into()));
50+
}
51+
52+
Ok(())
53+
}
5054
}
5155

5256
fn shutdown(&mut self) -> ShutdownResult {

opentelemetry-otlp/src/exporter/http/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::time::Duration;
2828
mod metrics;
2929

3030
#[cfg(feature = "logs")]
31-
mod logs;
31+
pub(crate) mod logs;
3232

3333
#[cfg(feature = "trace")]
3434
mod trace;
@@ -239,7 +239,7 @@ impl HttpExporterBuilder {
239239
OTEL_EXPORTER_OTLP_LOGS_HEADERS,
240240
)?;
241241

242-
Ok(crate::LogExporter::new(client))
242+
Ok(crate::LogExporter::from_http(client))
243243
}
244244

245245
/// Create a metrics exporter with the current configuration
@@ -265,7 +265,7 @@ impl HttpExporterBuilder {
265265
}
266266

267267
#[derive(Debug)]
268-
struct OtlpHttpClient {
268+
pub(crate) struct OtlpHttpClient {
269269
client: Mutex<Option<Arc<dyn HttpClient>>>,
270270
collector_endpoint: Uri,
271271
headers: HashMap<HeaderName, HeaderValue>,

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use async_trait::async_trait;
21
use core::fmt;
32
use opentelemetry::otel_debug;
43
use opentelemetry_proto::tonic::collector::logs::v1::{
@@ -56,37 +55,41 @@ impl TonicLogsClient {
5655
}
5756
}
5857

59-
#[async_trait]
6058
impl LogExporter for TonicLogsClient {
61-
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
62-
let (mut client, metadata, extensions) = match &self.inner {
63-
Some(inner) => {
64-
let (m, e, _) = inner
65-
.interceptor
66-
.lock()
67-
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
68-
.call(Request::new(()))
69-
.map_err(|e| LogError::Other(Box::new(e)))?
70-
.into_parts();
71-
(inner.client.clone(), m, e)
72-
}
73-
None => return Err(LogError::Other("exporter is already shut down".into())),
74-
};
59+
#[allow(clippy::manual_async_fn)]
60+
fn export(
61+
&self,
62+
batch: LogBatch<'_>,
63+
) -> impl std::future::Future<Output = LogResult<()>> + Send {
64+
async move {
65+
let (mut client, metadata, extensions) = match &self.inner {
66+
Some(inner) => {
67+
let (m, e, _) = inner
68+
.interceptor
69+
.lock()
70+
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
71+
.call(Request::new(()))
72+
.map_err(|e| LogError::Other(Box::new(e)))?
73+
.into_parts();
74+
(inner.client.clone(), m, e)
75+
}
76+
None => return Err(LogError::Other("exporter is already shut down".into())),
77+
};
7578

76-
let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
79+
let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
7780

78-
otel_debug!(name: "TonicsLogsClient.CallingExport");
81+
otel_debug!(name: "TonicsLogsClient.CallingExport");
7982

80-
client
81-
.export(Request::from_parts(
82-
metadata,
83-
extensions,
84-
ExportLogsServiceRequest { resource_logs },
85-
))
86-
.await
87-
.map_err(crate::Error::from)?;
88-
89-
Ok(())
83+
client
84+
.export(Request::from_parts(
85+
metadata,
86+
extensions,
87+
ExportLogsServiceRequest { resource_logs },
88+
))
89+
.await
90+
.map_err(crate::Error::from)?;
91+
Ok(())
92+
}
9093
}
9194

9295
fn shutdown(&mut self) -> ShutdownResult {

opentelemetry-otlp/src/exporter/tonic/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::{
2020
};
2121

2222
#[cfg(feature = "logs")]
23-
mod logs;
23+
pub(crate) mod logs;
2424

2525
#[cfg(feature = "metrics")]
2626
mod metrics;
@@ -273,7 +273,7 @@ impl TonicExporterBuilder {
273273

274274
let client = TonicLogsClient::new(channel, interceptor, compression);
275275

276-
Ok(crate::logs::LogExporter::new(client))
276+
Ok(crate::logs::LogExporter::from_tonic(client))
277277
}
278278

279279
/// Build a new tonic metrics exporter

0 commit comments

Comments
 (0)