Skip to content

Commit 92cb6c6

Browse files
downloader: Propagate errors and correctly update download status
1 parent c5479b8 commit 92cb6c6

File tree

1 file changed

+44
-32
lines changed

1 file changed

+44
-32
lines changed

src/downloader.rs

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,38 @@
11
use reqwest::{Client, Url};
2-
use std::{fs::File, io::Write, path::PathBuf, sync::Arc};
3-
use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
2+
use std::{
3+
fs::File,
4+
io::{Seek, SeekFrom, Write},
5+
path::PathBuf,
6+
sync::Arc,
7+
};
8+
use tokio::sync::{mpsc, oneshot, watch, Semaphore};
9+
10+
use crate::Error;
411

512
const QUEUE_SIZE: usize = 100;
613

714
#[derive(Debug)]
815
struct DownloadRequest {
916
url: Url,
1017
destination: PathBuf,
11-
result: oneshot::Sender<Result<File, reqwest::Error>>,
18+
result: oneshot::Sender<Result<File, Error>>,
1219
status: watch::Sender<Status>,
13-
progress: broadcast::Sender<DownloadProgress>,
1420
}
1521

16-
#[derive(Debug, Clone, Copy)]
22+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1723
pub struct DownloadProgress {
1824
pub bytes_downloaded: u64,
1925
pub total_bytes: Option<u64>,
2026
}
2127

2228
#[derive(Debug)]
2329
pub struct DownloadHandle {
24-
result: oneshot::Receiver<Result<File, reqwest::Error>>,
30+
result: oneshot::Receiver<Result<File, Error>>,
2531
status: watch::Receiver<Status>,
26-
progress: broadcast::Receiver<DownloadProgress>,
2732
}
2833

2934
impl DownloadHandle {
30-
pub async fn r#await(self) -> Result<std::fs::File, reqwest::Error> {
35+
pub async fn r#await(self) -> Result<std::fs::File, Error> {
3136
match self.result.await {
3237
Ok(result) => result,
3338
Err(_) => todo!(),
@@ -37,16 +42,12 @@ impl DownloadHandle {
3742
pub fn status(&self) -> Status {
3843
self.status.borrow().clone()
3944
}
40-
41-
pub fn subscribe_progress(&self) -> &broadcast::Receiver<DownloadProgress> {
42-
&self.progress
43-
}
4445
}
4546

4647
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
4748
pub enum Status {
4849
Pending,
49-
InProgress,
50+
InProgress(DownloadProgress),
5051
Completed,
5152
Failed,
5253
}
@@ -93,22 +94,19 @@ impl DownloadManager {
9394
pub fn add_request(&self, url: Url, destination: PathBuf) -> DownloadHandle {
9495
let (result_tx, result_rx) = oneshot::channel();
9596
let (status_tx, status_rx) = watch::channel(Status::Pending);
96-
let (progress_tx, progress_rx) = broadcast::channel(16);
9797

9898
let req = DownloadRequest {
9999
url,
100100
destination,
101101
result: result_tx,
102102
status: status_tx,
103-
progress: progress_tx,
104103
};
105104

106105
let _ = self.queue.try_send(req);
107106

108107
DownloadHandle {
109108
result: result_rx,
110109
status: status_rx,
111-
progress: progress_rx,
112110
}
113111
}
114112
}
@@ -127,28 +125,42 @@ async fn dispatcher_thread(
127125
tokio::spawn(async move {
128126
// Move the permit into the worker thread so it's automatically released when the thread finishes
129127
let _permit = permit;
130-
let _ = download_thread(client, request).await;
128+
match download_thread(client, &request).await {
129+
Ok(file) => {
130+
let _ = request.status.send(Status::Completed);
131+
let _ = request.result.send(Ok(file));
132+
}
133+
Err(e) => {
134+
let _ = request.status.send(Status::Failed);
135+
let _ = request.result.send(Err(e));
136+
}
137+
}
131138
});
132139
}
133140
}
134141

135-
async fn download_thread(
136-
client: Client,
137-
req: DownloadRequest,
138-
) -> Result<(), Box<dyn std::error::Error>> {
139-
let mut resp = client.get(req.url).send().await?;
140-
let total = resp.content_length();
141-
let mut file = File::create(&req.destination)?;
142-
// let mut stream = resp.bytes().await?;
143-
let mut downloaded = 0u64;
142+
async fn download_thread(client: Client, req: &DownloadRequest) -> Result<File, Error> {
143+
println!("Download Thread Started for: {}", req.destination.display());
144+
let mut resp = client.get(req.url.as_ref()).send().await?;
145+
let total_bytes = resp.content_length();
146+
let mut bytes_downloaded = 0u64;
147+
let mut file = File::options()
148+
.read(true)
149+
.write(true)
150+
.create(true)
151+
.open(&req.destination)?;
152+
144153
while let Some(chunk) = resp.chunk().await.transpose() {
145154
let chunk = chunk?;
146155
file.write_all(&chunk)?;
147-
downloaded += chunk.len() as u64;
148-
let _ = req.progress.send(DownloadProgress {
149-
bytes_downloaded: downloaded,
150-
total_bytes: total,
151-
});
156+
bytes_downloaded += chunk.len() as u64;
157+
let _ = req.status.send(Status::InProgress(DownloadProgress {
158+
bytes_downloaded,
159+
total_bytes,
160+
}));
152161
}
153-
Ok(())
162+
163+
// Reset the cursor to the beginning of the file
164+
file.seek(SeekFrom::Start(0))?;
165+
Ok(file)
154166
}

0 commit comments

Comments
 (0)