Skip to content

Commit 4655918

Browse files
authored
Refactor retry (#458)
* Refactor retry * Forgot to run cargo fmt
1 parent 9370fce commit 4655918

File tree

4 files changed

+69
-94
lines changed

4 files changed

+69
-94
lines changed
Lines changed: 12 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
use crate::policies::{Policy, PolicyResult, Request, Response};
2-
use crate::sleep::sleep;
3-
use crate::PipelineContext;
41
use chrono::{DateTime, Local};
5-
use std::sync::Arc;
62
use std::time::Duration;
73

84
/// Retry policy with exponential back-off.
@@ -27,56 +23,24 @@ impl ExponentialRetryPolicy {
2723
max_delay,
2824
}
2925
}
26+
}
3027

31-
fn is_expired(
32-
&self,
33-
first_retry_time: &mut Option<DateTime<Local>>,
34-
current_retries: &u32,
35-
) -> bool {
36-
if *current_retries > self.max_retries {
28+
impl super::RetryPolicy for ExponentialRetryPolicy {
29+
fn is_expired(&self, first_retry_time: &mut Option<DateTime<Local>>, retry_count: u32) -> bool {
30+
if retry_count > self.max_retries {
3731
return true;
3832
}
3933

40-
if first_retry_time.is_none() {
41-
*first_retry_time = Some(Local::now());
42-
}
34+
let first_retry_time = first_retry_time.get_or_insert_with(|| Local::now());
35+
let max_delay = chrono::Duration::from_std(self.max_delay)
36+
.unwrap_or_else(|_| chrono::Duration::max_value());
4337

44-
Local::now()
45-
> first_retry_time.unwrap() + chrono::Duration::from_std(self.max_delay).unwrap()
38+
Local::now() > *first_retry_time + max_delay
4639
}
47-
}
4840

49-
#[async_trait::async_trait]
50-
impl<C> Policy<C> for ExponentialRetryPolicy
51-
where
52-
C: Send + Sync,
53-
{
54-
async fn send(
55-
&self,
56-
ctx: &mut PipelineContext<C>,
57-
request: &mut Request,
58-
next: &[Arc<dyn Policy<C>>],
59-
) -> PolicyResult<Response> {
60-
let mut first_retry_time = None;
61-
let mut current_retries = 0;
62-
63-
loop {
64-
match next[0].send(ctx, request, &next[1..]).await {
65-
Ok(response) => return Ok(response),
66-
Err(error) => {
67-
log::error!("Error occurred when making request: {}", error);
68-
if self.is_expired(&mut first_retry_time, &current_retries) {
69-
return Err(error);
70-
} else {
71-
current_retries += 1;
72-
73-
let sleep_ms = self.delay.as_millis() as u64
74-
* u64::pow(2u64, current_retries - 1)
75-
+ rand::random::<u8>() as u64;
76-
sleep(Duration::from_millis(sleep_ms)).await;
77-
}
78-
}
79-
}
80-
}
41+
fn sleep_duration(&self, retry_count: u32) -> Duration {
42+
let sleep_ms = self.delay.as_millis() as u64 * u64::pow(2u64, retry_count - 1)
43+
+ rand::random::<u8>() as u64;
44+
Duration::from_millis(sleep_ms)
8145
}
8246
}
Lines changed: 11 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
use crate::policies::{Policy, PolicyResult, Request, Response};
2-
use crate::sleep::sleep;
3-
use crate::PipelineContext;
41
use chrono::{DateTime, Local};
5-
use std::sync::Arc;
62
use std::time::Duration;
73

84
/// Retry policy with fixed back-off.
@@ -26,54 +22,23 @@ impl FixedRetryPolicy {
2622
max_delay,
2723
}
2824
}
25+
}
2926

30-
fn is_expired(
31-
&self,
32-
first_retry_time: &mut Option<DateTime<Local>>,
33-
current_retries: &u32,
34-
) -> bool {
35-
if *current_retries > self.max_retries {
27+
impl super::RetryPolicy for FixedRetryPolicy {
28+
fn is_expired(&self, first_retry_time: &mut Option<DateTime<Local>>, retry_count: u32) -> bool {
29+
if retry_count > self.max_retries {
3630
return true;
3731
}
3832

39-
if first_retry_time.is_none() {
40-
*first_retry_time = Some(Local::now());
41-
}
33+
let first_retry_time = first_retry_time.get_or_insert_with(Local::now);
34+
let max_delay = chrono::Duration::from_std(self.max_delay)
35+
.unwrap_or_else(|_| chrono::Duration::max_value());
4236

43-
Local::now()
44-
> first_retry_time.unwrap() + chrono::Duration::from_std(self.max_delay).unwrap()
37+
Local::now() > *first_retry_time + max_delay
4538
}
46-
}
4739

48-
#[async_trait::async_trait]
49-
impl<C> Policy<C> for FixedRetryPolicy
50-
where
51-
C: Send + Sync,
52-
{
53-
async fn send(
54-
&self,
55-
ctx: &mut PipelineContext<C>,
56-
request: &mut Request,
57-
next: &[Arc<dyn Policy<C>>],
58-
) -> PolicyResult<Response> {
59-
let mut first_retry_time = None;
60-
let mut current_retries = 0;
61-
62-
loop {
63-
match next[0].send(ctx, request, &next[1..]).await {
64-
Ok(response) => return Ok(response),
65-
Err(error) => {
66-
log::error!("Error occurred when making request: {}", error);
67-
if self.is_expired(&mut first_retry_time, &current_retries) {
68-
return Err(error);
69-
} else {
70-
current_retries += 1;
71-
72-
let sleep_ms = self.delay.as_millis() as u64 + rand::random::<u8>() as u64;
73-
sleep(Duration::from_millis(sleep_ms)).await;
74-
}
75-
}
76-
}
77-
}
40+
fn sleep_duration(&self, _retry_count: u32) -> Duration {
41+
let sleep_ms = self.delay.as_millis() as u64 + rand::random::<u8>() as u64;
42+
Duration::from_millis(sleep_ms)
7843
}
7944
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
mod exponential_retry;
22
mod fixed_retry;
33
mod no_retry;
4+
mod retry_policy;
45

56
pub use exponential_retry::*;
67
pub use fixed_retry::*;
78
pub use no_retry::*;
9+
use retry_policy::RetryPolicy;
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use crate::policies::{Policy, PolicyResult, Request, Response};
2+
use crate::sleep::sleep;
3+
use crate::PipelineContext;
4+
use chrono::{DateTime, Local};
5+
use std::sync::Arc;
6+
use std::time::Duration;
7+
8+
pub trait RetryPolicy {
9+
fn is_expired(&self, first_retry_time: &mut Option<DateTime<Local>>, retry_count: u32) -> bool;
10+
fn sleep_duration(&self, retry_count: u32) -> Duration;
11+
}
12+
13+
#[async_trait::async_trait]
14+
impl<T, C> Policy<C> for T
15+
where
16+
T: RetryPolicy + std::fmt::Debug + Send + Sync,
17+
C: Send + Sync,
18+
{
19+
async fn send(
20+
&self,
21+
ctx: &mut PipelineContext<C>,
22+
request: &mut Request,
23+
next: &[Arc<dyn Policy<C>>],
24+
) -> PolicyResult<Response> {
25+
let mut first_retry_time = None;
26+
let mut retry_count = 0;
27+
28+
loop {
29+
match next[0].send(ctx, request, &next[1..]).await {
30+
Ok(response) => return Ok(response),
31+
Err(error) => {
32+
log::error!("Error occurred when making request: {}", error);
33+
if self.is_expired(&mut first_retry_time, retry_count) {
34+
return Err(error);
35+
} else {
36+
retry_count += 1;
37+
38+
sleep(self.sleep_duration(retry_count)).await;
39+
}
40+
}
41+
}
42+
}
43+
}
44+
}

0 commit comments

Comments
 (0)