Skip to content

Commit adc6d0a

Browse files
committed
refactor(stage): update retry logic
1 parent de2488d commit adc6d0a

File tree

2 files changed

+66
-15
lines changed

2 files changed

+66
-15
lines changed

crates/sync/stage/src/blocks/downloader.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ pub trait BlockDownloader: Send + Sync {
4040
) -> impl Future<Output = Result<Vec<StateUpdateWithBlock>, katana_gateway::client::Error>> + Send;
4141
}
4242

43+
///////////////////////////////////////////////////////////////////////////////////
44+
// Implementations
45+
///////////////////////////////////////////////////////////////////////////////////
46+
4347
/// An implementation of [`BlockDownloader`] that uses the [`BatchDownloader`] utility.
4448
///
4549
/// This implementation leverages the generic
@@ -91,9 +95,10 @@ where
9195
mod impls {
9296
use std::future::Future;
9397

94-
use katana_gateway::client::Client as GatewayClient;
98+
use katana_gateway::client::{Client as GatewayClient, Error as GatewayClientError};
9599
use katana_gateway::types::StateUpdateWithBlock;
96100
use katana_primitives::block::BlockNumber;
101+
use tracing::error;
97102

98103
use crate::downloader::{Downloader, DownloaderResult};
99104

@@ -121,10 +126,15 @@ mod impls {
121126
key: &Self::Key,
122127
) -> impl Future<Output = DownloaderResult<Self::Value, Self::Error>> {
123128
async {
124-
match self.gateway.get_state_update_with_block((*key).into()).await {
129+
match self.gateway.get_state_update_with_block((*key).into()).await.inspect_err(
130+
|error| error!(block = %*key, ?error, "Error downloading block from gateway."),
131+
) {
125132
Ok(data) => DownloaderResult::Ok(data),
126-
Err(err) if err.is_rate_limited() => DownloaderResult::Retry(err),
127-
Err(err) => DownloaderResult::Err(err),
133+
Err(err) => match err {
134+
GatewayClientError::RateLimited
135+
| GatewayClientError::UnknownFormat { .. } => DownloaderResult::Retry(err),
136+
_ => DownloaderResult::Err(err),
137+
},
128138
}
129139
}
130140
}

crates/sync/stage/src/downloader.rs

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ use std::time::Duration;
1313

1414
use anyhow::Result;
1515
use backon::{BackoffBuilder, ExponentialBuilder};
16-
use tracing::trace;
16+
use futures::future::join_all;
17+
use tracing::{trace, warn};
1718

1819
/// A batch downloader that processes multiple download requests with retry logic.
1920
///
@@ -51,9 +52,9 @@ impl<D> BatchDownloader<D> {
5152
///
5253
/// Uses exponential backoff as the default retry strategy with the following parameters:
5354
/// - Minimum delay: 3 seconds
54-
/// - Maximum delay: 60 seconds
55+
/// - Maximum delay: 1 minute
5556
/// - Backoff factor: 2.0 (delays double each retry)
56-
/// - Maximum retry attempts: 3
57+
/// - Maximum retry attempts: no limit
5758
///
5859
/// This means failed downloads will be retried with delays of approximately 3s, 6s, and 12s
5960
/// before giving up (total of 4 attempts including the initial request).
@@ -63,7 +64,11 @@ impl<D> BatchDownloader<D> {
6364
/// * `downloader` - The downloader implementation to use for individual downloads
6465
/// * `batch_size` - Maximum number of items to download concurrently in each batch
6566
pub fn new(downloader: D, batch_size: usize) -> Self {
66-
let backoff = ExponentialBuilder::default().with_min_delay(Duration::from_secs(3));
67+
let backoff = ExponentialBuilder::default()
68+
.with_min_delay(Duration::from_secs(3))
69+
.with_max_delay(Duration::from_secs(60))
70+
.without_max_times();
71+
6772
Self { backoff, downloader, batch_size }
6873
}
6974

@@ -137,22 +142,50 @@ where
137142
/// assert_eq!(values.len(), 15);
138143
/// ```
139144
pub async fn download(&self, keys: Vec<D::Key>) -> Result<Vec<D::Value>, D::Error> {
145+
let total_items = keys.len();
146+
let total_batches = total_items.div_ceil(self.batch_size);
147+
148+
trace!(
149+
total_items = %total_items,
150+
batch_size = %self.batch_size,
151+
total_batches = %total_batches,
152+
"Starting batch download."
153+
);
154+
140155
let mut items = Vec::with_capacity(keys.len());
141156

142-
for chunk in keys.chunks(self.batch_size) {
157+
for (batch_idx, chunk) in keys.chunks(self.batch_size).enumerate() {
158+
let batch_num = batch_idx + 1;
159+
trace!(
160+
batch = %batch_num,
161+
total_batches = %total_batches,
162+
batch_size = %chunk.len(),
163+
"Processing batch."
164+
);
165+
143166
let batch = self.download_batch_with_retry(chunk.to_vec()).await?;
144167
items.extend(batch);
168+
169+
trace!(
170+
batch = %batch_num,
171+
total_batches = %total_batches,
172+
downloaded = %items.len(),
173+
total = %total_items,
174+
"Completed batch."
175+
);
145176
}
146177

178+
trace!(
179+
total_items = %total_items,
180+
total_batches = %total_batches,
181+
"Batch download completed successfully."
182+
);
183+
147184
Ok(items)
148185
}
149186

150187
async fn download_batch(&self, keys: &[D::Key]) -> Vec<DownloaderResult<D::Value, D::Error>> {
151-
let mut requests = Vec::with_capacity(keys.len());
152-
for key in keys {
153-
requests.push(self.downloader.download(key));
154-
}
155-
futures::future::join_all(requests).await
188+
join_all(keys.iter().map(|key| self.downloader.download(key))).await
156189
}
157190

158191
async fn download_batch_with_retry(
@@ -163,6 +196,7 @@ where
163196

164197
let mut remaining_keys = keys.clone();
165198
let mut backoff = self.backoff.clone().build();
199+
let mut retry_attempt = 0;
166200

167201
loop {
168202
let batch_result = self.download_batch(&remaining_keys).await;
@@ -198,8 +232,15 @@ where
198232

199233
// Check if we should retry
200234
if let Some(delay) = backoff.next() {
235+
retry_attempt += 1;
201236
if let Some(ref error) = last_error {
202-
trace!(%error, failed_keys = %failed_keys.len(), "Retrying download for failed keys.");
237+
warn!(
238+
%error,
239+
failed_keys = %failed_keys.len(),
240+
retry_attempt = %retry_attempt,
241+
delay_secs = %delay.as_secs(),
242+
"Retrying downloads."
243+
);
203244
}
204245

205246
tokio::time::sleep(delay).await;

0 commit comments

Comments
 (0)