Skip to content

Draft: Download manager #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
dbbe919
chore: Add reqwest crate
cyberphantom52 Jul 20, 2025
748106a
downloader: Implement DownloadManager
cyberphantom52 Jul 20, 2025
c5479b8
error: Add reqwest errors
cyberphantom52 Jul 20, 2025
9c29890
downloader: Propagate errors and correctly update download status
cyberphantom52 Jul 20, 2025
ad41c23
downloader: Implement cancelling downloads
cyberphantom52 Jul 20, 2025
447be8e
downloader: Implement retries
cyberphantom52 Jul 20, 2025
395ced4
downloader: Improve File handling and use switch to tokio::fs::File
cyberphantom52 Jul 20, 2025
1bf7bfd
downloader: Use tokio::select to check for cancellations
cyberphantom52 Jul 20, 2025
9adb239
downloader: Send initial progress update and return error for response
cyberphantom52 Jul 20, 2025
b9911b8
downloader: Implement Future trait for DownloadHandle
cyberphantom52 Jul 20, 2025
9f25867
downloader: End progress updates at completion too
cyberphantom52 Jul 20, 2025
c313200
downloader: Manually drop file handle before deleting on error or cancel
cyberphantom52 Jul 20, 2025
926d2fc
fix: gptk impl
edfloreshz Jul 21, 2025
3fe1e92
downloader: Refactor retry logic and improve errors
cyberphantom52 Jul 21, 2025
49d4f60
downloader: Add method to wait for status updates
cyberphantom52 Jul 21, 2025
7973137
downloader: fix retry logic
cyberphantom52 Jul 21, 2025
31f5218
downloader: Use tokio_util::CancellationToken to handle cancellations
cyberphantom52 Jul 21, 2025
d96688d
downloader: Add hierarchical cancel tokens to cancel all downloads
cyberphantom52 Jul 21, 2025
3a14a0d
downloader: Split the implementation into a module
cyberphantom52 Jul 23, 2025
5566d67
downloader: Add better progress updates with rate limiting
cyberphantom52 Jul 23, 2025
2aac716
downloader: Better download interface with errors
cyberphantom52 Jul 23, 2025
9251524
downloader: Add some convinience methods related to download status
cyberphantom52 Jul 23, 2025
0670468
downloader: Add config for download manager
cyberphantom52 Jul 23, 2025
ce05710
downloader: Use TaskTracker to track thread lifecycles
cyberphantom52 Jul 23, 2025
633b752
downloader: Add shutdown, active_downloads and queued_downloads
cyberphantom52 Jul 23, 2025
09b47b6
downloader: Add DownloadConfig and rate limiting
cyberphantom52 Jul 23, 2025
4803c51
downloader: Use builder pattern to build download requests
cyberphantom52 Jul 23, 2025
9e9b298
downloader: Don't rexport worker_thread
cyberphantom52 Jul 23, 2025
987beaa
downloader: Switch to url::Url
cyberphantom52 Jul 23, 2025
c550480
downloader: Accept Url's directly instead of parsing them
cyberphantom52 Jul 23, 2025
8354cc9
downloader: Add convinience methods to send updates and result
cyberphantom52 Jul 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ tracing = { workspace = true }
tonic = { workspace = true }
tokio = { workspace = true }
prost = { workspace = true }
reqwest = "0.12"
tokio-util = { version = "0.7", features = ["rt"] }

[build-dependencies]
tonic-build = "0.12.3"
82 changes: 82 additions & 0 deletions src/downloader/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};

#[derive(Debug, Clone)]
pub struct DownloadManagerConfig {
max_concurrent: Arc<AtomicUsize>,
queue_size: usize,
}

impl Default for DownloadManagerConfig {
fn default() -> Self {
Self {
max_concurrent: Arc::new(AtomicUsize::new(3)),
queue_size: 100,
}
}
}

impl DownloadManagerConfig {
pub fn queue_size(&self) -> usize {
self.queue_size
}

pub fn max_concurrent(&self) -> usize {
self.max_concurrent.load(Ordering::Relaxed)
}

pub fn set_max_concurrent(&self, max: usize) {
self.max_concurrent.store(max, Ordering::Relaxed);
}
}

#[derive(Debug, Clone)]
pub struct DownloadConfig {
max_retries: usize,
user_agent: Option<String>,
progress_update_interval: Duration,
}

impl Default for DownloadConfig {
fn default() -> Self {
Self {
max_retries: 3,
user_agent: None,
progress_update_interval: Duration::from_millis(1000),
}
}
}

impl DownloadConfig {
pub fn with_max_retries(mut self, retries: usize) -> Self {
self.max_retries = retries;
self
}

pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
self.user_agent = Some(user_agent.into());
self
}

pub fn with_progress_interval(mut self, interval: Duration) -> Self {
self.progress_update_interval = interval;
self
}

pub fn max_retries(&self) -> usize {
self.max_retries
}

pub fn user_agent(&self) -> Option<&str> {
self.user_agent.as_deref()
}

pub fn progress_update_interval(&self) -> Duration {
self.progress_update_interval
}
}
65 changes: 65 additions & 0 deletions src/downloader/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use super::Status;
use crate::Error;
use tokio::{
fs::File,
sync::{oneshot, watch},
};
use tokio_util::sync::CancellationToken;

#[derive(Debug)]
pub struct DownloadHandle {
result: oneshot::Receiver<Result<File, Error>>,
status: watch::Receiver<Status>,
cancel: CancellationToken,
}

