-
Notifications
You must be signed in to change notification settings - Fork 597
fix: handle shutdown in logs exporter #3255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -5,9 +5,8 @@ use opentelemetry_proto::tonic::collector::logs::v1::{ | |||||||||
| }; | ||||||||||
| use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; | ||||||||||
| use opentelemetry_sdk::logs::{LogBatch, LogExporter}; | ||||||||||
| use std::sync::Arc; | ||||||||||
| use std::sync::{Arc, Mutex}; | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Are there any repercussions of moving to the std Mutex instead of the tokio one inside the async export call?
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it's fine, and it's the same pattern used elsewhere. In the export, we hold the lock only long enough to clone the client and call the interceptor - and, significantly, not across any await points, which is the main contraindication for std mutexes:
let (mut client, metadata, extensions) = self
.inner
.lock()
.map_err(|e| tonic::Status::internal(format!("Failed to acquire lock: {e:?}")))
.and_then(|mut inner| match &mut *inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.call(Request::new(()))
.map_err(|e| {
// Convert interceptor errors to tonic::Status for retry classification
tonic::Status::internal(format!("interceptor error: {e:?}"))
})? // lock released
I think if you called opentelemetry-rust/opentelemetry-sdk/src/logs/batch_log_processor.rs Lines 99 to 102 in df412fe
I note there are some other suggestions in the linked issue for doing this particular thing (e.g. spawning a thread), but that feels a bit overwrought. What are you thinking, @cijothomas? |
||||||||||
| use std::time; | ||||||||||
| use tokio::sync::Mutex; | ||||||||||
| use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; | ||||||||||
|
|
||||||||||
| use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; | ||||||||||
|
|
@@ -85,24 +84,26 @@ impl LogExporter for TonicLogsClient { | |||||||||
| let batch_clone = Arc::clone(&batch); | ||||||||||
|
|
||||||||||
| // Execute the export operation | ||||||||||
| let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() { | ||||||||||
| Some(inner) => { | ||||||||||
| let (m, e, _) = inner | ||||||||||
| .interceptor | ||||||||||
| .call(Request::new(())) | ||||||||||
| .map_err(|e| { | ||||||||||
| // Convert interceptor errors to tonic::Status for retry classification | ||||||||||
| tonic::Status::internal(format!("interceptor error: {e:?}")) | ||||||||||
| })? | ||||||||||
| .into_parts(); | ||||||||||
| (inner.client.clone(), m, e) | ||||||||||
| } | ||||||||||
| None => { | ||||||||||
| return Err(tonic::Status::failed_precondition( | ||||||||||
| "exporter already shutdown", | ||||||||||
| )) | ||||||||||
| } | ||||||||||
| }; | ||||||||||
| let (mut client, metadata, extensions) = self | ||||||||||
| .inner | ||||||||||
| .lock() | ||||||||||
| .map_err(|e| tonic::Status::internal(format!("Failed to acquire lock: {e:?}"))) | ||||||||||
| .and_then(|mut inner| match &mut *inner { | ||||||||||
| Some(inner) => { | ||||||||||
| let (m, e, _) = inner | ||||||||||
| .interceptor | ||||||||||
| .call(Request::new(())) | ||||||||||
| .map_err(|e| { | ||||||||||
| // Convert interceptor errors to tonic::Status for retry classification | ||||||||||
| tonic::Status::internal(format!("interceptor error: {e:?}")) | ||||||||||
| })? | ||||||||||
| .into_parts(); | ||||||||||
| Ok((inner.client.clone(), m, e)) | ||||||||||
| } | ||||||||||
| None => Err(tonic::Status::failed_precondition( | ||||||||||
| "log exporter is already shut down", | ||||||||||
| )), | ||||||||||
| })?; | ||||||||||
|
|
||||||||||
| let resource_logs = group_logs_by_resource_and_scope(&batch_clone, &self.resource); | ||||||||||
|
|
||||||||||
|
|
@@ -143,13 +144,11 @@ impl LogExporter for TonicLogsClient { | |||||||||
| } | ||||||||||
|
|
||||||||||
| fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult { | ||||||||||
| // TODO: Implement actual shutdown | ||||||||||
| // Due to the use of tokio::sync::Mutex to guard | ||||||||||
| // the inner client, we need to await the call to lock the mutex | ||||||||||
| // and that requires async runtime. | ||||||||||
| // It is possible to fix this by using | ||||||||||
| // a dedicated thread just to handle shutdown. | ||||||||||
| // But for now, we just return Ok. | ||||||||||
| self.inner | ||||||||||
| .lock() | ||||||||||
| .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e}")))? | ||||||||||
| .take(); | ||||||||||
|
|
||||||||||
| Ok(()) | ||||||||||
| } | ||||||||||
|
|
||||||||||
|
|
||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -93,7 +93,7 @@ impl MetricsClient for TonicMetricsClient { | |
| Ok((inner.client.clone(), m, e)) | ||
| } | ||
| None => Err(tonic::Status::failed_precondition( | ||
| "exporter is already shut down", | ||
| "metrics exporter is already shut down", | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think this is helpful; I don't think the user would otherwise necessarily see which exporter failed. |
||
| )), | ||
| })?; | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.