Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/execution/live_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl SourceUpdateTask {
async move {
let mut change_stream = change_stream;
let retry_options = retryable::RetryOptions {
retry_timeout: std::time::Duration::from_secs(365 * 24 * 60 * 60),
retry_timeout: None,
initial_backoff: std::time::Duration::from_secs(5),
max_backoff: std::time::Duration::from_secs(60),
};
Expand Down
32 changes: 21 additions & 11 deletions src/utils/retryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,23 @@ pub fn Ok<T>(value: T) -> Result<T> {
}

pub struct RetryOptions {
pub retry_timeout: Duration,
pub retry_timeout: Option<Duration>,
pub initial_backoff: Duration,
pub max_backoff: Duration,
}

impl Default for RetryOptions {
fn default() -> Self {
Self {
retry_timeout: DEFAULT_RETRY_TIMEOUT,
retry_timeout: Some(DEFAULT_RETRY_TIMEOUT),
initial_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(10),
}
}
}

pub static HEAVY_LOADED_OPTIONS: RetryOptions = RetryOptions {
retry_timeout: DEFAULT_RETRY_TIMEOUT,
retry_timeout: Some(DEFAULT_RETRY_TIMEOUT),
initial_backoff: Duration::from_secs(1),
max_backoff: Duration::from_secs(60),
};
Expand All @@ -110,8 +110,9 @@ pub async fn run<
f: F,
options: &RetryOptions,
) -> Result<Ok, Err> {
let start_time = Instant::now();
let deadline = start_time + options.retry_timeout;
let deadline = options
.retry_timeout
.map(|timeout| Instant::now() + timeout);
let mut backoff = options.initial_backoff;

loop {
Expand All @@ -121,13 +122,22 @@ pub async fn run<
if !err.is_retryable() {
return Result::Err(err);
}
let now = Instant::now();
if now >= deadline {
return Result::Err(err);
let mut sleep_duration = backoff;
if let Some(deadline) = deadline {
let now = Instant::now();
if now >= deadline {
return Result::Err(err);
}
let remaining_time = deadline.saturating_duration_since(now);
if remaining_time < sleep_duration {
sleep_duration = remaining_time;
}
}
trace!("Will retry in {}ms for error: {}", backoff.as_millis(), err);
let remaining_time = deadline.saturating_duration_since(now);
let sleep_duration = std::cmp::min(backoff, remaining_time);
trace!(
"Will retry in {}ms for error: {}",
sleep_duration.as_millis(),
err
);
tokio::time::sleep(sleep_duration).await;
if backoff < options.max_backoff {
backoff = std::cmp::min(
Expand Down