impl DownloadHandle {
pub fn new(
result: oneshot::Receiver<Result<File, Error>>,
status: watch::Receiver<Status>,
cancel: CancellationToken,
) -> Self {
Self {
result,
status,
cancel,
}
}

pub fn status(&self) -> Status {
*self.status.borrow()
}

pub fn is_completed(&self) -> bool {
matches!(self.status(), Status::Completed)
}

pub fn is_cancelled(&self) -> bool {
matches!(self.status(), Status::Cancelled)
}

pub async fn wait_for_status_update(&mut self) -> Result<(), watch::error::RecvError> {
self.status.changed().await
}

pub fn cancel(&self) {
self.cancel.cancel();
}
}

impl std::future::Future for DownloadHandle {
type Output = Result<tokio::fs::File, Error>;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
use std::pin::Pin;
use std::task::Poll;
match Pin::new(&mut self.result).poll(cx) {
Poll::Ready(Ok(result)) => Poll::Ready(result),
Poll::Ready(Err(e)) => Poll::Ready(Err(Error::Oneshot(e))),
Poll::Pending => Poll::Pending,
}
}
}
139 changes: 139 additions & 0 deletions src/downloader/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use super::{
download_thread, DownloadBuilder, DownloadConfig, DownloadManagerConfig, DownloadRequest,
};
use crate::{error::DownloadError, Error};
use reqwest::{Client, Url};
use std::{path::Path, sync::Arc};
use tokio::sync::{mpsc, Semaphore};
use tokio_util::{sync::CancellationToken, task::TaskTracker};

#[derive(Debug)]
pub struct DownloadManager {
queue: mpsc::Sender<DownloadRequest>,
semaphore: Arc<Semaphore>,
cancel: CancellationToken,
config: DownloadManagerConfig,
tracker: TaskTracker,
}

impl Default for DownloadManager {
fn default() -> Self {
Self::with_config(DownloadManagerConfig::default())
}
}

impl DownloadManager {
pub fn with_config(config: DownloadManagerConfig) -> Self {
let (tx, rx) = mpsc::channel(config.queue_size());
let client = Client::new();
let semaphore = Arc::new(Semaphore::new(config.max_concurrent()));
let tracker = TaskTracker::new();
let manager = Self {
queue: tx,
semaphore: semaphore.clone(),
cancel: CancellationToken::new(),
config,
tracker: tracker.clone(),
};
// Spawn the dispatcher thread to handle download requests
tracker.spawn(dispatcher_thread(client, rx, semaphore, tracker.clone()));
manager
}

pub fn download(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename this method to something like new_request_builder since it now returns a download request?

&self,
url: impl TryInto<Url>,
destination: impl AsRef<Path>,
) -> Result<DownloadBuilder, Error> {
let url = url
.try_into()
.map_err(|_| Error::Download(DownloadError::InvalidUrl))?;
Ok(DownloadBuilder::new(self, url, destination))
}

pub fn download_with_config(
&self,
url: Url,
destination: impl AsRef<Path>,
config: DownloadConfig,
) -> Result<DownloadBuilder, Error> {
self.download(url, destination)
.map(|builder| builder.with_config(config))
}

pub async fn set_max_parallel_downloads(&self, limit: usize) -> Result<(), Error> {
let current = self.config.max_concurrent();
if limit > current {
self.semaphore.add_permits(limit - current);
} else if limit < current {
let to_remove = current - limit;

let permits = self
.semaphore
.acquire_many(to_remove as u32)
.await
.map_err(|_| Error::Download(DownloadError::ManagerShutdown))?;

permits.forget();
}
self.config.set_max_concurrent(limit);

Ok(())
}

pub fn cancel_all(&self) {
self.cancel.cancel();
}

pub fn queued_downloads(&self) -> usize {
self.queue.max_capacity() - self.queue.capacity()
}

pub fn active_downloads(&self) -> usize {
// -1 because the dispatcher thread is always running
self.tracker.len() - 1
}

pub async fn shutdown(self) -> Result<(), Error> {
self.cancel.cancel();
self.tracker.close();
self.tracker.wait().await;
drop(self.queue);
Ok(())
}

pub fn is_cancelled(&self) -> bool {
self.cancel.is_cancelled()
}

pub fn child_token(&self) -> CancellationToken {
self.cancel.child_token()
}

pub fn queue_request(&self, req: DownloadRequest) -> Result<(), Error> {
self.queue.try_send(req).map_err(|e| match e {
mpsc::error::TrySendError::Full(_) => Error::Download(DownloadError::QueueFull),
mpsc::error::TrySendError::Closed(_) => Error::Download(DownloadError::ManagerShutdown),
})
}
}

async fn dispatcher_thread(
client: Client,
mut rx: mpsc::Receiver<DownloadRequest>,
sem: Arc<Semaphore>,
tracker: TaskTracker,
) {
while let Some(request) = rx.recv().await {
let permit = match sem.clone().acquire_owned().await {
Ok(permit) => permit,
Err(_) => break,
};
let client = client.clone();
tracker.spawn(async move {
// Move the permit into the worker thread so it's automatically released when the thread finishes
let _permit = permit;
download_thread(client.clone(), request).await;
});
}
}
13 changes: 13 additions & 0 deletions src/downloader/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
mod config;
mod handle;
mod manager;
mod progress;
mod request;
mod worker;

pub use config::{DownloadConfig, DownloadManagerConfig};
pub use handle::DownloadHandle;
pub use manager::DownloadManager;
pub use progress::{DownloadProgress, Status};
pub use request::{DownloadBuilder, DownloadRequest};
pub(self) use worker::download_thread;
Loading