@@ -10,12 +10,14 @@ pub(crate) struct RetryPolicy {
10
10
pub jitter_ms : u64 ,
11
11
}
12
12
13
+ // Generates a random jitter value up to max_jitter
13
14
fn generate_jitter ( max_jitter : u64 ) -> u64 {
14
15
let now = SystemTime :: now ( ) ;
15
16
let nanos = now. duration_since ( SystemTime :: UNIX_EPOCH ) . unwrap ( ) . subsec_nanos ( ) ;
16
17
nanos as u64 % ( max_jitter + 1 )
17
18
}
18
19
20
+ // Retries the given operation with exponential backoff and jitter
19
21
pub ( crate ) async fn retry_with_exponential_backoff < F , Fut , T , E > (
20
22
policy : RetryPolicy ,
21
23
operation_name : & str ,
@@ -31,17 +33,115 @@ where
31
33
32
34
loop {
33
35
match operation ( ) . await {
34
- Ok ( result) => return Ok ( result) ,
36
+ Ok ( result) => return Ok ( result) , // Return the result if the operation succeeds
35
37
Err ( err) if attempt < policy. max_retries => {
36
38
attempt += 1 ;
37
39
// Log the error and retry after a delay with jitter
38
40
otel_warn ! ( name: "OtlpRetry" , message = format!( "Retrying operation {:?} due to error: {:?}" , operation_name, err) ) ;
39
41
let jitter = generate_jitter ( policy. jitter_ms ) ;
40
42
let delay_with_jitter = std:: cmp:: min ( delay + jitter, policy. max_delay_ms ) ;
41
43
sleep ( Duration :: from_millis ( delay_with_jitter) ) . await ;
42
- delay = std:: cmp:: min ( delay * 2 , policy. max_delay_ms ) ;
44
+ delay = std:: cmp:: min ( delay * 2 , policy. max_delay_ms ) ; // Exponential backoff
43
45
}
44
- Err ( err) => return Err ( err) ,
46
+ Err ( err) => return Err ( err) , // Return the error if max retries are reached
45
47
}
46
48
}
49
+ }
50
+
51
+ #[ cfg( test) ]
52
+ mod tests {
53
+ use super :: * ;
54
+ use tokio:: time:: timeout;
55
+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
56
+ use std:: time:: Duration ;
57
+
58
+ // Test to ensure generate_jitter returns a value within the expected range
59
+ #[ tokio:: test]
60
+ async fn test_generate_jitter ( ) {
61
+ let max_jitter = 100 ;
62
+ let jitter = generate_jitter ( max_jitter) ;
63
+ assert ! ( jitter <= max_jitter) ;
64
+ }
65
+
66
+ // Test to ensure retry_with_exponential_backoff succeeds on the first attempt
67
+ #[ tokio:: test]
68
+ async fn test_retry_with_exponential_backoff_success ( ) {
69
+ let policy = RetryPolicy {
70
+ max_retries : 3 ,
71
+ initial_delay_ms : 100 ,
72
+ max_delay_ms : 1600 ,
73
+ jitter_ms : 100 ,
74
+ } ;
75
+
76
+ let result = retry_with_exponential_backoff ( policy, "test_operation" , || {
77
+ Box :: pin ( async { Ok :: < _ , ( ) > ( "success" ) } )
78
+ } ) . await ;
79
+
80
+ assert_eq ! ( result, Ok ( "success" ) ) ;
81
+ }
82
+
83
+ // Test to ensure retry_with_exponential_backoff retries the operation and eventually succeeds
84
+ #[ tokio:: test]
85
+ async fn test_retry_with_exponential_backoff_retries ( ) {
86
+ let policy = RetryPolicy {
87
+ max_retries : 3 ,
88
+ initial_delay_ms : 100 ,
89
+ max_delay_ms : 1600 ,
90
+ jitter_ms : 100 ,
91
+ } ;
92
+
93
+ let attempts = AtomicUsize :: new ( 0 ) ;
94
+
95
+ let result = retry_with_exponential_backoff ( policy, "test_operation" , || {
96
+ let attempt = attempts. fetch_add ( 1 , Ordering :: SeqCst ) ;
97
+ Box :: pin ( async move {
98
+ if attempt < 2 {
99
+ Err :: < & str , & str > ( "error" ) // Fail the first two attempts
100
+ } else {
101
+ Ok :: < & str , & str > ( "success" ) // Succeed on the third attempt
102
+ }
103
+ } )
104
+ } ) . await ;
105
+
106
+ assert_eq ! ( result, Ok ( "success" ) ) ;
107
+ assert_eq ! ( attempts. load( Ordering :: SeqCst ) , 3 ) ; // Ensure there were 3 attempts
108
+ }
109
+
110
+ // Test to ensure retry_with_exponential_backoff fails after max retries
111
+ #[ tokio:: test]
112
+ async fn test_retry_with_exponential_backoff_failure ( ) {
113
+ let policy = RetryPolicy {
114
+ max_retries : 3 ,
115
+ initial_delay_ms : 100 ,
116
+ max_delay_ms : 1600 ,
117
+ jitter_ms : 100 ,
118
+ } ;
119
+
120
+ let attempts = AtomicUsize :: new ( 0 ) ;
121
+
122
+ let result = retry_with_exponential_backoff ( policy, "test_operation" , || {
123
+ attempts. fetch_add ( 1 , Ordering :: SeqCst ) ;
124
+ Box :: pin ( async { Err :: < ( ) , _ > ( "error" ) } ) // Always fail
125
+ } ) . await ;
126
+
127
+ assert_eq ! ( result, Err ( "error" ) ) ;
128
+ assert_eq ! ( attempts. load( Ordering :: SeqCst ) , 4 ) ; // Ensure there were 4 attempts (initial + 3 retries)
129
+ }
130
+
131
+ // Test to ensure retry_with_exponential_backoff respects the timeout
132
+ #[ tokio:: test]
133
+ async fn test_retry_with_exponential_backoff_timeout ( ) {
134
+ let policy = RetryPolicy {
135
+ max_retries : 12 , // Increase the number of retries
136
+ initial_delay_ms : 100 ,
137
+ max_delay_ms : 1600 ,
138
+ jitter_ms : 100 ,
139
+ } ;
140
+
141
+ let result = timeout ( Duration :: from_secs ( 1 ) , retry_with_exponential_backoff ( policy, "test_operation" , || {
142
+ Box :: pin ( async { Err :: < ( ) , _ > ( "error" ) } ) // Always fail
143
+ } ) ) . await ;
144
+
145
+ assert ! ( result. is_err( ) ) ; // Ensure the operation times out
146
+ }
47
147
}
0 commit comments