Skip to content

Commit 447be8e

Browse files
downloader: Implement retries
1 parent ad41c23 commit 447be8e

File tree

1 file changed

+31
-18
lines changed

1 file changed

+31
-18
lines changed

src/downloader.rs

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use tokio::sync::{mpsc, oneshot, watch, Semaphore};
1010
use crate::Error;
1111

1212
const QUEUE_SIZE: usize = 100;
13+
const MAX_RETRIES: usize = 3;
1314

1415
#[derive(Debug)]
1516
struct DownloadRequest {
@@ -18,6 +19,7 @@ struct DownloadRequest {
1819
result: oneshot::Sender<Result<File, Error>>,
1920
status: watch::Sender<Status>,
2021
cancel: oneshot::Receiver<()>,
22+
remaining_retries: usize,
2123
}
2224

2325
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -55,6 +57,7 @@ pub enum Status {
5557
Pending,
5658
InProgress(DownloadProgress),
5759
Completed,
60+
Retrying,
5861
Cancelled,
5962
Failed,
6063
}
@@ -109,6 +112,7 @@ impl DownloadManager {
109112
result: result_tx,
110113
status: status_tx,
111114
cancel: cancel_rx,
115+
remaining_retries: MAX_RETRIES,
112116
};
113117

114118
let _ = self.queue.try_send(req);
@@ -126,7 +130,7 @@ async fn dispatcher_thread(
126130
mut rx: mpsc::Receiver<DownloadRequest>,
127131
sem: Arc<Semaphore>,
128132
) {
129-
while let Some(request) = rx.recv().await {
133+
while let Some(mut request) = rx.recv().await {
130134
let permit = match sem.clone().acquire_owned().await {
131135
Ok(permit) => permit,
132136
Err(_) => break,
@@ -135,24 +139,33 @@ async fn dispatcher_thread(
135139
tokio::spawn(async move {
136140
// Move the permit into the worker thread so it's automatically released when the thread finishes
137141
let _permit = permit;
138-
match download_thread(client, &request).await {
139-
Ok(file) => {
140-
let _ = request.status.send(Status::Completed);
141-
let _ = request.result.send(Ok(file));
142-
}
143-
Err(e) => {
144-
let status = match e {
145-
Error::Io(ref io_err) => {
146-
if io_err.kind() == std::io::ErrorKind::Interrupted {
147-
Status::Cancelled
148-
} else {
149-
Status::Failed
150-
}
142+
loop {
143+
match download_thread(client.clone(), &request).await {
144+
Ok(file) => {
145+
let _ = request.status.send(Status::Completed);
146+
let _ = request.result.send(Ok(file));
147+
break;
148+
}
149+
Err(e) => {
150+
if request.remaining_retries > 0 {
151+
let _ = request.status.send(Status::Retrying);
152+
request.remaining_retries -= 1;
153+
} else {
154+
let status = match e {
155+
Error::Io(ref io_err) => {
156+
if io_err.kind() == std::io::ErrorKind::Interrupted {
157+
Status::Cancelled
158+
} else {
159+
Status::Failed
160+
}
161+
}
162+
_ => Status::Failed,
163+
};
164+
let _ = request.status.send(status);
165+
let _ = request.result.send(Err(e));
166+
break;
151167
}
152-
_ => Status::Failed,
153-
};
154-
let _ = request.status.send(status);
155-
let _ = request.result.send(Err(e));
168+
}
156169
}
157170
}
158171
});

0 commit comments

Comments
 (0)