
Commit 75f0d71

simplify retry interface

1 parent fb141db

File tree: 4 files changed, +93 -138 lines

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 2 additions & 2 deletions
@@ -15,7 +15,7 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scop
 use super::BoxInterceptor;
 
 use crate::retry_classification::grpc::classify_tonic_status;
-use opentelemetry_sdk::retry::{retry_with_exponential_backoff_classified, RetryPolicy};
+use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy};
 use opentelemetry_sdk::runtime::Tokio;
 
 pub(crate) struct TonicLogsClient {
@@ -72,7 +72,7 @@ impl LogExporter for TonicLogsClient {
 
         let batch = Arc::new(batch);
 
-        match retry_with_exponential_backoff_classified(
+        match retry_with_backoff(
             Tokio,
             policy,
             classify_tonic_status,

opentelemetry-otlp/src/exporter/tonic/metrics.rs

Lines changed: 2 additions & 2 deletions
@@ -13,7 +13,7 @@ use super::BoxInterceptor;
 use crate::metric::MetricsClient;
 
 use crate::retry_classification::grpc::classify_tonic_status;
-use opentelemetry_sdk::retry::{retry_with_exponential_backoff_classified, RetryPolicy};
+use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy};
 use opentelemetry_sdk::runtime::Tokio;
 
 pub(crate) struct TonicMetricsClient {
@@ -64,7 +64,7 @@ impl MetricsClient for TonicMetricsClient {
             jitter_ms: 100,
         };
 
-        match retry_with_exponential_backoff_classified(
+        match retry_with_backoff(
             Tokio,
             policy,
             classify_tonic_status,

opentelemetry-otlp/src/exporter/tonic/trace.rs

Lines changed: 2 additions & 2 deletions
@@ -17,7 +17,7 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann
 use super::BoxInterceptor;
 
 use crate::retry_classification::grpc::classify_tonic_status;
-use opentelemetry_sdk::retry::{retry_with_exponential_backoff_classified, RetryPolicy};
+use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy};
 use opentelemetry_sdk::runtime::Tokio;
 
 pub(crate) struct TonicTracesClient {
@@ -74,7 +74,7 @@ impl SpanExporter for TonicTracesClient {
 
         let batch = Arc::new(batch);
 
-        match retry_with_exponential_backoff_classified(
+        match retry_with_backoff(
             Tokio,
             policy,
             classify_tonic_status,
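
All three tonic exporters change in the same way: the call keeps its runtime, retry policy, classifier, and operation-name arguments, and only the function name shortens from `retry_with_exponential_backoff_classified` to `retry_with_backoff`. A minimal, self-contained sketch of that call shape, assuming the `experimental_async_runtime` and `rt-tokio` features are enabled; the policy values and the always-retryable stand-in classifier are illustrative (the exporters pass `classify_tonic_status` instead), and the `RetryPolicy` field names other than `jitter_ms` follow the module docs rather than this diff:

use opentelemetry_sdk::retry::{retry_with_backoff, RetryErrorType, RetryPolicy};
use opentelemetry_sdk::runtime::Tokio;

#[tokio::main]
async fn main() {
    // Illustrative policy; each exporter builds its own values.
    let policy = RetryPolicy {
        max_retries: 3,
        initial_delay_ms: 100,
        max_delay_ms: 1_600,
        jitter_ms: 100,
    };

    // Stand-in classifier. The exporters pass classify_tonic_status here,
    // which maps a tonic::Status onto a RetryErrorType instead of treating
    // every error as retryable.
    let classifier = |_: &&str| RetryErrorType::Retryable;

    let result = retry_with_backoff(Tokio, policy, classifier, "export", || {
        // In the exporters, this closure performs the actual gRPC export call.
        Box::pin(async { Err::<(), &str>("transient failure") })
    })
    .await;

    assert!(result.is_err()); // gives up once max_retries is exhausted
}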

opentelemetry-sdk/src/retry.rs

Lines changed: 87 additions & 132 deletions
@@ -3,10 +3,10 @@
 //! The `RetryPolicy` struct defines the configuration for the retry behavior, including the maximum
 //! number of retries, initial delay, maximum delay, and jitter.
 //!
-//! The `retry_with_exponential_backoff` function retries the given operation according to the
+//! The `retry_with_backoff` function retries the given operation according to the
 //! specified retry policy, using exponential backoff and jitter to determine the delay between
-//! retries. The function logs errors and retries the operation until it succeeds or the maximum
-//! number of retries is reached.
+//! retries. The function uses error classification to determine retry behavior and can honor
+//! server-provided throttling hints.
 
 #[cfg(feature = "experimental_async_runtime")]
 use opentelemetry::otel_warn;
@@ -69,45 +69,7 @@ fn generate_jitter(max_jitter: u64) -> u64 {
     nanos as u64 % (max_jitter + 1)
 }
 
-/// Retries the given operation with exponential backoff and jitter.
-///
-/// # Arguments
-///
-/// * `runtime` - The async runtime to use for delays.
-/// * `policy` - The retry policy configuration.
-/// * `operation_name` - The name of the operation being retried.
-/// * `operation` - The operation to be retried.
-///
-/// # Returns
-///
-/// A `Result` containing the operation's result or an error if the maximum retries are reached.
-#[cfg(feature = "experimental_async_runtime")]
-pub async fn retry_with_exponential_backoff<R, F, Fut, T, E>(
-    runtime: R,
-    policy: RetryPolicy,
-    operation_name: &str,
-    operation: F,
-) -> Result<T, E>
-where
-    R: Runtime,
-    F: FnMut() -> Fut,
-    E: std::fmt::Debug,
-    Fut: Future<Output = Result<T, E>>,
-{
-    // Use a simple classifier that treats all errors as retryable
-    let simple_classifier = |_: &E| RetryErrorType::Retryable;
-
-    retry_with_exponential_backoff_classified(
-        runtime,
-        policy,
-        simple_classifier,
-        operation_name,
-        operation,
-    )
-    .await
-}
-
-/// Enhanced retry with exponential backoff, jitter, and error classification.
+/// Retries the given operation with exponential backoff, jitter, and error classification.
 ///
 /// This function provides sophisticated retry behavior by classifying errors
 /// and honoring server-provided throttling hints (e.g., HTTP Retry-After, gRPC RetryInfo).
@@ -125,7 +87,7 @@ where
 /// A `Result` containing the operation's result or an error if max retries are reached
 /// or a non-retryable error occurs.
 #[cfg(feature = "experimental_async_runtime")]
-pub async fn retry_with_exponential_backoff_classified<R, F, Fut, T, E, C>(
+pub async fn retry_with_backoff<R, F, Fut, T, E, C>(
     runtime: R,
     policy: RetryPolicy,
     error_classifier: C,
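
With the old convenience wrapper removed, `retry_with_backoff` is the single public entry point and always takes an `error_classifier`. A sketch of a hand-rolled classifier for a hypothetical error type, using only the two `RetryErrorType` variants that appear in this diff (the throttling-related classification behind the Retry-After/RetryInfo handling is not shown in these hunks):

use std::io;

use opentelemetry_sdk::retry::RetryErrorType;

// Hypothetical classifier: treat transient I/O failures as retryable and
// everything else as a permanent error that stops the retry loop.
fn classify_io_error(err: &io::Error) -> RetryErrorType {
    match err.kind() {
        io::ErrorKind::TimedOut
        | io::ErrorKind::ConnectionReset
        | io::ErrorKind::ConnectionAborted => RetryErrorType::Retryable,
        _ => RetryErrorType::NonRetryable,
    }
}

Such a function (or closure) slots into the third argument of `retry_with_backoff`, the same position where the exporters pass `classify_tonic_status`.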
@@ -186,9 +148,10 @@ where
 /// No-op retry function for when experimental_async_runtime is not enabled.
 /// This function will execute the operation exactly once without any retries.
 #[cfg(not(feature = "experimental_async_runtime"))]
-pub async fn retry_with_exponential_backoff<R, F, Fut, T, E>(
+pub async fn retry_with_backoff<R, F, Fut, T, E, C>(
     _runtime: R,
     _policy: RetryPolicy,
+    _error_classifier: C,
     _operation_name: &str,
     mut operation: F,
 ) -> Result<T, E>
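
When `experimental_async_runtime` is off, the fallback keeps the classifier in its signature so call sites compile unchanged, but per its doc comment it ignores the runtime, policy, and classifier and runs the operation exactly once. A hypothetical, simplified stand-in for that documented behavior (not the actual body, which lies outside this hunk):

// Sketch only: "execute exactly once, no retries".
async fn run_once<F, Fut, T, E>(mut operation: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    operation().await // single attempt; the result is returned as-is
}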
@@ -227,9 +190,13 @@ mod tests {
             jitter_ms: 100,
         };
 
-        let result = retry_with_exponential_backoff(runtime, policy, "test_operation", || {
-            Box::pin(async { Ok::<_, ()>("success") })
-        })
+        let result = retry_with_backoff(
+            runtime,
+            policy,
+            |_: &()| RetryErrorType::Retryable,
+            "test_operation",
+            || Box::pin(async { Ok::<_, ()>("success") }),
+        )
         .await;
 
         assert_eq!(result, Ok("success"));
@@ -248,16 +215,22 @@ mod tests {
 
         let attempts = AtomicUsize::new(0);
 
-        let result = retry_with_exponential_backoff(runtime, policy, "test_operation", || {
-            let attempt = attempts.fetch_add(1, Ordering::SeqCst);
-            Box::pin(async move {
-                if attempt < 2 {
-                    Err::<&str, &str>("error") // Fail the first two attempts
-                } else {
-                    Ok::<&str, &str>("success") // Succeed on the third attempt
-                }
-            })
-        })
+        let result = retry_with_backoff(
+            runtime,
+            policy,
+            |_: &&str| RetryErrorType::Retryable,
+            "test_operation",
+            || {
+                let attempt = attempts.fetch_add(1, Ordering::SeqCst);
+                Box::pin(async move {
+                    if attempt < 2 {
+                        Err::<&str, &str>("error") // Fail the first two attempts
+                    } else {
+                        Ok::<&str, &str>("success") // Succeed on the third attempt
+                    }
+                })
+            },
+        )
         .await;
 
         assert_eq!(result, Ok("success"));
@@ -277,10 +250,16 @@ mod tests {
 
         let attempts = AtomicUsize::new(0);
 
-        let result = retry_with_exponential_backoff(runtime, policy, "test_operation", || {
-            attempts.fetch_add(1, Ordering::SeqCst);
-            Box::pin(async { Err::<(), _>("error") }) // Always fail
-        })
+        let result = retry_with_backoff(
+            runtime,
+            policy,
+            |_: &&str| RetryErrorType::Retryable,
+            "test_operation",
+            || {
+                attempts.fetch_add(1, Ordering::SeqCst);
+                Box::pin(async { Err::<(), _>("error") }) // Always fail
+            },
+        )
         .await;
 
         assert_eq!(result, Err("error"));
@@ -300,9 +279,15 @@ mod tests {
 
         let result = timeout(
             Duration::from_secs(1),
-            retry_with_exponential_backoff(runtime, policy, "test_operation", || {
-                Box::pin(async { Err::<(), _>("error") }) // Always fail
-            }),
+            retry_with_backoff(
+                runtime,
+                policy,
+                |_: &&str| RetryErrorType::Retryable,
+                "test_operation",
+                || {
+                    Box::pin(async { Err::<(), _>("error") }) // Always fail
+                },
+            ),
         )
         .await;
 
@@ -337,16 +322,10 @@ mod tests {
         // Classifier that returns non-retryable
         let classifier = |_: &()| RetryErrorType::NonRetryable;
 
-        let result = retry_with_exponential_backoff_classified(
-            runtime,
-            policy,
-            classifier,
-            "test_operation",
-            || {
-                attempts.fetch_add(1, Ordering::SeqCst);
-                Box::pin(async { Err::<(), _>(()) }) // Always fail
-            },
-        )
+        let result = retry_with_backoff(runtime, policy, classifier, "test_operation", || {
+            attempts.fetch_add(1, Ordering::SeqCst);
+            Box::pin(async { Err::<(), _>(()) }) // Always fail
+        })
         .await;
 
         assert!(result.is_err());
@@ -368,22 +347,16 @@ mod tests {
         // Classifier that returns retryable
         let classifier = |_: &()| RetryErrorType::Retryable;
 
-        let result = retry_with_exponential_backoff_classified(
-            runtime,
-            policy,
-            classifier,
-            "test_operation",
-            || {
-                let attempt = attempts.fetch_add(1, Ordering::SeqCst);
-                Box::pin(async move {
-                    if attempt < 1 {
-                        Err::<&str, ()>(()) // Fail first attempt
-                    } else {
-                        Ok("success") // Succeed on retry
-                    }
-                })
-            },
-        )
+        let result = retry_with_backoff(runtime, policy, classifier, "test_operation", || {
+            let attempt = attempts.fetch_add(1, Ordering::SeqCst);
+            Box::pin(async move {
+                if attempt < 1 {
+                    Err::<&str, ()>(()) // Fail first attempt
+                } else {
+                    Ok("success") // Succeed on retry
+                }
+            })
+        })
         .await;
 
         assert_eq!(result, Ok("success"));
@@ -407,22 +380,16 @@ mod tests {
 
         let start_time = std::time::Instant::now();
 
-        let result = retry_with_exponential_backoff_classified(
-            runtime,
-            policy,
-            classifier,
-            "test_operation",
-            || {
-                let attempt = attempts.fetch_add(1, Ordering::SeqCst);
-                Box::pin(async move {
-                    if attempt < 1 {
-                        Err::<&str, ()>(()) // Fail first attempt (will be throttled)
-                    } else {
-                        Ok("success") // Succeed on retry
-                    }
-                })
-            },
-        )
+        let result = retry_with_backoff(runtime, policy, classifier, "test_operation", || {
+            let attempt = attempts.fetch_add(1, Ordering::SeqCst);
+            Box::pin(async move {
+                if attempt < 1 {
+                    Err::<&str, ()>(()) // Fail first attempt (will be throttled)
+                } else {
+                    Ok("success") // Succeed on retry
+                }
+            })
+        })
         .await;
 
         let elapsed = start_time.elapsed();
@@ -447,16 +414,10 @@ mod tests {
         // Classifier that returns retryable
         let classifier = |_: &()| RetryErrorType::Retryable;
 
-        let result = retry_with_exponential_backoff_classified(
-            runtime,
-            policy,
-            classifier,
-            "test_operation",
-            || {
-                attempts.fetch_add(1, Ordering::SeqCst);
-                Box::pin(async { Err::<(), _>(()) }) // Always fail
-            },
-        )
+        let result = retry_with_backoff(runtime, policy, classifier, "test_operation", || {
+            attempts.fetch_add(1, Ordering::SeqCst);
+            Box::pin(async { Err::<(), _>(()) }) // Always fail
+        })
         .await;
 
         assert!(result.is_err());
@@ -482,22 +443,16 @@ mod tests {
             _ => RetryErrorType::Retryable,
         };
 
-        let result = retry_with_exponential_backoff_classified(
-            runtime,
-            policy,
-            classifier,
-            "test_operation",
-            || {
-                let attempt = attempts.fetch_add(1, Ordering::SeqCst);
-                Box::pin(async move {
-                    if attempt < 2 {
-                        Err(attempt) // Return attempt number as error
-                    } else {
-                        Ok("success") // Succeed on third attempt
-                    }
-                })
-            },
-        )
+        let result = retry_with_backoff(runtime, policy, classifier, "test_operation", || {
+            let attempt = attempts.fetch_add(1, Ordering::SeqCst);
+            Box::pin(async move {
+                if attempt < 2 {
+                    Err(attempt) // Return attempt number as error
+                } else {
+                    Ok("success") // Succeed on third attempt
+                }
+            })
+        })
         .await;
 
         assert_eq!(result, Ok("success"));
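
Callers that used the removed `retry_with_exponential_backoff` wrapper can reproduce its behavior under the new interface by passing the same always-retryable classifier the wrapper supplied internally. A sketch under that assumption (the `fetch` operation and its error type are hypothetical):

use opentelemetry_sdk::retry::{retry_with_backoff, RetryErrorType, RetryPolicy};
use opentelemetry_sdk::runtime::Tokio;

async fn fetch() -> Result<String, String> {
    // Hypothetical fallible operation that the caller wants retried.
    Ok("payload".to_string())
}

async fn fetch_with_retries(policy: RetryPolicy) -> Result<String, String> {
    // The removed wrapper classified every error as retryable; doing the
    // same here preserves its "retry until success or max retries" behavior.
    retry_with_backoff(
        Tokio,
        policy,
        |_: &String| RetryErrorType::Retryable,
        "fetch",
        || Box::pin(fetch()),
    )
    .await
}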
