|
1 | | -use backon::ExponentialBuilder; |
2 | | -use backon::Retryable; |
3 | 1 | use std::future::Future; |
4 | 2 | use std::time::Duration; |
5 | 3 |
|
@@ -29,28 +27,58 @@ impl<E> RetryError<E> { |
29 | 27 |
|
30 | 28 | impl<E: std::fmt::Display> std::error::Error for RetryError<E> where E: std::fmt::Debug {} |
31 | 29 |
|
32 | | -/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function |
33 | | -/// Runs with `jitter: false`. |
34 | 30 | pub async fn retry_function<FutureFn, Fut, T, E>( |
35 | | - function: FutureFn, |
36 | | - min_delay: u64, |
| 31 | + mut function: FutureFn, |
| 32 | + min_delay_ms: u64, |
37 | 33 | factor: f32, |
38 | 34 | max_times: usize, |
39 | | - max_delay: u64, |
| 35 | + max_delay_seconds: u64, |
40 | 36 | ) -> Result<T, RetryError<E>> |
41 | 37 | where |
42 | 38 | Fut: Future<Output = Result<T, RetryError<E>>>, |
43 | 39 | FutureFn: FnMut() -> Fut, |
44 | 40 | { |
45 | | - let backoff = ExponentialBuilder::default() |
46 | | - .with_min_delay(Duration::from_millis(min_delay)) |
47 | | - .with_max_times(max_times) |
48 | | - .with_factor(factor) |
49 | | - .with_max_delay(Duration::from_secs(max_delay)); |
50 | | - |
51 | | - function |
52 | | - .retry(backoff) |
53 | | - .sleep(tokio::time::sleep) |
54 | | - .when(|e| matches!(e, RetryError::Transient(_))) |
55 | | - .await |
| 41 | + let mut delay = Duration::from_millis(min_delay_ms); |
| 42 | + |
| 43 | + let factor = (factor as f64).max(1.0); |
| 44 | + |
| 45 | + let mut attempt: usize = 0; |
| 46 | + |
| 47 | + loop { |
| 48 | + match function().await { |
| 49 | + Ok(v) => return Ok(v), |
| 50 | + Err(RetryError::Permanent(e)) => return Err(RetryError::Permanent(e)), |
| 51 | + Err(RetryError::Transient(e)) => { |
| 52 | + if attempt >= max_times { |
| 53 | + return Err(RetryError::Transient(e)); |
| 54 | + } |
| 55 | + |
| 56 | + tokio::time::sleep(delay).await; |
| 57 | + |
| 58 | + delay = next_backoff_delay(delay, max_delay_seconds, factor); |
| 59 | + |
| 60 | + attempt += 1; |
| 61 | + } |
| 62 | + } |
| 63 | + } |
| 64 | +} |
| 65 | + |
| 66 | +/// TODO: Replace with the one in aggregation_mode/db/src/orchestrator.rs, or use a common method. |
| 67 | +fn next_backoff_delay(current_delay: Duration, max_delay_seconds: u64, factor: f64) -> Duration { |
| 68 | + let max: Duration = Duration::from_secs(max_delay_seconds); |
| 69 | + // Defensive: factor should be >= 1.0 for backoff, we clamp it to avoid shrinking/NaN. |
| 70 | + |
| 71 | + let scaled_secs = current_delay.as_secs_f64() * factor; |
| 72 | + let scaled_secs = if scaled_secs.is_finite() { |
| 73 | + scaled_secs |
| 74 | + } else { |
| 75 | + max.as_secs_f64() |
| 76 | + }; |
| 77 | + |
| 78 | + let scaled = Duration::from_secs_f64(scaled_secs); |
| 79 | + if scaled > max { |
| 80 | + max |
| 81 | + } else { |
| 82 | + scaled |
| 83 | + } |
56 | 84 | } |
0 commit comments