@@ -12,6 +12,9 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann
12
12
use super :: BoxInterceptor ;
13
13
use crate :: metric:: MetricsClient ;
14
14
15
+ use opentelemetry_sdk:: retry:: { retry_with_exponential_backoff, RetryPolicy } ;
16
+ use opentelemetry_sdk:: runtime:: Tokio ;
17
+
15
18
pub ( crate ) struct TonicMetricsClient {
16
19
inner : Mutex < Option < ClientInner > > ,
17
20
}
@@ -53,49 +56,63 @@ impl TonicMetricsClient {
53
56
54
57
impl MetricsClient for TonicMetricsClient {
55
58
async fn export ( & self , metrics : & ResourceMetrics ) -> OTelSdkResult {
56
- let ( mut client, metadata, extensions) = self
57
- . inner
58
- . lock ( )
59
- . map_err ( |e| OTelSdkError :: InternalFailure ( format ! ( "Failed to acquire lock: {e:?}" ) ) )
60
- . and_then ( |mut inner| match & mut * inner {
61
- Some ( inner) => {
62
- let ( m, e, _) = inner
63
- . interceptor
64
- . call ( Request :: new ( ( ) ) )
65
- . map_err ( |e| {
66
- OTelSdkError :: InternalFailure ( format ! (
67
- "unexpected status while exporting {e:?}"
68
- ) )
69
- } ) ?
70
- . into_parts ( ) ;
71
- Ok ( ( inner. client . clone ( ) , m, e) )
72
- }
73
- None => Err ( OTelSdkError :: InternalFailure (
74
- "exporter is already shut down" . into ( ) ,
75
- ) ) ,
76
- } ) ?;
77
-
78
- otel_debug ! ( name: "TonicMetricsClient.ExportStarted" ) ;
79
-
80
- let result = client
81
- . export ( Request :: from_parts (
82
- metadata,
83
- extensions,
84
- ExportMetricsServiceRequest :: from ( metrics) ,
85
- ) )
86
- . await ;
87
-
88
- match result {
89
- Ok ( _) => {
90
- otel_debug ! ( name: "TonicMetricsClient.ExportSucceeded" ) ;
91
- Ok ( ( ) )
92
- }
93
- Err ( e) => {
94
- let error = format ! ( "{e:?}" ) ;
95
- otel_debug ! ( name: "TonicMetricsClient.ExportFailed" , error = & error) ;
96
- Err ( OTelSdkError :: InternalFailure ( error) )
59
+ let policy = RetryPolicy {
60
+ max_retries : 3 ,
61
+ initial_delay_ms : 100 ,
62
+ max_delay_ms : 1600 ,
63
+ jitter_ms : 100 ,
64
+ } ;
65
+
66
+ retry_with_exponential_backoff ( Tokio , policy, "TonicMetricsClient.Export" , {
67
+ let inner = & self . inner ;
68
+ move || {
69
+ Box :: pin ( async move {
70
+ let ( mut client, metadata, extensions) = inner
71
+ . lock ( )
72
+ . map_err ( |e| OTelSdkError :: InternalFailure ( format ! ( "Failed to acquire lock: {e:?}" ) ) )
73
+ . and_then ( |mut inner| match & mut * inner {
74
+ Some ( inner) => {
75
+ let ( m, e, _) = inner
76
+ . interceptor
77
+ . call ( Request :: new ( ( ) ) )
78
+ . map_err ( |e| {
79
+ OTelSdkError :: InternalFailure ( format ! (
80
+ "unexpected status while exporting {e:?}"
81
+ ) )
82
+ } ) ?
83
+ . into_parts ( ) ;
84
+ Ok ( ( inner. client . clone ( ) , m, e) )
85
+ }
86
+ None => Err ( OTelSdkError :: InternalFailure (
87
+ "exporter is already shut down" . into ( ) ,
88
+ ) ) ,
89
+ } ) ?;
90
+
91
+ otel_debug ! ( name: "TonicMetricsClient.ExportStarted" ) ;
92
+
93
+ let result = client
94
+ . export ( Request :: from_parts (
95
+ metadata,
96
+ extensions,
97
+ ExportMetricsServiceRequest :: from ( metrics) ,
98
+ ) )
99
+ . await ;
100
+
101
+ match result {
102
+ Ok ( _) => {
103
+ otel_debug ! ( name: "TonicMetricsClient.ExportSucceeded" ) ;
104
+ Ok ( ( ) )
105
+ }
106
+ Err ( e) => {
107
+ let error = format ! ( "export error: {e:?}" ) ;
108
+ otel_debug ! ( name: "TonicMetricsClient.ExportFailed" , error = & error) ;
109
+ Err ( OTelSdkError :: InternalFailure ( error) )
110
+ }
111
+ }
112
+ } )
97
113
}
98
- }
114
+ } )
115
+ . await
99
116
}
100
117
101
118
fn shutdown ( & self ) -> OTelSdkResult {
0 commit comments