Skip to content

Commit 862aec1

Browse files
authored
Merge branch 'main' into cijothomas/fix-flaky-test
2 parents 151bd51 + 06ca4a1 commit 862aec1

File tree

7 files changed

+104
-143
lines changed

7 files changed

+104
-143
lines changed

opentelemetry-appender-tracing/src/layer.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -263,17 +263,11 @@ mod tests {
263263
struct ReentrantLogExporter;
264264

265265
impl LogExporter for ReentrantLogExporter {
266-
#[allow(clippy::manual_async_fn)]
267-
fn export(
268-
&self,
269-
_batch: LogBatch<'_>,
270-
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
271-
async {
272-
// This will cause a deadlock as the export itself creates a log
273-
// while still within the lock of the SimpleLogProcessor.
274-
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
275-
Ok(())
276-
}
266+
async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
267+
// This will cause a deadlock as the export itself creates a log
268+
// while still within the lock of the SimpleLogProcessor.
269+
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
270+
Ok(())
277271
}
278272
}
279273

opentelemetry-otlp/src/exporter/http/logs.rs

Lines changed: 34 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,51 +5,45 @@ use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
55
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
66

77
impl LogExporter for OtlpHttpClient {
8-
#[allow(clippy::manual_async_fn)]
9-
fn export(
10-
&self,
11-
batch: LogBatch<'_>,
12-
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
13-
async move {
14-
let client = self
15-
.client
16-
.lock()
17-
.map_err(|e| OTelSdkError::InternalFailure(format!("Mutex lock failed: {}", e)))?
18-
.clone()
19-
.ok_or(OTelSdkError::AlreadyShutdown)?;
8+
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
9+
let client = self
10+
.client
11+
.lock()
12+
.map_err(|e| OTelSdkError::InternalFailure(format!("Mutex lock failed: {}", e)))?
13+
.clone()
14+
.ok_or(OTelSdkError::AlreadyShutdown)?;
2015

21-
let (body, content_type) = self
22-
.build_logs_export_body(batch)
23-
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
16+
let (body, content_type) = self
17+
.build_logs_export_body(batch)
18+
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
2419

25-
let mut request = http::Request::builder()
26-
.method(Method::POST)
27-
.uri(&self.collector_endpoint)
28-
.header(CONTENT_TYPE, content_type)
29-
.body(body.into())
30-
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
20+
let mut request = http::Request::builder()
21+
.method(Method::POST)
22+
.uri(&self.collector_endpoint)
23+
.header(CONTENT_TYPE, content_type)
24+
.body(body.into())
25+
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
3126

32-
for (k, v) in &self.headers {
33-
request.headers_mut().insert(k.clone(), v.clone());
34-
}
27+
for (k, v) in &self.headers {
28+
request.headers_mut().insert(k.clone(), v.clone());
29+
}
3530

36-
let request_uri = request.uri().to_string();
37-
otel_debug!(name: "HttpLogsClient.CallingExport");
38-
let response = client
39-
.send_bytes(request)
40-
.await
41-
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
42-
if !response.status().is_success() {
43-
let error = format!(
44-
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
45-
request_uri,
46-
response.status().as_u16(),
47-
response.body()
48-
);
49-
return Err(OTelSdkError::InternalFailure(error));
50-
}
51-
Ok(())
31+
let request_uri = request.uri().to_string();
32+
otel_debug!(name: "HttpLogsClient.CallingExport");
33+
let response = client
34+
.send_bytes(request)
35+
.await
36+
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
37+
if !response.status().is_success() {
38+
let error = format!(
39+
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
40+
request_uri,
41+
response.status().as_u16(),
42+
response.body()
43+
);
44+
return Err(OTelSdkError::InternalFailure(error));
5245
}
46+
Ok(())
5347
}
5448

5549
fn shutdown(&mut self) -> OTelSdkResult {

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -56,40 +56,34 @@ impl TonicLogsClient {
5656
}
5757

5858
impl LogExporter for TonicLogsClient {
59-
#[allow(clippy::manual_async_fn)]
60-
fn export(
61-
&self,
62-
batch: LogBatch<'_>,
63-
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
64-
async move {
65-
let (mut client, metadata, extensions) = match &self.inner {
66-
Some(inner) => {
67-
let (m, e, _) = inner
68-
.interceptor
69-
.lock()
70-
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
71-
.call(Request::new(()))
72-
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
73-
.into_parts();
74-
(inner.client.clone(), m, e)
75-
}
76-
None => return Err(OTelSdkError::AlreadyShutdown),
77-
};
59+
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
60+
let (mut client, metadata, extensions) = match &self.inner {
61+
Some(inner) => {
62+
let (m, e, _) = inner
63+
.interceptor
64+
.lock()
65+
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
66+
.call(Request::new(()))
67+
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
68+
.into_parts();
69+
(inner.client.clone(), m, e)
70+
}
71+
None => return Err(OTelSdkError::AlreadyShutdown),
72+
};
7873

79-
let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
74+
let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
8075

81-
otel_debug!(name: "TonicsLogsClient.CallingExport");
76+
otel_debug!(name: "TonicsLogsClient.CallingExport");
8277

83-
client
84-
.export(Request::from_parts(
85-
metadata,
86-
extensions,
87-
ExportLogsServiceRequest { resource_logs },
88-
))
89-
.await
90-
.map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))?;
91-
Ok(())
92-
}
78+
client
79+
.export(Request::from_parts(
80+
metadata,
81+
extensions,
82+
ExportLogsServiceRequest { resource_logs },
83+
))
84+
.await
85+
.map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))?;
86+
Ok(())
9387
}
9488

9589
fn shutdown(&mut self) -> OTelSdkResult {

opentelemetry-otlp/src/logs.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -139,18 +139,12 @@ impl LogExporter {
139139
}
140140

141141
impl opentelemetry_sdk::logs::LogExporter for LogExporter {
142-
#[allow(clippy::manual_async_fn)]
143-
fn export(
144-
&self,
145-
batch: LogBatch<'_>,
146-
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
147-
async move {
148-
match &self.client {
149-
#[cfg(feature = "grpc-tonic")]
150-
SupportedTransportClient::Tonic(client) => client.export(batch).await,
151-
#[cfg(any(feature = "http-proto", feature = "http-json"))]
152-
SupportedTransportClient::Http(client) => client.export(batch).await,
153-
}
142+
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
143+
match &self.client {
144+
#[cfg(feature = "grpc-tonic")]
145+
SupportedTransportClient::Tonic(client) => client.export(batch).await,
146+
#[cfg(any(feature = "http-proto", feature = "http-json"))]
147+
SupportedTransportClient::Http(client) => client.export(batch).await,
154148
}
155149
}
156150

opentelemetry-sdk/src/logs/in_memory_exporter.rs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -188,24 +188,18 @@ impl InMemoryLogExporter {
188188
}
189189

190190
impl LogExporter for InMemoryLogExporter {
191-
#[allow(clippy::manual_async_fn)]
192-
fn export(
193-
&self,
194-
batch: LogBatch<'_>,
195-
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
196-
async move {
197-
let mut logs_guard = self.logs.lock().map_err(|e| {
198-
OTelSdkError::InternalFailure(format!("Failed to lock logs for export: {}", e))
199-
})?;
200-
for (log_record, instrumentation) in batch.iter() {
201-
let owned_log = OwnedLogData {
202-
record: (*log_record).clone(),
203-
instrumentation: (*instrumentation).clone(),
204-
};
205-
logs_guard.push(owned_log);
206-
}
207-
Ok(())
191+
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
192+
let mut logs_guard = self.logs.lock().map_err(|e| {
193+
OTelSdkError::InternalFailure(format!("Failed to lock logs for export: {}", e))
194+
})?;
195+
for (log_record, instrumentation) in batch.iter() {
196+
let owned_log = OwnedLogData {
197+
record: (*log_record).clone(),
198+
instrumentation: (*instrumentation).clone(),
199+
};
200+
logs_guard.push(owned_log);
208201
}
202+
Ok(())
209203
}
210204

211205
fn shutdown(&mut self) -> OTelSdkResult {

opentelemetry-stdout/src/logs/exporter.rs

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,36 +30,30 @@ impl fmt::Debug for LogExporter {
3030
}
3131

3232
impl opentelemetry_sdk::logs::LogExporter for LogExporter {
33-
/// Export spans to stdout
34-
#[allow(clippy::manual_async_fn)]
35-
fn export(
36-
&self,
37-
batch: LogBatch<'_>,
38-
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
39-
async move {
40-
if self.is_shutdown.load(atomic::Ordering::SeqCst) {
41-
Err(OTelSdkError::AlreadyShutdown)
33+
/// Export logs to stdout
34+
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
35+
if self.is_shutdown.load(atomic::Ordering::SeqCst) {
36+
Err(OTelSdkError::AlreadyShutdown)
37+
} else {
38+
println!("Logs");
39+
if self
40+
.resource_emitted
41+
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
42+
.is_err()
43+
{
44+
print_logs(batch);
4245
} else {
43-
println!("Logs");
44-
if self
45-
.resource_emitted
46-
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
47-
.is_err()
48-
{
49-
print_logs(batch);
50-
} else {
51-
println!("Resource");
52-
if let Some(schema_url) = self.resource.schema_url() {
53-
println!("\t Resource SchemaUrl: {:?}", schema_url);
54-
}
55-
self.resource.iter().for_each(|(k, v)| {
56-
println!("\t -> {}={:?}", k, v);
57-
});
58-
print_logs(batch);
46+
println!("Resource");
47+
if let Some(schema_url) = self.resource.schema_url() {
48+
println!("\t Resource SchemaUrl: {:?}", schema_url);
5949
}
60-
61-
Ok(())
50+
self.resource.iter().for_each(|(k, v)| {
51+
println!("\t -> {}={:?}", k, v);
52+
});
53+
print_logs(batch);
6254
}
55+
56+
Ok(())
6357
}
6458
}
6559

stress/src/logs.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,8 @@ mod throughput;
2424
struct MockLogExporter;
2525

2626
impl LogExporter for MockLogExporter {
27-
fn export(
28-
&self,
29-
_batch: LogBatch<'_>,
30-
) -> impl std::future::Future<Output = OTelSdkResult> + Send {
31-
async { Ok(()) }
27+
async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
28+
Ok(())
3229
}
3330
}
3431

0 commit comments

Comments
 (0)