Skip to content

Commit 3fe1e92

Browse files
downloader: Refactor retry logic and improve errors
1 parent 926d2fc commit 3fe1e92

File tree

2 files changed

+82
-41
lines changed

2 files changed

+82
-41
lines changed

src/downloader.rs

Lines changed: 72 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
1+
use crate::{error::DownloadError, Error};
12
use reqwest::{Client, Url};
2-
use std::{path::PathBuf, sync::Arc};
3+
use std::{path::PathBuf, sync::Arc, time::Duration};
34
use tokio::{
45
fs::File,
56
io::AsyncWriteExt,
67
sync::{mpsc, oneshot, watch, Semaphore},
78
};
89

9-
use crate::Error;
10-
1110
const QUEUE_SIZE: usize = 100;
1211
const MAX_RETRIES: usize = 3;
1312

@@ -18,7 +17,6 @@ struct DownloadRequest {
1817
result: oneshot::Sender<Result<File, Error>>,
1918
status: watch::Sender<Status>,
2019
cancel: oneshot::Receiver<()>,
21-
remaining_retries: usize,
2220
}
2321

2422
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -67,7 +65,6 @@ pub enum Status {
6765
InProgress(DownloadProgress),
6866
Completed,
6967
Retrying,
70-
Cancelled,
7168
Failed,
7269
}
7370

@@ -121,7 +118,6 @@ impl DownloadManager {
121118
result: result_tx,
122119
status: status_tx,
123120
cancel: cancel_rx,
124-
remaining_retries: MAX_RETRIES,
125121
};
126122

127123
let _ = self.queue.try_send(req);
@@ -139,7 +135,7 @@ async fn dispatcher_thread(
139135
mut rx: mpsc::Receiver<DownloadRequest>,
140136
sem: Arc<Semaphore>,
141137
) {
142-
while let Some(mut request) = rx.recv().await {
138+
while let Some(request) = rx.recv().await {
143139
let permit = match sem.clone().acquire_owned().await {
144140
Ok(permit) => permit,
145141
Err(_) => break,
@@ -148,40 +144,79 @@ async fn dispatcher_thread(
148144
tokio::spawn(async move {
149145
// Move the permit into the worker thread so it's automatically released when the thread finishes
150146
let _permit = permit;
151-
loop {
152-
match download_thread(client.clone(), &mut request).await {
153-
Ok(file) => {
154-
let _ = request.status.send(Status::Completed);
155-
let _ = request.result.send(Ok(file));
156-
break;
157-
}
158-
Err(e) => {
159-
if request.remaining_retries > 0 {
160-
let _ = request.status.send(Status::Retrying);
161-
request.remaining_retries -= 1;
162-
} else {
163-
let status = match e {
164-
Error::Io(ref io_err) => {
165-
if io_err.kind() == std::io::ErrorKind::Interrupted {
166-
Status::Cancelled
167-
} else {
168-
Status::Failed
169-
}
170-
}
171-
_ => Status::Failed,
172-
};
173-
let _ = request.status.send(status);
174-
let _ = request.result.send(Err(e));
175-
break;
176-
}
177-
}
147+
download_thread(client.clone(), request).await;
148+
});
149+
}
150+
}
151+
152+
async fn download_thread(client: Client, mut req: DownloadRequest) {
153+
fn should_retry(e: &Error) -> bool {
154+
match e {
155+
Error::Reqwest(network_err) => {
156+
network_err.is_timeout()
157+
|| network_err.is_connect()
158+
|| network_err.is_request()
159+
|| network_err
160+
.status()
161+
.map(|status_code| status_code.is_server_error())
162+
.unwrap_or(true)
163+
}
164+
Error::Download(DownloadError::Cancelled) | Error::Io(_) => false,
165+
_ => false,
166+
}
167+
}
168+
169+
let mut last_error = None;
170+
for attempt in 0..=(MAX_RETRIES) {
171+
if attempt > MAX_RETRIES {
172+
req.status.send(Status::Failed).ok();
173+
req.result
174+
.send(Err(Error::Download(DownloadError::RetriesExhausted {
175+
last_error_msg: last_error
176+
.as_ref()
177+
.map(|e: &crate::Error| e.to_string())
178+
.unwrap_or_else(|| "Unknown Error".to_string()),
179+
})))
180+
.ok();
181+
return;
182+
}
183+
184+
if attempt > 0 {
185+
req.status.send(Status::Retrying).ok();
186+
// Basic exponential backoff
187+
let delay_ms = 1000 * 2u64.pow(attempt as u32 - 1);
188+
let delay = Duration::from_millis(delay_ms);
189+
190+
tokio::select! {
191+
_ = tokio::time::sleep(delay) => {},
192+
_ = &mut req.cancel => {
193+
req.status.send(Status::Failed).ok();
194+
req.result.send(Err(Error::Download(DownloadError::Cancelled))).ok();
195+
return;
178196
}
179197
}
180-
});
198+
}
199+
200+
match download(client.clone(), &mut req).await {
201+
Ok(file) => {
202+
req.status.send(Status::Completed).ok();
203+
req.result.send(Ok(file)).ok();
204+
return;
205+
}
206+
Err(e) => {
207+
if should_retry(&e) {
208+
last_error = Some(e);
209+
continue;
210+
}
211+
req.status.send(Status::Failed).ok();
212+
req.result.send(Err(e)).ok();
213+
return;
214+
}
215+
}
181216
}
182217
}
183218

184-
async fn download_thread(client: Client, req: &mut DownloadRequest) -> Result<File, Error> {
219+
async fn download(client: Client, req: &mut DownloadRequest) -> Result<File, Error> {
185220
let update_progress = |bytes_downloaded: u64, total_bytes: Option<u64>| {
186221
req.status
187222
.send(Status::InProgress(DownloadProgress {
@@ -211,10 +246,7 @@ async fn download_thread(client: Client, req: &mut DownloadRequest) -> Result<Fi
211246
_ = &mut req.cancel => {
212247
drop(file); // Manually drop the file handle to ensure that deletion doesn't fail
213248
tokio::fs::remove_file(&req.destination).await?;
214-
return Err(Error::Io(std::io::Error::new(
215-
std::io::ErrorKind::Interrupted,
216-
"Download cancelled",
217-
)));
249+
return Err(Error::Download(DownloadError::Cancelled));
218250
}
219251
chunk = response.chunk() => {
220252
match chunk {
@@ -230,7 +262,6 @@ async fn download_thread(client: Client, req: &mut DownloadRequest) -> Result<Fi
230262
return Err(Error::Reqwest(e))
231263
},
232264
}
233-
234265
}
235266
}
236267
}

src/error.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,14 @@ pub enum Error {
1010
Reqwest(#[from] reqwest::Error),
1111
#[error("Oneshot: {0}")]
1212
Oneshot(#[from] tokio::sync::oneshot::error::RecvError),
13+
#[error("Download: {0}")]
14+
Download(#[from] DownloadError),
15+
}
16+
17+
#[derive(Error, Debug, Clone)]
18+
pub enum DownloadError {
19+
#[error("Download was cancelled")]
20+
Cancelled,
21+
#[error("Retry limit exceeded")]
22+
RetriesExhausted { last_error_msg: String },
1323
}

0 commit comments

Comments
 (0)