1
1
use core:: fmt;
2
+ use std:: sync:: Arc ;
2
3
use tokio:: sync:: Mutex ;
3
4
4
5
use opentelemetry:: otel_debug;
@@ -15,6 +16,9 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann
15
16
16
17
use super :: BoxInterceptor ;
17
18
19
+ use opentelemetry_sdk:: retry:: { retry_with_exponential_backoff, RetryPolicy } ;
20
+ use opentelemetry_sdk:: runtime:: Tokio ;
21
+
18
22
pub ( crate ) struct TonicTracesClient {
19
23
inner : Option < ClientInner > ,
20
24
#[ allow( dead_code) ]
@@ -60,43 +64,63 @@ impl TonicTracesClient {
60
64
61
65
impl SpanExporter for TonicTracesClient {
62
66
async fn export ( & self , batch : Vec < SpanData > ) -> OTelSdkResult {
63
- let ( mut client, metadata, extensions) = match & self . inner {
64
- Some ( inner) => {
65
- let ( m, e, _) = inner
66
- . interceptor
67
- . lock ( )
68
- . await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
69
- . call ( Request :: new ( ( ) ) )
70
- . map_err ( |e| OTelSdkError :: InternalFailure ( format ! ( "error: {e:?}" ) ) ) ?
71
- . into_parts ( ) ;
72
- ( inner. client . clone ( ) , m, e)
73
- }
74
- None => return Err ( OTelSdkError :: AlreadyShutdown ) ,
67
+ let policy = RetryPolicy {
68
+ max_retries : 3 ,
69
+ initial_delay_ms : 100 ,
70
+ max_delay_ms : 1600 ,
71
+ jitter_ms : 100 ,
75
72
} ;
76
73
77
- let resource_spans = group_spans_by_resource_and_scope ( batch, & self . resource ) ;
74
+ let batch = Arc :: new ( batch) ;
78
75
79
- otel_debug ! ( name: "TonicTracesClient.ExportStarted" ) ;
76
+ retry_with_exponential_backoff ( Tokio , policy, "TonicTracesClient.Export" , {
77
+ let batch = Arc :: clone ( & batch) ;
78
+ let inner = & self . inner ;
79
+ let resource = & self . resource ;
80
+ move || {
81
+ let batch = Arc :: clone ( & batch) ;
82
+ Box :: pin ( async move {
83
+ let ( mut client, metadata, extensions) = match inner {
84
+ Some ( inner) => {
85
+ let ( m, e, _) = inner
86
+ . interceptor
87
+ . lock ( )
88
+ . await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
89
+ . call ( Request :: new ( ( ) ) )
90
+ . map_err ( |e| OTelSdkError :: InternalFailure ( format ! ( "error: {e:?}" ) ) ) ?
91
+ . into_parts ( ) ;
92
+ ( inner. client . clone ( ) , m, e)
93
+ }
94
+ None => return Err ( OTelSdkError :: AlreadyShutdown ) ,
95
+ } ;
80
96
81
- let result = client
82
- . export ( Request :: from_parts (
83
- metadata,
84
- extensions,
85
- ExportTraceServiceRequest { resource_spans } ,
86
- ) )
87
- . await ;
97
+ let resource_spans = group_spans_by_resource_and_scope ( ( * batch) . clone ( ) , resource) ;
88
98
89
- match result {
90
- Ok ( _) => {
91
- otel_debug ! ( name: "TonicTracesClient.ExportSucceeded" ) ;
92
- Ok ( ( ) )
93
- }
94
- Err ( e) => {
95
- let error = e. to_string ( ) ;
96
- otel_debug ! ( name: "TonicTracesClient.ExportFailed" , error = & error) ;
97
- Err ( OTelSdkError :: InternalFailure ( error) )
99
+ otel_debug ! ( name: "TonicTracesClient.ExportStarted" ) ;
100
+
101
+ let result = client
102
+ . export ( Request :: from_parts (
103
+ metadata,
104
+ extensions,
105
+ ExportTraceServiceRequest { resource_spans } ,
106
+ ) )
107
+ . await ;
108
+
109
+ match result {
110
+ Ok ( _) => {
111
+ otel_debug ! ( name: "TonicTracesClient.ExportSucceeded" ) ;
112
+ Ok ( ( ) )
113
+ }
114
+ Err ( e) => {
115
+ let error = format ! ( "export error: {e:?}" ) ;
116
+ otel_debug ! ( name: "TonicTracesClient.ExportFailed" , error = & error) ;
117
+ Err ( OTelSdkError :: InternalFailure ( error) )
118
+ }
119
+ }
120
+ } )
98
121
}
99
- }
122
+ } )
123
+ . await
100
124
}
101
125
102
126
fn shutdown ( & mut self ) -> OTelSdkResult {
0 commit comments