Skip to content

Commit a716211

Browse files
committed
use tokio mutex
1 parent 7016fbc commit a716211

File tree

1 file changed

+19
-27
lines changed
  • opentelemetry-otlp/src/exporter/tonic

1 file changed

+19
-27
lines changed

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ use opentelemetry_proto::tonic::collector::logs::v1::{
55
};
66
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
77
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
8+
use tokio::sync::Mutex;
89
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
910

10-
use super::BoxInterceptor;
1111
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
12-
use std::sync::Mutex;
12+
13+
use super::BoxInterceptor;
1314

1415
pub(crate) struct TonicLogsClient {
1516
inner: Mutex<Option<ClientInner>>,
@@ -56,25 +57,17 @@ impl TonicLogsClient {
5657

5758
impl LogExporter for TonicLogsClient {
5859
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
59-
let (mut client, metadata, extensions) = self
60-
.inner
61-
.lock()
62-
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
63-
.and_then(|mut inner| match &mut *inner {
64-
Some(inner) => {
65-
let (m, e, _) = inner
66-
.interceptor
67-
.call(Request::new(()))
68-
.map_err(|e| {
69-
OTelSdkError::InternalFailure(format!(
70-
"unexpected status while exporting {e:?}"
71-
))
72-
})?
73-
.into_parts();
74-
Ok((inner.client.clone(), m, e))
75-
}
76-
None => Err(OTelSdkError::AlreadyShutdown),
77-
})?;
60+
let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() {
61+
Some(inner) => {
62+
let (m, e, _) = inner
63+
.interceptor
64+
.call(Request::new(()))
65+
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
66+
.into_parts();
67+
(inner.client.clone(), m, e)
68+
}
69+
None => return Err(OTelSdkError::AlreadyShutdown),
70+
};
7871

7972
let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
8073

@@ -92,12 +85,11 @@ impl LogExporter for TonicLogsClient {
9285
}
9386

9487
fn shutdown(&self) -> OTelSdkResult {
95-
match self
96-
.inner
97-
.lock()
98-
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
99-
.take()
100-
{
88+
let handle = tokio::runtime::Handle::try_current()
89+
.unwrap_or_else(|_| tokio::runtime::Runtime::new().unwrap().handle().clone());
90+
91+
let mut inner = handle.block_on(self.inner.lock());
92+
match inner.take() {
10193
Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown.
10294
None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down.
10395
}

0 commit comments

Comments
 (0)