-
-
Notifications
You must be signed in to change notification settings - Fork 1
Draft: Download manager #7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
cyberphantom52
wants to merge
31
commits into
main
Choose a base branch
from
download_manager
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 19 commits
Commits
Show all changes
31 commits
Select commit
Hold shift + click to select a range
dbbe919
chore: Add reqwest crate
cyberphantom52 748106a
downloader: Implement DownloadManager
cyberphantom52 c5479b8
error: Add reqwest errors
cyberphantom52 9c29890
downloader: Propagate errors and correctly update download status
cyberphantom52 ad41c23
downloader: Implement cancelling downloads
cyberphantom52 447be8e
downloader: Implement retries
cyberphantom52 395ced4
downloader: Improve File handling and use switch to tokio::fs::File
cyberphantom52 1bf7bfd
downloader: Use tokio::select to check for cancellations
cyberphantom52 9adb239
downloader: Send initial progress update and return error for response
cyberphantom52 b9911b8
downloader: Implement Future trait for DownloadHandle
cyberphantom52 9f25867
downloader: End progress updates at completion too
cyberphantom52 c313200
downloader: Manually drop file handle before deleting on error or cancel
cyberphantom52 926d2fc
fix: gptk impl
edfloreshz 3fe1e92
downloader: Refactor retry logic and improve errors
cyberphantom52 49d4f60
downloader: Add method to wait for status updates
cyberphantom52 7973137
downloader: fix retry logic
cyberphantom52 31f5218
downloader: Use tokio_util::CancellationToken to handle cancellations
cyberphantom52 d96688d
downloader: Add hierarchical cancel tokens to cancel all downloads
cyberphantom52 3a14a0d
downloader: Split the implementation into a module
cyberphantom52 5566d67
downloader: Add better progress updates with rate limiting
cyberphantom52 2aac716
downloader: Better download interface with errors
cyberphantom52 9251524
downloader: Add some convinience methods related to download status
cyberphantom52 0670468
downloader: Add config for download manager
cyberphantom52 ce05710
downloader: Use TaskTracker to track thread lifecycles
cyberphantom52 633b752
downloader: Add shutdown, active_downloads and queued_downloads
cyberphantom52 09b47b6
downloader: Add DownloadConfig and rate limiting
cyberphantom52 4803c51
downloader: Use builder pattern to build download requests
cyberphantom52 9e9b298
downloader: Don't rexport worker_thread
cyberphantom52 987beaa
downloader: Switch to url::Url
cyberphantom52 c550480
downloader: Accept Url's directly instead of parsing them
cyberphantom52 8354cc9
downloader: Add convinience methods to send updates and result
cyberphantom52 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
use super::Status; | ||
use crate::Error; | ||
use tokio::{ | ||
fs::File, | ||
sync::{oneshot, watch}, | ||
}; | ||
use tokio_util::sync::CancellationToken; | ||
|
||
#[derive(Debug)] | ||
pub struct DownloadHandle { | ||
result: oneshot::Receiver<Result<File, Error>>, | ||
status: watch::Receiver<Status>, | ||
cancel: CancellationToken, | ||
} | ||
|
||
impl DownloadHandle { | ||
pub fn new( | ||
result: oneshot::Receiver<Result<File, Error>>, | ||
status: watch::Receiver<Status>, | ||
cancel: CancellationToken, | ||
) -> Self { | ||
Self { | ||
result, | ||
status, | ||
cancel, | ||
} | ||
} | ||
} | ||
|
||
impl std::future::Future for DownloadHandle { | ||
type Output = Result<tokio::fs::File, Error>; | ||
|
||
fn poll( | ||
mut self: std::pin::Pin<&mut Self>, | ||
cx: &mut std::task::Context<'_>, | ||
) -> std::task::Poll<Self::Output> { | ||
use std::pin::Pin; | ||
use std::task::Poll; | ||
match Pin::new(&mut self.result).poll(cx) { | ||
Poll::Ready(Ok(result)) => Poll::Ready(result), | ||
Poll::Ready(Err(e)) => Poll::Ready(Err(Error::Oneshot(e))), | ||
Poll::Pending => Poll::Pending, | ||
} | ||
} | ||
} | ||
|
||
impl DownloadHandle { | ||
pub fn status(&self) -> Status { | ||
*self.status.borrow() | ||
} | ||
|
||
pub async fn wait_for_status_update(&mut self) -> Result<(), watch::error::RecvError> { | ||
self.status.changed().await | ||
} | ||
|
||
pub fn cancel(&self) { | ||
self.cancel.cancel(); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
use reqwest::{Client, Url}; | ||
use std::{path::PathBuf, sync::Arc}; | ||
use tokio::sync::{mpsc, oneshot, watch, Semaphore}; | ||
use tokio_util::sync::CancellationToken; | ||
|
||
use super::{download_thread, DownloadHandle, DownloadRequest, Status}; | ||
|
||
const QUEUE_SIZE: usize = 100; | ||
|
||
#[derive(Debug)] | ||
pub struct DownloadManager { | ||
queue: mpsc::Sender<DownloadRequest>, | ||
semaphore: Arc<Semaphore>, | ||
cancel: CancellationToken, | ||
} | ||
|
||
impl Drop for DownloadManager { | ||
fn drop(&mut self) { | ||
// Need to manually close the semaphore to make sure dispatcher_thread stops waiting for permits | ||
self.semaphore.close(); | ||
} | ||
} | ||
|
||
impl DownloadManager { | ||
pub fn new(limit: usize) -> Self { | ||
let (tx, rx) = mpsc::channel(QUEUE_SIZE); | ||
let client = Client::new(); | ||
let semaphore = Arc::new(Semaphore::new(limit)); | ||
let manager = Self { | ||
queue: tx, | ||
semaphore: semaphore.clone(), | ||
cancel: CancellationToken::new(), | ||
}; | ||
// Spawn the dispatcher thread to handle download requests | ||
tokio::spawn(async move { dispatcher_thread(client, rx, semaphore).await }); | ||
manager | ||
} | ||
|
||
pub fn set_max_parallel_downloads(&self, limit: usize) { | ||
let current = self.semaphore.available_permits(); | ||
if limit > current { | ||
self.semaphore.add_permits(limit - current); | ||
} else if limit < current { | ||
let to_remove = current - limit; | ||
for _ in 0..to_remove { | ||
let _ = self.semaphore.try_acquire(); | ||
} | ||
} | ||
} | ||
|
||
pub fn add_request(&self, url: Url, destination: PathBuf) -> DownloadHandle { | ||
let (result_tx, result_rx) = oneshot::channel(); | ||
let (status_tx, status_rx) = watch::channel(Status::Queued); | ||
let cancel = self.cancel.child_token(); | ||
|
||
let req = DownloadRequest { | ||
url, | ||
destination, | ||
result: result_tx, | ||
status: status_tx, | ||
cancel: cancel.clone(), | ||
}; | ||
|
||
let _ = self.queue.try_send(req); | ||
|
||
DownloadHandle::new(result_rx, status_rx, cancel) | ||
} | ||
|
||
pub fn cancel_all(&self) { | ||
self.cancel.cancel(); | ||
} | ||
} | ||
|
||
async fn dispatcher_thread( | ||
client: Client, | ||
mut rx: mpsc::Receiver<DownloadRequest>, | ||
sem: Arc<Semaphore>, | ||
) { | ||
while let Some(request) = rx.recv().await { | ||
let permit = match sem.clone().acquire_owned().await { | ||
Ok(permit) => permit, | ||
Err(_) => break, | ||
}; | ||
let client = client.clone(); | ||
tokio::spawn(async move { | ||
// Move the permit into the worker thread so it's automatically released when the thread finishes | ||
let _permit = permit; | ||
download_thread(client.clone(), request).await; | ||
}); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
mod handle; | ||
mod manager; | ||
mod types; | ||
mod worker; | ||
|
||
pub use handle::DownloadHandle; | ||
pub use manager::*; | ||
pub use types::*; | ||
pub(self) use worker::download_thread; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
use crate::Error; | ||
use reqwest::Url; | ||
use std::path::{Path, PathBuf}; | ||
use tokio::{ | ||
fs::File, | ||
sync::{oneshot, watch}, | ||
}; | ||
use tokio_util::sync::CancellationToken; | ||
|
||
#[derive(Debug)] | ||
pub(crate) struct DownloadRequest { | ||
pub url: Url, | ||
pub destination: PathBuf, | ||
pub result: oneshot::Sender<Result<File, Error>>, | ||
pub status: watch::Sender<Status>, | ||
pub cancel: CancellationToken, | ||
} | ||
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
pub struct DownloadProgress { | ||
pub bytes_downloaded: u64, | ||
pub total_bytes: Option<u64>, | ||
} | ||
|
||
#[derive(Debug, Copy, Clone, PartialEq, Eq)] | ||
pub enum Status { | ||
Queued, | ||
InProgress(DownloadProgress), | ||
Retrying, | ||
Completed, | ||
Failed, | ||
Cancelled, | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
use super::{DownloadProgress, DownloadRequest}; | ||
use crate::{downloader::Status, error::DownloadError, Error}; | ||
use reqwest::Client; | ||
use std::time::Duration; | ||
use tokio::{fs::File, io::AsyncWriteExt}; | ||
|
||
const MAX_RETRIES: usize = 3; | ||
|
||
pub(super) async fn download_thread(client: Client, mut req: DownloadRequest) { | ||
fn should_retry(e: &Error) -> bool { | ||
match e { | ||
Error::Reqwest(network_err) => { | ||
network_err.is_timeout() | ||
|| network_err.is_connect() | ||
|| network_err.is_request() | ||
|| network_err | ||
.status() | ||
.map(|status_code| status_code.is_server_error()) | ||
.unwrap_or(true) | ||
} | ||
Error::Download(DownloadError::Cancelled) | Error::Io(_) => false, | ||
_ => false, | ||
} | ||
} | ||
|
||
let mut last_error = None; | ||
for attempt in 0..=(MAX_RETRIES + 1) { | ||
if attempt > MAX_RETRIES { | ||
req.status.send(Status::Failed).ok(); | ||
req.result | ||
.send(Err(Error::Download(DownloadError::RetriesExhausted { | ||
last_error_msg: last_error | ||
.as_ref() | ||
.map(ToString::to_string) | ||
.unwrap_or_else(|| "Unknown Error".to_string()), | ||
}))) | ||
.ok(); | ||
return; | ||
} | ||
|
||
if attempt > 0 { | ||
req.status.send(Status::Retrying).ok(); | ||
// Basic exponential backoff | ||
let delay_ms = 1000 * 2u64.pow(attempt as u32 - 1); | ||
let delay = Duration::from_millis(delay_ms); | ||
|
||
tokio::select! { | ||
_ = tokio::time::sleep(delay) => {}, | ||
_ = req.cancel.cancelled() => { | ||
req.status.send(Status::Failed).ok(); | ||
req.result.send(Err(Error::Download(DownloadError::Cancelled))).ok(); | ||
return; | ||
} | ||
} | ||
} | ||
|
||
match download(client.clone(), &mut req).await { | ||
Ok(file) => { | ||
req.status.send(Status::Completed).ok(); | ||
req.result.send(Ok(file)).ok(); | ||
return; | ||
} | ||
Err(e) => { | ||
if should_retry(&e) { | ||
last_error = Some(e); | ||
continue; | ||
} | ||
|
||
let status = if matches!(e, Error::Download(DownloadError::Cancelled)) { | ||
Status::Cancelled | ||
} else { | ||
Status::Failed | ||
}; | ||
req.status.send(status).ok(); | ||
req.result.send(Err(e)).ok(); | ||
return; | ||
} | ||
} | ||
} | ||
} | ||
|
||
async fn download(client: Client, req: &mut DownloadRequest) -> Result<File, Error> { | ||
let update_progress = |bytes_downloaded: u64, total_bytes: Option<u64>| { | ||
req.status | ||
.send(Status::InProgress(DownloadProgress { | ||
bytes_downloaded, | ||
total_bytes, | ||
})) | ||
.ok(); | ||
}; | ||
|
||
let mut response = client | ||
.get(req.url.as_ref()) | ||
.send() | ||
.await? | ||
.error_for_status()?; | ||
let total_bytes = response.content_length(); | ||
let mut bytes_downloaded = 0u64; | ||
|
||
// Create the destination directory if it doesn't exist | ||
if let Some(parent) = req.destination.parent() { | ||
tokio::fs::create_dir_all(parent).await?; | ||
} | ||
let mut file = File::create(&req.destination).await?; | ||
|
||
update_progress(bytes_downloaded, total_bytes); | ||
loop { | ||
tokio::select! { | ||
_ = req.cancel.cancelled() => { | ||
drop(file); // Manually drop the file handle to ensure that deletion doesn't fail | ||
tokio::fs::remove_file(&req.destination).await?; | ||
return Err(Error::Download(DownloadError::Cancelled)); | ||
} | ||
chunk = response.chunk() => { | ||
match chunk { | ||
Ok(Some(chunk)) => { | ||
file.write_all(&chunk).await?; | ||
bytes_downloaded += chunk.len() as u64; | ||
update_progress(bytes_downloaded, total_bytes); | ||
} | ||
Ok(None) => break, | ||
Err(e) => { | ||
drop(file); // Manually drop the file handle to ensure that deletion doesn't fail | ||
tokio::fs::remove_file(&req.destination).await?; | ||
return Err(Error::Reqwest(e)) | ||
}, | ||
} | ||
} | ||
} | ||
} | ||
update_progress(bytes_downloaded, total_bytes); | ||
|
||
// Ensure the data is written to disk | ||
file.sync_all().await?; | ||
// Open a new file handle with RO permissions | ||
let file = File::options().read(true).open(&req.destination).await?; | ||
Ok(file) | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,4 +6,18 @@ pub enum Error { | |
Io(#[from] std::io::Error), | ||
#[error("Serde: {0}")] | ||
Serde(#[from] serde_json::Error), | ||
#[error("Reqwest: {0}")] | ||
Reqwest(#[from] reqwest::Error), | ||
#[error("Oneshot: {0}")] | ||
Oneshot(#[from] tokio::sync::oneshot::error::RecvError), | ||
Comment on lines
+12
to
+13
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't like having to add this specific error type. |
||
#[error("Download: {0}")] | ||
Download(#[from] DownloadError), | ||
} | ||
|
||
#[derive(Error, Debug, Clone)] | ||
pub enum DownloadError { | ||
#[error("Download was cancelled")] | ||
Cancelled, | ||
#[error("Retry limit exceeded")] | ||
RetriesExhausted { last_error_msg: String }, | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,7 @@ | ||
pub mod downloader; | ||
mod error; | ||
pub mod runner; | ||
|
||
pub use error::Error; | ||
|
||
pub mod proto { | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The manual
drop
,remove_file
,req.status.send
,req.result.send
are error prone, do we have any ideas how to automatically do these