diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index fda200dea0..53a1d5b260 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -4,6 +4,7 @@ - Add partial success response handling for OTLP exporters (traces, metrics, logs) per OTLP spec. Exporters now log warnings when the server returns partial success responses with rejected items and error messages. [#865](https://github.com/open-telemetry/opentelemetry-rust/issues/865) - Refactor `internal-logs` feature in `opentelemetry-otlp` to reduce unnecessary dependencies[3191](https://github.com/open-telemetry/opentelemetry-rust/pull/3192) +- Fixed [#2777](https://github.com/open-telemetry/opentelemetry rust/issues/2777) to properly handle `shutdown_with_timeout()` when using `grpc-tonic`. ## 0.31.0 diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index fbe033d09e..cd8ae75c9f 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -5,9 +5,8 @@ use opentelemetry_proto::tonic::collector::logs::v1::{ }; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; use opentelemetry_sdk::logs::{LogBatch, LogExporter}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time; -use tokio::sync::Mutex; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; @@ -85,24 +84,26 @@ impl LogExporter for TonicLogsClient { let batch_clone = Arc::clone(&batch); // Execute the export operation - let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() { - Some(inner) => { - let (m, e, _) = inner - .interceptor - .call(Request::new(())) - .map_err(|e| { - // Convert interceptor errors to tonic::Status for retry classification - tonic::Status::internal(format!("interceptor error: {e:?}")) - })? - .into_parts(); - (inner.client.clone(), m, e) - } - None => { - return Err(tonic::Status::failed_precondition( - "exporter already shutdown", - )) - } - }; + let (mut client, metadata, extensions) = self + .inner + .lock() + .map_err(|e| tonic::Status::internal(format!("Failed to acquire lock: {e:?}"))) + .and_then(|mut inner| match &mut *inner { + Some(inner) => { + let (m, e, _) = inner + .interceptor + .call(Request::new(())) + .map_err(|e| { + // Convert interceptor errors to tonic::Status for retry classification + tonic::Status::internal(format!("interceptor error: {e:?}")) + })? + .into_parts(); + Ok((inner.client.clone(), m, e)) + } + None => Err(tonic::Status::failed_precondition( + "log exporter is already shut down", + )), + })?; let resource_logs = group_logs_by_resource_and_scope(&batch_clone, &self.resource); @@ -143,13 +144,11 @@ impl LogExporter for TonicLogsClient { } fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult { - // TODO: Implement actual shutdown - // Due to the use of tokio::sync::Mutex to guard - // the inner client, we need to await the call to lock the mutex - // and that requires async runtime. - // It is possible to fix this by using - // a dedicated thread just to handle shutdown. - // But for now, we just return Ok. + self.inner + .lock() + .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e}")))? + .take(); + Ok(()) } diff --git a/opentelemetry-otlp/src/exporter/tonic/metrics.rs b/opentelemetry-otlp/src/exporter/tonic/metrics.rs index 5239ed5460..a38fd834f7 100644 --- a/opentelemetry-otlp/src/exporter/tonic/metrics.rs +++ b/opentelemetry-otlp/src/exporter/tonic/metrics.rs @@ -93,7 +93,7 @@ impl MetricsClient for TonicMetricsClient { Ok((inner.client.clone(), m, e)) } None => Err(tonic::Status::failed_precondition( - "exporter is already shut down", + "metrics exporter is already shut down", )), })?;