1 change: 1 addition & 0 deletions opentelemetry-otlp/CHANGELOG.md
@@ -4,6 +4,7 @@

- Add partial success response handling for OTLP exporters (traces, metrics, logs) per OTLP spec. Exporters now log warnings when the server returns partial success responses with rejected items and error messages. [#865](https://github.com/open-telemetry/opentelemetry-rust/issues/865)
- Refactor `internal-logs` feature in `opentelemetry-otlp` to reduce unnecessary dependencies[3191](https://github.com/open-telemetry/opentelemetry-rust/pull/3192)
+- Fixed [#2777](https://github.com/open-telemetry/opentelemetry-rust/issues/2777) to properly handle `shutdown_with_timeout()` when using `grpc-tonic`.

## 0.31.0

53 changes: 26 additions & 27 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
@@ -5,9 +5,8 @@ use opentelemetry_proto::tonic::collector::logs::v1::{
 };
 use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
 use opentelemetry_sdk::logs::{LogBatch, LogExporter};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
Review comment (Member):

Are there any repercussions of moving to the std Mutex instead of the tokio one inside the async export call?

Reply (Member):

I think it's fine, and it's the same pattern used in TonicMetricsClient for the last couple of years, which as far as I can tell we've not had problems with (link to blame).

In the export, we hold the lock only long enough to clone the client and call the interceptor and, significantly, not across any await points, which is the main contraindication for std mutexes:

                let (mut client, metadata, extensions) = self
                    .inner
                    .lock()
                    .map_err(|e| tonic::Status::internal(format!("Failed to acquire lock: {e:?}")))
                    .and_then(|mut inner| match &mut *inner {
                        Some(inner) => {
                            let (m, e, _) = inner
                                .interceptor
                                .call(Request::new(()))
                                .map_err(|e| {
                                    // Convert interceptor errors to tonic::Status for retry classification
                                    tonic::Status::internal(format!("interceptor error: {e:?}"))
                                })? // lock released

I think if you called shutdown_with_timeout on a single-threaded tokio runtime while the runtime is still trying to do an export, and the export was at that exact moment in the code I highlighted above, then you could potentially deadlock it. But this relies on amazing timing, and I think we already warn against doing exactly this:

/// **Warning**: When using tokio's current-thread runtime, `shutdown()`, which
/// is a blocking call, should not be called from your main thread. This can
/// cause deadlock. Instead, call `shutdown()` from a separate thread or use
/// tokio's `spawn_blocking`.

I note there are some other suggestions (e.g. spawning a thread) to do this particular thing in the linked issue, but this feels a bit overwrought.

What are you thinking, @cijothomas?
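
To illustrate the point made in this reply, here is a minimal, std-only sketch of the pattern: the guard of a `std::sync::Mutex` is held just long enough to clone what the export needs, and it is dropped before any `.await` could happen. The names `Client`, `Inner`, `Exporter`, `client_snapshot`, and `shutdown` are hypothetical stand-ins, not the types or methods of `opentelemetry-otlp`.

```rust
use std::sync::Mutex;

// Hypothetical stand-in for the cloneable tonic client.
#[derive(Clone, Debug, PartialEq)]
struct Client {
    endpoint: String,
}

// Hypothetical stand-in for the state guarded by the mutex.
struct Inner {
    client: Client,
}

struct Exporter {
    inner: Mutex<Option<Inner>>,
}

impl Exporter {
    fn new(endpoint: &str) -> Self {
        Exporter {
            inner: Mutex::new(Some(Inner {
                client: Client {
                    endpoint: endpoint.to_string(),
                },
            })),
        }
    }

    /// Clone the client while holding the lock, then release the lock.
    /// An async caller can await on the returned clone without holding the guard.
    fn client_snapshot(&self) -> Result<Client, String> {
        let guard = self
            .inner
            .lock()
            .map_err(|e| format!("failed to acquire lock: {e}"))?;
        let snapshot = guard.as_ref().map(|inner| inner.client.clone());
        drop(guard); // lock released here, before any await point
        snapshot.ok_or_else(|| "exporter is already shut down".to_string())
    }

    /// Synchronous shutdown: taking the lock needs no async runtime.
    fn shutdown(&self) -> Result<(), String> {
        self.inner
            .lock()
            .map_err(|e| format!("failed to acquire lock: {e}"))?
            .take();
        Ok(())
    }
}
```

Under these assumptions, `client_snapshot()` succeeds until `shutdown()` has run and returns an error afterwards, while a second `shutdown()` is a harmless no-op because `take()` on `None` does nothing.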

 use std::time;
-use tokio::sync::Mutex;
 use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
@@ -85,24 +84,26 @@ impl LogExporter for TonicLogsClient {
                 let batch_clone = Arc::clone(&batch);

                 // Execute the export operation
-                let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() {
-                    Some(inner) => {
-                        let (m, e, _) = inner
-                            .interceptor
-                            .call(Request::new(()))
-                            .map_err(|e| {
-                                // Convert interceptor errors to tonic::Status for retry classification
-                                tonic::Status::internal(format!("interceptor error: {e:?}"))
-                            })?
-                            .into_parts();
-                        (inner.client.clone(), m, e)
-                    }
-                    None => {
-                        return Err(tonic::Status::failed_precondition(
-                            "exporter already shutdown",
-                        ))
-                    }
-                };
+                let (mut client, metadata, extensions) = self
+                    .inner
+                    .lock()
+                    .map_err(|e| tonic::Status::internal(format!("Failed to acquire lock: {e:?}")))
+                    .and_then(|mut inner| match &mut *inner {
+                        Some(inner) => {
+                            let (m, e, _) = inner
+                                .interceptor
+                                .call(Request::new(()))
+                                .map_err(|e| {
+                                    // Convert interceptor errors to tonic::Status for retry classification
+                                    tonic::Status::internal(format!("interceptor error: {e:?}"))
+                                })?
+                                .into_parts();
+                            Ok((inner.client.clone(), m, e))
+                        }
+                        None => Err(tonic::Status::failed_precondition(
+                            "log exporter is already shut down",
+                        )),
+                    })?;

                 let resource_logs = group_logs_by_resource_and_scope(&batch_clone, &self.resource);

@@ -143,13 +144,11 @@ impl LogExporter for TonicLogsClient {
     }

     fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {
-        // TODO: Implement actual shutdown
-        // Due to the use of tokio::sync::Mutex to guard
-        // the inner client, we need to await the call to lock the mutex
-        // and that requires async runtime.
-        // It is possible to fix this by using
-        // a dedicated thread just to handle shutdown.
-        // But for now, we just return Ok.
+        self.inner
+            .lock()
+            .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e}")))?
+            .take();
+
         Ok(())
     }
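
The `map_err` on `lock()` in the new `shutdown_with_timeout` is there because `std::sync::Mutex::lock` returns an `Err` once the mutex has been poisoned, that is, after some thread panicked while holding the guard. The following std-only sketch shows that behaviour. `InternalFailure` is a hypothetical stand-in for `OTelSdkError::InternalFailure`, and `shutdown` and `poison` are illustrative helpers, not functions from the crate.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical error type standing in for `OTelSdkError::InternalFailure`.
#[derive(Debug, PartialEq)]
struct InternalFailure(String);

/// Take the guarded value, reporting a poisoned lock as an error
/// instead of panicking with `unwrap()`.
fn shutdown(inner: &Mutex<Option<String>>) -> Result<(), InternalFailure> {
    inner
        .lock()
        .map_err(|e| InternalFailure(format!("Failed to acquire lock: {e}")))?
        .take();
    Ok(())
}

/// Poison a mutex by panicking in another thread while the guard is held.
fn poison(inner: &Arc<Mutex<Option<String>>>) {
    let clone = Arc::clone(inner);
    // The spawned thread panics; `join` returns that panic as an `Err`, which is ignored here.
    let _ = thread::spawn(move || {
        let _guard = clone.lock().unwrap();
        panic!("simulated panic while holding the lock");
    })
    .join();
}
```

On a healthy mutex, `shutdown` empties the slot and returns `Ok(())`. After `poison` has run, the same call returns an `InternalFailure` rather than taking the value, which is how a poisoned lock reaches the caller as an error in this sketch.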

2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/tonic/metrics.rs
@@ -93,7 +93,7 @@ impl MetricsClient for TonicMetricsClient {
                             Ok((inner.client.clone(), m, e))
                         }
                         None => Err(tonic::Status::failed_precondition(
-                            "exporter is already shut down",
+                            "metrics exporter is already shut down",
Review comment (Member):

I think this is helpful; I don't think the user would necessarily see which exporter failed otherwise.

                         )),
                     })?;
