@@ -10,12 +10,14 @@ pub(crate) struct RetryPolicy {
1010 pub jitter_ms : u64 ,
1111}
1212
13+ // Generates a random jitter value up to max_jitter
1314fn generate_jitter ( max_jitter : u64 ) -> u64 {
1415 let now = SystemTime :: now ( ) ;
1516 let nanos = now. duration_since ( SystemTime :: UNIX_EPOCH ) . unwrap ( ) . subsec_nanos ( ) ;
1617 nanos as u64 % ( max_jitter + 1 )
1718}
1819
20+ // Retries the given operation with exponential backoff and jitter
1921pub ( crate ) async fn retry_with_exponential_backoff < F , Fut , T , E > (
2022 policy : RetryPolicy ,
2123 operation_name : & str ,
@@ -31,17 +33,115 @@ where
3133
3234 loop {
3335 match operation ( ) . await {
34- Ok ( result) => return Ok ( result) ,
36+ Ok ( result) => return Ok ( result) , // Return the result if the operation succeeds
3537 Err ( err) if attempt < policy. max_retries => {
3638 attempt += 1 ;
3739 // Log the error and retry after a delay with jitter
3840 otel_warn ! ( name: "OtlpRetry" , message = format!( "Retrying operation {:?} due to error: {:?}" , operation_name, err) ) ;
3941 let jitter = generate_jitter ( policy. jitter_ms ) ;
4042 let delay_with_jitter = std:: cmp:: min ( delay + jitter, policy. max_delay_ms ) ;
4143 sleep ( Duration :: from_millis ( delay_with_jitter) ) . await ;
42- delay = std:: cmp:: min ( delay * 2 , policy. max_delay_ms ) ;
44+ delay = std:: cmp:: min ( delay * 2 , policy. max_delay_ms ) ; // Exponential backoff
4345 }
44- Err ( err) => return Err ( err) ,
46+ Err ( err) => return Err ( err) , // Return the error if max retries are reached
4547 }
4648 }
49+ }
50+
51+ #[ cfg( test) ]
52+ mod tests {
53+ use super :: * ;
54+ use tokio:: time:: timeout;
55+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
56+ use std:: time:: Duration ;
57+
58+ // Test to ensure generate_jitter returns a value within the expected range
59+ #[ tokio:: test]
60+ async fn test_generate_jitter ( ) {
61+ let max_jitter = 100 ;
62+ let jitter = generate_jitter ( max_jitter) ;
63+ assert ! ( jitter <= max_jitter) ;
64+ }
65+
66+ // Test to ensure retry_with_exponential_backoff succeeds on the first attempt
67+ #[ tokio:: test]
68+ async fn test_retry_with_exponential_backoff_success ( ) {
69+ let policy = RetryPolicy {
70+ max_retries : 3 ,
71+ initial_delay_ms : 100 ,
72+ max_delay_ms : 1600 ,
73+ jitter_ms : 100 ,
74+ } ;
75+
76+ let result = retry_with_exponential_backoff ( policy, "test_operation" , || {
77+ Box :: pin ( async { Ok :: < _ , ( ) > ( "success" ) } )
78+ } ) . await ;
79+
80+ assert_eq ! ( result, Ok ( "success" ) ) ;
81+ }
82+
83+ // Test to ensure retry_with_exponential_backoff retries the operation and eventually succeeds
84+ #[ tokio:: test]
85+ async fn test_retry_with_exponential_backoff_retries ( ) {
86+ let policy = RetryPolicy {
87+ max_retries : 3 ,
88+ initial_delay_ms : 100 ,
89+ max_delay_ms : 1600 ,
90+ jitter_ms : 100 ,
91+ } ;
92+
93+ let attempts = AtomicUsize :: new ( 0 ) ;
94+
95+ let result = retry_with_exponential_backoff ( policy, "test_operation" , || {
96+ let attempt = attempts. fetch_add ( 1 , Ordering :: SeqCst ) ;
97+ Box :: pin ( async move {
98+ if attempt < 2 {
99+ Err :: < & str , & str > ( "error" ) // Fail the first two attempts
100+ } else {
101+ Ok :: < & str , & str > ( "success" ) // Succeed on the third attempt
102+ }
103+ } )
104+ } ) . await ;
105+
106+ assert_eq ! ( result, Ok ( "success" ) ) ;
107+ assert_eq ! ( attempts. load( Ordering :: SeqCst ) , 3 ) ; // Ensure there were 3 attempts
108+ }
109+
110+ // Test to ensure retry_with_exponential_backoff fails after max retries
111+ #[ tokio:: test]
112+ async fn test_retry_with_exponential_backoff_failure ( ) {
113+ let policy = RetryPolicy {
114+ max_retries : 3 ,
115+ initial_delay_ms : 100 ,
116+ max_delay_ms : 1600 ,
117+ jitter_ms : 100 ,
118+ } ;
119+
120+ let attempts = AtomicUsize :: new ( 0 ) ;
121+
122+ let result = retry_with_exponential_backoff ( policy, "test_operation" , || {
123+ attempts. fetch_add ( 1 , Ordering :: SeqCst ) ;
124+ Box :: pin ( async { Err :: < ( ) , _ > ( "error" ) } ) // Always fail
125+ } ) . await ;
126+
127+ assert_eq ! ( result, Err ( "error" ) ) ;
128+ assert_eq ! ( attempts. load( Ordering :: SeqCst ) , 4 ) ; // Ensure there were 4 attempts (initial + 3 retries)
129+ }
130+
131+ // Test to ensure retry_with_exponential_backoff respects the timeout
132+ #[ tokio:: test]
133+ async fn test_retry_with_exponential_backoff_timeout ( ) {
134+ let policy = RetryPolicy {
135+ max_retries : 12 , // Increase the number of retries
136+ initial_delay_ms : 100 ,
137+ max_delay_ms : 1600 ,
138+ jitter_ms : 100 ,
139+ } ;
140+
141+ let result = timeout ( Duration :: from_secs ( 1 ) , retry_with_exponential_backoff ( policy, "test_operation" , || {
142+ Box :: pin ( async { Err :: < ( ) , _ > ( "error" ) } ) // Always fail
143+ } ) ) . await ;
144+
145+ assert ! ( result. is_err( ) ) ; // Ensure the operation times out
146+ }
47147}
0 commit comments