@@ -7,20 +7,20 @@ use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
77use opentelemetry_sdk:: logs:: { LogBatch , LogExporter } ;
88use tonic:: { codegen:: CompressionEncoding , service:: Interceptor , transport:: Channel , Request } ;
99
10- use opentelemetry_proto:: transform:: logs:: tonic:: group_logs_by_resource_and_scope;
11-
1210use super :: BoxInterceptor ;
11+ use opentelemetry_proto:: transform:: logs:: tonic:: group_logs_by_resource_and_scope;
12+ use std:: sync:: Mutex ;
1313
1414pub ( crate ) struct TonicLogsClient {
15- inner : Option < ClientInner > ,
15+ inner : Mutex < Option < ClientInner > > ,
1616 #[ allow( dead_code) ]
1717 // <allow dead> would be removed once we support set_resource for metrics.
1818 resource : opentelemetry_proto:: transform:: common:: tonic:: ResourceAttributesWithSchema ,
1919}
2020
2121struct ClientInner {
2222 client : LogsServiceClient < Channel > ,
23- interceptor : tokio :: sync :: Mutex < BoxInterceptor > ,
23+ interceptor : BoxInterceptor ,
2424}
2525
2626impl fmt:: Debug for TonicLogsClient {
@@ -45,30 +45,38 @@ impl TonicLogsClient {
4545 otel_debug ! ( name: "TonicsLogsClientBuilt" ) ;
4646
4747 TonicLogsClient {
48- inner : Some ( ClientInner {
48+ inner : Mutex :: new ( Some ( ClientInner {
4949 client,
50- interceptor : tokio :: sync :: Mutex :: new ( interceptor) ,
51- } ) ,
50+ interceptor : interceptor,
51+ } ) ) ,
5252 resource : Default :: default ( ) ,
5353 }
5454 }
5555}
5656
5757impl LogExporter for TonicLogsClient {
5858 async fn export ( & self , batch : LogBatch < ' _ > ) -> OTelSdkResult {
59- let ( mut client, metadata, extensions) = match & self . inner {
60- Some ( inner) => {
61- let ( m, e, _) = inner
62- . interceptor
63- . lock ( )
64- . await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
65- . call ( Request :: new ( ( ) ) )
66- . map_err ( |e| OTelSdkError :: InternalFailure ( format ! ( "error: {:?}" , e) ) ) ?
67- . into_parts ( ) ;
68- ( inner. client . clone ( ) , m, e)
69- }
70- None => return Err ( OTelSdkError :: AlreadyShutdown ) ,
71- } ;
59+ let ( mut client, metadata, extensions) = self
60+ . inner
61+ . lock ( )
62+ . map_err ( |e| OTelSdkError :: InternalFailure ( format ! ( "Failed to acquire lock: {e:?}" ) ) )
63+ . and_then ( |mut inner| match & mut * inner {
64+ Some ( inner) => {
65+ let ( m, e, _) = inner
66+ . interceptor
67+ . call ( Request :: new ( ( ) ) )
68+ . map_err ( |e| {
69+ OTelSdkError :: InternalFailure ( format ! (
70+ "unexpected status while exporting {e:?}"
71+ ) )
72+ } ) ?
73+ . into_parts ( ) ;
74+ Ok ( ( inner. client . clone ( ) , m, e) )
75+ }
76+ None => Err ( OTelSdkError :: InternalFailure (
77+ "exporter is already shut down" . into ( ) ,
78+ ) ) ,
79+ } ) ?;
7280
7381 let resource_logs = group_logs_by_resource_and_scope ( batch, & self . resource ) ;
7482
@@ -85,8 +93,13 @@ impl LogExporter for TonicLogsClient {
8593 Ok ( ( ) )
8694 }
8795
88- fn shutdown ( & mut self ) -> OTelSdkResult {
89- match self . inner . take ( ) {
96+ fn shutdown ( & self ) -> OTelSdkResult {
97+ match self
98+ . inner
99+ . lock ( )
100+ . map_err ( |e| OTelSdkError :: InternalFailure ( format ! ( "Failed to acquire lock: {}" , e) ) ) ?
101+ . take ( )
102+ {
90103 Some ( _) => Ok ( ( ) ) , // Successfully took `inner`, indicating a successful shutdown.
91104 None => Err ( OTelSdkError :: AlreadyShutdown ) , // `inner` was already `None`, meaning it's already shut down.
92105 }
0 commit comments