From 19da2daa27055d8ea57274dc5bac528630e659a2 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Fri, 14 Feb 2025 17:40:53 -0800 Subject: [PATCH] Nit cleanup of method signature for async export --- opentelemetry-appender-tracing/src/layer.rs | 16 ++-- opentelemetry-otlp/src/exporter/http/logs.rs | 74 +++++++++---------- opentelemetry-otlp/src/exporter/tonic/logs.rs | 56 +++++++------- opentelemetry-otlp/src/logs.rs | 18 ++--- .../src/logs/in_memory_exporter.rs | 28 +++---- opentelemetry-stdout/src/logs/exporter.rs | 48 ++++++------ stress/src/logs.rs | 7 +- 7 files changed, 104 insertions(+), 143 deletions(-) diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index a7e47e8ccd..ebacc89385 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -263,17 +263,11 @@ mod tests { struct ReentrantLogExporter; impl LogExporter for ReentrantLogExporter { - #[allow(clippy::manual_async_fn)] - fn export( - &self, - _batch: LogBatch<'_>, - ) -> impl std::future::Future + Send { - async { - // This will cause a deadlock as the export itself creates a log - // while still within the lock of the SimpleLogProcessor. - warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io"); - Ok(()) - } + async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult { + // This will cause a deadlock as the export itself creates a log + // while still within the lock of the SimpleLogProcessor. + warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io"); + Ok(()) } } diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index c7c0f92a71..d108e59c5c 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -5,51 +5,45 @@ use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; use opentelemetry_sdk::logs::{LogBatch, LogExporter}; impl LogExporter for OtlpHttpClient { - #[allow(clippy::manual_async_fn)] - fn export( - &self, - batch: LogBatch<'_>, - ) -> impl std::future::Future + Send { - async move { - let client = self - .client - .lock() - .map_err(|e| OTelSdkError::InternalFailure(format!("Mutex lock failed: {}", e)))? - .clone() - .ok_or(OTelSdkError::AlreadyShutdown)?; + async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { + let client = self + .client + .lock() + .map_err(|e| OTelSdkError::InternalFailure(format!("Mutex lock failed: {}", e)))? + .clone() + .ok_or(OTelSdkError::AlreadyShutdown)?; - let (body, content_type) = self - .build_logs_export_body(batch) - .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; + let (body, content_type) = self + .build_logs_export_body(batch) + .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; - let mut request = http::Request::builder() - .method(Method::POST) - .uri(&self.collector_endpoint) - .header(CONTENT_TYPE, content_type) - .body(body.into()) - .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; + let mut request = http::Request::builder() + .method(Method::POST) + .uri(&self.collector_endpoint) + .header(CONTENT_TYPE, content_type) + .body(body.into()) + .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; - for (k, v) in &self.headers { - request.headers_mut().insert(k.clone(), v.clone()); - } + for (k, v) in &self.headers { + request.headers_mut().insert(k.clone(), v.clone()); + } - let request_uri = request.uri().to_string(); - otel_debug!(name: "HttpLogsClient.CallingExport"); - let response = client - .send_bytes(request) - .await - .map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?; - if !response.status().is_success() { - let error = format!( - "OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}", - request_uri, - response.status().as_u16(), - response.body() - ); - return Err(OTelSdkError::InternalFailure(error)); - } - Ok(()) + let request_uri = request.uri().to_string(); + otel_debug!(name: "HttpLogsClient.CallingExport"); + let response = client + .send_bytes(request) + .await + .map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?; + if !response.status().is_success() { + let error = format!( + "OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}", + request_uri, + response.status().as_u16(), + response.body() + ); + return Err(OTelSdkError::InternalFailure(error)); } + Ok(()) } fn shutdown(&mut self) -> OTelSdkResult { diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index d7841c2679..cdbed23be2 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -56,40 +56,34 @@ impl TonicLogsClient { } impl LogExporter for TonicLogsClient { - #[allow(clippy::manual_async_fn)] - fn export( - &self, - batch: LogBatch<'_>, - ) -> impl std::future::Future + Send { - async move { - let (mut client, metadata, extensions) = match &self.inner { - Some(inner) => { - let (m, e, _) = inner - .interceptor - .lock() - .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here - .call(Request::new(())) - .map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))? - .into_parts(); - (inner.client.clone(), m, e) - } - None => return Err(OTelSdkError::AlreadyShutdown), - }; + async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { + let (mut client, metadata, extensions) = match &self.inner { + Some(inner) => { + let (m, e, _) = inner + .interceptor + .lock() + .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here + .call(Request::new(())) + .map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))? + .into_parts(); + (inner.client.clone(), m, e) + } + None => return Err(OTelSdkError::AlreadyShutdown), + }; - let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); + let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); - otel_debug!(name: "TonicsLogsClient.CallingExport"); + otel_debug!(name: "TonicsLogsClient.CallingExport"); - client - .export(Request::from_parts( - metadata, - extensions, - ExportLogsServiceRequest { resource_logs }, - )) - .await - .map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))?; - Ok(()) - } + client + .export(Request::from_parts( + metadata, + extensions, + ExportLogsServiceRequest { resource_logs }, + )) + .await + .map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))?; + Ok(()) } fn shutdown(&mut self) -> OTelSdkResult { diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 1a85baeda8..4beede9086 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -139,18 +139,12 @@ impl LogExporter { } impl opentelemetry_sdk::logs::LogExporter for LogExporter { - #[allow(clippy::manual_async_fn)] - fn export( - &self, - batch: LogBatch<'_>, - ) -> impl std::future::Future + Send { - async move { - match &self.client { - #[cfg(feature = "grpc-tonic")] - SupportedTransportClient::Tonic(client) => client.export(batch).await, - #[cfg(any(feature = "http-proto", feature = "http-json"))] - SupportedTransportClient::Http(client) => client.export(batch).await, - } + async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { + match &self.client { + #[cfg(feature = "grpc-tonic")] + SupportedTransportClient::Tonic(client) => client.export(batch).await, + #[cfg(any(feature = "http-proto", feature = "http-json"))] + SupportedTransportClient::Http(client) => client.export(batch).await, } } diff --git a/opentelemetry-sdk/src/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/logs/in_memory_exporter.rs index ab757f200e..278201de40 100644 --- a/opentelemetry-sdk/src/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/logs/in_memory_exporter.rs @@ -188,24 +188,18 @@ impl InMemoryLogExporter { } impl LogExporter for InMemoryLogExporter { - #[allow(clippy::manual_async_fn)] - fn export( - &self, - batch: LogBatch<'_>, - ) -> impl std::future::Future + Send { - async move { - let mut logs_guard = self.logs.lock().map_err(|e| { - OTelSdkError::InternalFailure(format!("Failed to lock logs for export: {}", e)) - })?; - for (log_record, instrumentation) in batch.iter() { - let owned_log = OwnedLogData { - record: (*log_record).clone(), - instrumentation: (*instrumentation).clone(), - }; - logs_guard.push(owned_log); - } - Ok(()) + async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { + let mut logs_guard = self.logs.lock().map_err(|e| { + OTelSdkError::InternalFailure(format!("Failed to lock logs for export: {}", e)) + })?; + for (log_record, instrumentation) in batch.iter() { + let owned_log = OwnedLogData { + record: (*log_record).clone(), + instrumentation: (*instrumentation).clone(), + }; + logs_guard.push(owned_log); } + Ok(()) } fn shutdown(&mut self) -> OTelSdkResult { diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 4acd45a5a7..518dbfe2a3 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -30,36 +30,30 @@ impl fmt::Debug for LogExporter { } impl opentelemetry_sdk::logs::LogExporter for LogExporter { - /// Export spans to stdout - #[allow(clippy::manual_async_fn)] - fn export( - &self, - batch: LogBatch<'_>, - ) -> impl std::future::Future + Send { - async move { - if self.is_shutdown.load(atomic::Ordering::SeqCst) { - Err(OTelSdkError::AlreadyShutdown) + /// Export logs to stdout + async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { + if self.is_shutdown.load(atomic::Ordering::SeqCst) { + Err(OTelSdkError::AlreadyShutdown) + } else { + println!("Logs"); + if self + .resource_emitted + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { + print_logs(batch); } else { - println!("Logs"); - if self - .resource_emitted - .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) - .is_err() - { - print_logs(batch); - } else { - println!("Resource"); - if let Some(schema_url) = self.resource.schema_url() { - println!("\t Resource SchemaUrl: {:?}", schema_url); - } - self.resource.iter().for_each(|(k, v)| { - println!("\t -> {}={:?}", k, v); - }); - print_logs(batch); + println!("Resource"); + if let Some(schema_url) = self.resource.schema_url() { + println!("\t Resource SchemaUrl: {:?}", schema_url); } - - Ok(()) + self.resource.iter().for_each(|(k, v)| { + println!("\t -> {}={:?}", k, v); + }); + print_logs(batch); } + + Ok(()) } } diff --git a/stress/src/logs.rs b/stress/src/logs.rs index f1a7ae22cb..65c0943b35 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -24,11 +24,8 @@ mod throughput; struct MockLogExporter; impl LogExporter for MockLogExporter { - fn export( - &self, - _batch: LogBatch<'_>, - ) -> impl std::future::Future + Send { - async { Ok(()) } + async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult { + Ok(()) } }