Draft: Download manager #7

Open. Wants to merge 31 commits into base: main.
Changes from 2 commits
31 commits
dbbe919 chore: Add reqwest crate (cyberphantom52, Jul 20, 2025)
748106a downloader: Implement DownloadManager (cyberphantom52, Jul 20, 2025)
c5479b8 error: Add reqwest errors (cyberphantom52, Jul 20, 2025)
9c29890 downloader: Propagate errors and correctly update download status (cyberphantom52, Jul 20, 2025)
ad41c23 downloader: Implement cancelling downloads (cyberphantom52, Jul 20, 2025)
447be8e downloader: Implement retries (cyberphantom52, Jul 20, 2025)
395ced4 downloader: Improve File handling and use switch to tokio::fs::File (cyberphantom52, Jul 20, 2025)
1bf7bfd downloader: Use tokio::select to check for cancellations (cyberphantom52, Jul 20, 2025)
9adb239 downloader: Send initial progress update and return error for response (cyberphantom52, Jul 20, 2025)
b9911b8 downloader: Implement Future trait for DownloadHandle (cyberphantom52, Jul 20, 2025)
9f25867 downloader: End progress updates at completion too (cyberphantom52, Jul 20, 2025)
c313200 downloader: Manually drop file handle before deleting on error or cancel (cyberphantom52, Jul 20, 2025)
926d2fc fix: gptk impl (edfloreshz, Jul 21, 2025)
3fe1e92 downloader: Refactor retry logic and improve errors (cyberphantom52, Jul 21, 2025)
49d4f60 downloader: Add method to wait for status updates (cyberphantom52, Jul 21, 2025)
7973137 downloader: fix retry logic (cyberphantom52, Jul 21, 2025)
31f5218 downloader: Use tokio_util::CancellationToken to handle cancellations (cyberphantom52, Jul 21, 2025)
d96688d downloader: Add hierarchical cancel tokens to cancel all downloads (cyberphantom52, Jul 21, 2025)
3a14a0d downloader: Split the implementation into a module (cyberphantom52, Jul 23, 2025)
5566d67 downloader: Add better progress updates with rate limiting (cyberphantom52, Jul 23, 2025)
2aac716 downloader: Better download interface with errors (cyberphantom52, Jul 23, 2025)
9251524 downloader: Add some convinience methods related to download status (cyberphantom52, Jul 23, 2025)
0670468 downloader: Add config for download manager (cyberphantom52, Jul 23, 2025)
ce05710 downloader: Use TaskTracker to track thread lifecycles (cyberphantom52, Jul 23, 2025)
633b752 downloader: Add shutdown, active_downloads and queued_downloads (cyberphantom52, Jul 23, 2025)
09b47b6 downloader: Add DownloadConfig and rate limiting (cyberphantom52, Jul 23, 2025)
4803c51 downloader: Use builder pattern to build download requests (cyberphantom52, Jul 23, 2025)
9e9b298 downloader: Don't rexport worker_thread (cyberphantom52, Jul 23, 2025)
987beaa downloader: Switch to url::Url (cyberphantom52, Jul 23, 2025)
c550480 downloader: Accept Url's directly instead of parsing them (cyberphantom52, Jul 23, 2025)
8354cc9 downloader: Add convinience methods to send updates and result (cyberphantom52, Jul 23, 2025)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -11,6 +11,7 @@ tracing = { workspace = true }
tonic = { workspace = true }
tokio = { workspace = true }
prost = { workspace = true }
reqwest = "0.12"

[build-dependencies]
tonic-build = "0.12.3"
154 changes: 154 additions & 0 deletions src/downloader.rs
@mirkobrombin (Member), Jul 20, 2025:

I see the status channel being created but never updated to InProgress, Completed, or Failed (am I wrong?)
There are also more things that are missing but I imagine this is still a WIP.

Good job for the moment <3

Contributor Author:

This might not even be working :). I just wanted to put it out so the design can evolve with comments.
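
A minimal sketch of the status updates the first comment asks about, assuming the worker reports InProgress before reading and then exactly one of Completed or Failed. To stay self-contained it uses a `std::sync::mpsc` channel and a simulated list of chunks; in the PR the sender is a `watch::Sender<Status>`, whose `send` would take the place of the std channel. `run_download` and `observed_statuses` are hypothetical names, not part of the diff.

```rust
use std::sync::mpsc::{channel, Sender};

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Status {
    Pending,
    InProgress,
    Completed,
    Failed,
}

/// Hypothetical stand-in for the download worker. `chunks` simulates the
/// response body; an `Err` models a network or I/O failure.
pub fn run_download(
    chunks: Vec<Result<Vec<u8>, String>>,
    status: &Sender<Status>,
) -> Result<u64, String> {
    // Announce the start before any bytes are read. Send errors are ignored
    // because the handle (the receiver) may already have been dropped.
    let _ = status.send(Status::InProgress);
    let mut downloaded = 0u64;
    for chunk in chunks {
        match chunk {
            Ok(bytes) => downloaded += bytes.len() as u64,
            Err(e) => {
                // Report the failure before propagating the error.
                let _ = status.send(Status::Failed);
                return Err(e);
            }
        }
    }
    let _ = status.send(Status::Completed);
    Ok(downloaded)
}

/// Helper for trying the sketch: runs a simulated download and collects
/// every status it reported, in order.
pub fn observed_statuses(chunks: Vec<Result<Vec<u8>, String>>) -> Vec<Status> {
    let (tx, rx) = channel();
    let _ = run_download(chunks, &tx);
    drop(tx); // close the channel so the iterator below terminates
    rx.iter().collect()
}
```

The point of the sketch is only the ordering: every exit path of the worker sends a terminal status, so a handle never stays at Pending or InProgress after the worker returns.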

@@ -0,0 +1,154 @@
use reqwest::{Client, Url};
use std::{fs::File, io::Write, path::PathBuf, sync::Arc};
use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};

const QUEUE_SIZE: usize = 100;

#[derive(Debug)]
struct DownloadRequest {
    url: Url,
    destination: PathBuf,
    result: oneshot::Sender<Result<File, reqwest::Error>>,
    status: watch::Sender<Status>,
    progress: broadcast::Sender<DownloadProgress>,
}

#[derive(Debug, Clone, Copy)]
pub struct DownloadProgress {
    pub bytes_downloaded: u64,
    pub total_bytes: Option<u64>,
}

#[derive(Debug)]
pub struct DownloadHandle {
    result: oneshot::Receiver<Result<File, reqwest::Error>>,
    status: watch::Receiver<Status>,
    progress: broadcast::Receiver<DownloadProgress>,
}

impl DownloadHandle {
    pub async fn r#await(self) -> Result<std::fs::File, reqwest::Error> {
        match self.result.await {
            Ok(result) => result,
            Err(_) => todo!(),
        }
    }

    pub fn status(&self) -> Status {
        self.status.borrow().clone()
    }

    pub fn subscribe_progress(&self) -> &broadcast::Receiver<DownloadProgress> {
        &self.progress
    }
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Status {
    Pending,
    InProgress,
    Completed,
    Failed,
}

#[derive(Debug)]
pub struct DownloadManager {
    queue: mpsc::Sender<DownloadRequest>,
    semaphore: Arc<Semaphore>,
}

impl Drop for DownloadManager {
    fn drop(&mut self) {
        // Need to manually close the semaphore to make sure dispatcher_thread stops waiting for permits
        self.semaphore.close();
    }
}

impl DownloadManager {
    pub fn new(limit: usize) -> Self {
        let (tx, rx) = mpsc::channel(QUEUE_SIZE);
        let client = Client::new();
        let semaphore = Arc::new(Semaphore::new(limit));
        let manager = Self {
            queue: tx,
            semaphore: semaphore.clone(),
        };
        // Spawn the dispatcher thread to handle download requests
        tokio::spawn(async move { dispatcher_thread(client, rx, semaphore).await });
        manager
    }

    pub fn set_max_parallel_downloads(&self, limit: usize) {
        let current = self.semaphore.available_permits();
        if limit > current {
            self.semaphore.add_permits(limit - current);
        } else if limit < current {
            let to_remove = current - limit;
            for _ in 0..to_remove {
                let _ = self.semaphore.try_acquire();
            }
        }
    }
Contributor Author:

On looking carefully, this has a race condition, so it needs to be reimplemented.
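
One reading of the race: `available_permits()` is read first and the semaphore is adjusted afterwards, so the dispatcher can acquire or release permits in between and the computed difference is stale. The available count is also not the configured limit while downloads are running, and a permit bound with `let _ =` is dropped, and therefore returned, immediately. The sketch below is a std-only illustration of one alternative, assuming the limit and the active count are kept together under a single lock; `ParallelLimit` and its methods are hypothetical and not part of the PR.

```rust
use std::sync::Mutex;

#[derive(Debug)]
struct State {
    limit: usize,  // configured maximum number of parallel downloads
    active: usize, // downloads currently holding a slot
}

/// Hypothetical limiter: the limit is stored explicitly instead of being
/// inferred from the number of free permits.
#[derive(Debug)]
pub struct ParallelLimit {
    state: Mutex<State>,
}

impl ParallelLimit {
    pub fn new(limit: usize) -> Self {
        Self { state: Mutex::new(State { limit, active: 0 }) }
    }

    /// Claims a slot if fewer than `limit` downloads are active.
    /// The check and the increment happen under the same lock.
    pub fn try_start(&self) -> bool {
        let mut s = self.state.lock().unwrap();
        if s.active < s.limit {
            s.active += 1;
            true
        } else {
            false
        }
    }

    /// Releases a slot previously claimed with `try_start`.
    pub fn finish(&self) {
        let mut s = self.state.lock().unwrap();
        s.active = s.active.saturating_sub(1);
    }

    /// Changes the limit atomically. Running downloads are not interrupted;
    /// new ones only start once `active` has dropped below the new limit.
    pub fn set_limit(&self, limit: usize) {
        self.state.lock().unwrap().limit = limit;
    }

    /// Number of downloads currently holding a slot.
    pub fn active(&self) -> usize {
        self.state.lock().unwrap().active
    }
}
```

With tokio's `Semaphore`, a similar effect might be reachable by tracking the configured limit separately and using `forget()` on acquired permits when shrinking, but that is an assumption about a possible design, not what the PR does.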


    pub fn add_request(&self, url: Url, destination: PathBuf) -> DownloadHandle {
        let (result_tx, result_rx) = oneshot::channel();
        let (status_tx, status_rx) = watch::channel(Status::Pending);
        let (progress_tx, progress_rx) = broadcast::channel(16);

        let req = DownloadRequest {
            url,
            destination,
            result: result_tx,
            status: status_tx,
            progress: progress_tx,
        };

        let _ = self.queue.try_send(req);
Member:

Is the try_send safe to be ignored here?

Contributor Author:

Nope, I just need a new error type to represent the error here.

Member:

Roger!


        DownloadHandle {
            result: result_rx,
            status: status_rx,
            progress: progress_rx,
        }
    }
}
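
On the ignored `try_send` result discussed in the thread above: a minimal sketch of what surfacing that error could look like, assuming a dedicated error enum. It uses `std::sync::mpsc::sync_channel` so it compiles without tokio; tokio's `mpsc::error::TrySendError` has the analogous `Full` and `Closed` variants. `QueueError` and `Queue` are hypothetical names, and the request is reduced to a URL string for brevity.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical error type; the PR has not defined one at this point.
#[derive(Debug, PartialEq, Eq)]
pub enum QueueError {
    /// The bounded queue has no free slot.
    Full,
    /// The receiving side (the dispatcher) is gone.
    Closed,
}

/// Hypothetical stand-in for the manager's request queue.
pub struct Queue {
    tx: SyncSender<String>,
}

impl Queue {
    /// Creates a bounded queue and hands back the receiving end.
    pub fn new(capacity: usize) -> (Self, Receiver<String>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx }, rx)
    }

    /// Enqueues a request without blocking and reports why it failed,
    /// instead of discarding the result.
    pub fn add_request(&self, url: &str) -> Result<(), QueueError> {
        self.tx.try_send(url.to_string()).map_err(|e| match e {
            TrySendError::Full(_) => QueueError::Full,
            TrySendError::Disconnected(_) => QueueError::Closed,
        })
    }
}
```

Returning a `Result` lets the caller decide whether a full queue should be retried, awaited, or shown to the user, which a silently dropped request cannot offer.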

async fn dispatcher_thread(
Contributor Author:

The dispatcher and download threads can outlive the DownloadManager itself. Is this desirable, or should we keep track of the threads to manage their lifecycles?

    client: Client,
    mut rx: mpsc::Receiver<DownloadRequest>,
    sem: Arc<Semaphore>,
) {
    while let Some(request) = rx.recv().await {
        let permit = match sem.clone().acquire_owned().await {
            Ok(permit) => permit,
            Err(_) => break,
        };
        let client = client.clone();
        tokio::spawn(async move {
            // Move the permit into the worker thread so it's automatically released when the thread finishes
            let _permit = permit;
            let _ = download_thread(client, request).await;
        });
    }
}
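
On the lifecycle question raised at `dispatcher_thread`: a std-only sketch of the "keep track of the threads" option, assuming the manager owns every worker handle plus a shared stop flag and joins the workers on shutdown or drop. Later commits in this PR mention `tokio_util`'s `CancellationToken` and `TaskTracker` for this purpose; the code below is only an analogy using OS threads, and `Manager`, `spawn_worker`, and `shutdown` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical manager that owns its workers, so none can outlive it.
pub struct Manager {
    stop: Arc<AtomicBool>,      // shared cancellation flag
    finished: Arc<AtomicUsize>, // number of workers that have exited
    workers: Vec<JoinHandle<()>>,
}

impl Manager {
    pub fn new() -> Self {
        Self {
            stop: Arc::new(AtomicBool::new(false)),
            finished: Arc::new(AtomicUsize::new(0)),
            workers: Vec::new(),
        }
    }

    /// Spawns a worker and keeps its handle for a later join.
    pub fn spawn_worker(&mut self) {
        let stop = Arc::clone(&self.stop);
        let finished = Arc::clone(&self.finished);
        self.workers.push(thread::spawn(move || {
            // Stand-in for download work: check the stop flag between "chunks".
            while !stop.load(Ordering::Acquire) {
                thread::sleep(Duration::from_millis(1));
            }
            finished.fetch_add(1, Ordering::AcqRel);
        }));
    }

    /// Signals every worker to stop, waits for all of them, and returns
    /// how many workers have exited in total.
    pub fn shutdown(&mut self) -> usize {
        self.stop.store(true, Ordering::Release);
        for handle in self.workers.drain(..) {
            let _ = handle.join();
        }
        self.finished.load(Ordering::Acquire)
    }
}

impl Drop for Manager {
    fn drop(&mut self) {
        // Dropping the manager also stops and joins any remaining workers.
        self.shutdown();
    }
}
```

Calling `shutdown` twice is harmless in this sketch: the second call finds no handles left to join and returns the same count.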

async fn download_thread(
    client: Client,
    req: DownloadRequest,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut resp = client.get(req.url).send().await?;
    let total = resp.content_length();
    let mut file = File::create(&req.destination)?;
    // let mut stream = resp.bytes().await?;
    let mut downloaded = 0u64;
    while let Some(chunk) = resp.chunk().await.transpose() {
        let chunk = chunk?;
        file.write_all(&chunk)?;
        downloaded += chunk.len() as u64;
        let _ = req.progress.send(DownloadProgress {
            bytes_downloaded: downloaded,
            total_bytes: total,
        });
    }
    Ok(())
}
2 changes: 2 additions & 0 deletions src/lib.rs
@@ -1,5 +1,7 @@
pub mod downloader;
mod error;
pub mod runner;

pub use error::Error;

pub mod proto {