Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
e27583b
chore: Add new download-manager crate
cyberphantom52 Aug 18, 2025
457e25f
downloader: Add deps
cyberphantom52 Aug 18, 2025
f4dd92e
downloader: Define `DownloadError`s
cyberphantom52 Aug 18, 2025
60b8254
downloader: Add initial skel for DownloadManager
cyberphantom52 Aug 18, 2025
95ef478
downloader: Add worker threads to handle downloads
cyberphantom52 Aug 18, 2025
528eabb
downloader: Add Download lifecycle tracking
cyberphantom52 Aug 18, 2025
ea287a8
downloader: Add channel to send result
cyberphantom52 Aug 18, 2025
16df923
downlaoder: Add download cancellation
cyberphantom52 Aug 18, 2025
77b5a07
downloader: Use anyhow::Result for channel errors
cyberphantom52 Aug 18, 2025
f2f34c2
downloader: Add DownloadConfig and retries
cyberphantom52 Aug 18, 2025
3e8db8d
downloader: impl Future for Download
cyberphantom52 Aug 18, 2025
99a1dc2
downloader: Add `Context` for shared data
cyberphantom52 Aug 18, 2025
519a88a
downloader: Add DownloadManagerBuilder
cyberphantom52 Aug 18, 2025
dff32d9
downloader: Add callback TODO
cyberphantom52 Aug 18, 2025
0bc1df4
downloader: Add overwrite option to DownloadConfig
cyberphantom52 Aug 18, 2025
7e23c7e
downloader: Add backoff strategy
cyberphantom52 Aug 19, 2025
77efbff
downloader: Add IDs to downloads
cyberphantom52 Aug 19, 2025
83d40c2
downloader: Fix active downloads count
cyberphantom52 Aug 19, 2025
2a2ec06
downloader: Used typed errors
cyberphantom52 Aug 19, 2025
552d781
downloader: Add event based updates
cyberphantom52 Aug 19, 2025
572fdd7
downlaoder: Add global event streams
cyberphantom52 Aug 19, 2025
3ff5768
downloader: Add progress updates
cyberphantom52 Aug 19, 2025
1a36952
Emit queued after queuing a request.
cyberphantom52 Aug 25, 2025
4540872
Pass down Context to the worker threads
cyberphantom52 Aug 26, 2025
3f0f476
downloader: Implement a Download Scheduler
cyberphantom52 Aug 27, 2025
3c92236
request: Propagate headers to DownloadConfig
cyberphantom52 Aug 27, 2025
59ab5cb
request: Emit events purely in worker
cyberphantom52 Aug 27, 2025
697bc96
downloader: Use headers from config in actual request
cyberphantom52 Aug 27, 2025
5049f09
downloader: Try to get remote metatadata using HEAD request
cyberphantom52 Aug 27, 2025
0edec36
downloader: Implement callbacks
cyberphantom52 Aug 27, 2025
a848b13
downloader: Propagate errors when enqueuing a task
cyberphantom52 Aug 29, 2025
fe88c34
downloader: Add cancel method to manager
cyberphantom52 Aug 29, 2025
f6b925a
downlaoder: Add option to shutdown the manager
cyberphantom52 Aug 29, 2025
e8bece7
downloader: Remove queue_size option
cyberphantom52 Aug 29, 2025
12b30cd
downloader: Fix callback typo
cyberphantom52 Aug 29, 2025
d9691ff
downloader: Add documentation
cyberphantom52 Aug 30, 2025
6476b50
downloader: Fix exports and add prelude module
cyberphantom52 Aug 30, 2025
ca16f1f
downloader: Add a EventBus wrapper
cyberphantom52 Aug 30, 2025
a3f63b4
downloader: Fix cancellation
cyberphantom52 Sep 2, 2025
c737b5d
downloader: Move download logic into worker module
cyberphantom52 Sep 2, 2025
a4548cd
downloader: Add async cancel
cyberphantom52 Sep 2, 2025
dc6db0f
downloader: Simplify callback
cyberphantom52 Sep 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
917 changes: 913 additions & 4 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ readme = "README.md"

[workspace]
members = [
"crates/download-manager",
"crates/next-cli",
"crates/next-core",
"crates/next-server",
Expand All @@ -28,7 +29,11 @@ thiserror = "2.0.15"
tracing = "0.1.41"
tonic = "0.14"
tokio = { version = "1.47", features = ["full"] }
tokio-stream = { version = "0.1.17", features = ["full"] }
tokio-util = { version = "0.7", features = ["full"] }
prost = "0.14"
reqwest = { version = "0.12", features = ["stream"] }
anyhow = "1.0.99"
futures-core = "0.3"
futures-util = "0.3"
derive_builder = "0.20"
18 changes: 18 additions & 0 deletions crates/download-manager/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "download-manager"
version = "0.1.0"
edition.workspace = true

[lib]
path = "src/download_manager.rs"

[dependencies]
reqwest = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
futures-core = { workspace = true }
futures-util = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
derive_builder = { workspace = true }
74 changes: 74 additions & 0 deletions crates/download-manager/src/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use reqwest::Client;
use std::sync::{
Arc,
atomic::{AtomicU64, AtomicUsize, Ordering},
};
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;

use crate::events::EventBus;

/// Unique identifier for a download; monotonically increasing u64.
pub type DownloadID = u64;

/// Shared runtime context for coordinating downloads. Internal to the crate.
/// Holds the concurrency semaphore, root cancellation token, HTTP client,
/// atomic counters, and the global [DownloadEvent] broadcast sender.
/// Cloned and shared across scheduler and workers.
#[derive(Debug)]
pub(crate) struct Context {
/// Semaphore limiting concurrent active downloads.
pub semaphore: Arc<Semaphore>,
/// Root cancellation token; children inherit via [Context::child_token()].
pub cancel_root: CancellationToken,
/// Shared reqwest client reused across attempts.
pub client: Client,

// Counters
/// Monotonic counter for generating DownloadID values.
pub id_counter: AtomicU64,
/// Number of currently active (running) downloads.
pub active: AtomicUsize,
/// Configured maximum concurrency. Not automatically updated if semaphore changes.
pub max_concurrent: AtomicUsize,

/// Global [DownloadEvent] broadcaster (buffered). Slow subscribers may miss events.
pub events: EventBus,
}

impl Context {
/// Create a new shared Context.
/// - Initializes the semaphore with `max_concurrent` permits.
/// - Creates a root [CancellationToken] and a broadcast channel (capacity 1024).
/// - Constructs a shared [reqwest::Client].
pub fn new(max_concurrent: usize, cancel_root: CancellationToken) -> Arc<Self> {
Arc::new(Self {
semaphore: Arc::new(Semaphore::new(max_concurrent)),
max_concurrent: AtomicUsize::new(max_concurrent),
cancel_root,
active: AtomicUsize::new(0),
id_counter: AtomicU64::new(1),
client: Client::new(),
events: EventBus::new(),
})
}

/// Atomically generate the next [DownloadID] (relaxed ordering).
/// Unique within the lifetime of this Context; starts at 1.
#[inline]
pub fn next_id(&self) -> DownloadID {
self.id_counter.fetch_add(1, Ordering::Relaxed)
}

/// Create a child [CancellationToken] tied to the manager's root token.
/// Cancelling the root cascades to all children.
#[inline]
pub fn child_token(&self) -> CancellationToken {
self.cancel_root.child_token()
}

/// Cancel the root token, cooperatively cancelling all in-flight downloads.
pub fn cancel_all(&self) {
self.cancel_root.cancel();
}
}
124 changes: 124 additions & 0 deletions crates/download-manager/src/download.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use crate::{DownloadError, DownloadID, Event, Progress};
use futures_core::Stream;
use std::path::PathBuf;
use tokio::sync::{broadcast, oneshot, watch};
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
use tokio_util::sync::CancellationToken;

/// Handle for a single download scheduled by DownloadManager.
///
/// Behavior:
/// - Implements Future; awaiting resolves to DownloadResult or DownloadError.
/// - Exposes per-download streams via [Download::progress()] and [Download::events()].
/// - Cancellation is cooperative via [Download::cancel()]; the worker aborts the HTTP request and removes any partial file.
pub struct Download {
id: DownloadID,
progress: watch::Receiver<Progress>,
events: broadcast::Receiver<Event>,
result: oneshot::Receiver<Result<DownloadResult, DownloadError>>,

cancel_token: CancellationToken,
}

impl Download {
pub(crate) fn new(
id: DownloadID,
progress: watch::Receiver<Progress>,
events: broadcast::Receiver<Event>,
result: oneshot::Receiver<Result<DownloadResult, DownloadError>>,
cancel_token: CancellationToken,
) -> Self {
Download {
id,
progress,
events,
result,
cancel_token,
}
}

/// Unique identifier for this download, matching [DownloadEvent] IDs.
pub fn id(&self) -> DownloadID {
self.id
}

/// Request cooperative cancellation of this download.
///
/// The scheduler/worker aborts the in-flight HTTP request and deletes any partially
/// written file. Cancellation is best-effort and may race with completion.
pub fn cancel(&self) {
self.cancel_token.cancel();
}

pub fn progress_raw(&self) -> watch::Receiver<Progress> {
self.progress.clone()
}

/// Stream of sampled Progress updates for this download.
///
/// Backed by a watch channel: consumers receive the latest state immediately,
/// and updates are coalesced according to sampling thresholds.
pub fn progress(&self) -> impl Stream<Item = Progress> + 'static {
WatchStream::new(self.progress_raw())
}

/// Stream of [DownloadEvent] values scoped to this download only.
///
/// Backed by a broadcast channel; lagged consumers may drop messages.
/// This stream filters events to those whose id matches this handle.
pub fn events(&self) -> impl Stream<Item = Event> + 'static {
use tokio_stream::StreamExt as _;

let download_id = self.id;
BroadcastStream::new(self.events.resubscribe())
.filter_map(|res| res.ok())
.filter(move |event| {
let matches = match event {
Event::Queued { id, .. }
| Event::Probed { id, .. }
| Event::Started { id, .. }
| Event::Retrying { id, .. }
| Event::Completed { id, .. }
| Event::Failed { id, .. }
| Event::Cancelled { id, .. } => *id == download_id,
};

matches
})
}
}

impl std::future::Future for Download {
type Output = Result<DownloadResult, DownloadError>;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
use std::pin::Pin;
use std::task::Poll;

match Pin::new(&mut self.result).poll(cx) {
Poll::Ready(Ok(result)) => Poll::Ready(result),
Poll::Ready(Err(_)) => Poll::Ready(Err(DownloadError::ManagerShutdown)),
Poll::Pending => Poll::Pending,
}
}
}

#[derive(Debug)]
pub struct DownloadResult {
pub path: PathBuf,
pub bytes_downloaded: u64,
}

#[derive(Debug, Clone)]
/// Remote metadata obtained via a best-effort `HEAD` probe prior to downloading.
/// Availability depends on server support; fields are None when not provided.
pub struct RemoteInfo {
pub content_length: Option<u64>,
pub accept_ranges: Option<String>,
pub etag: Option<String>,
pub last_modified: Option<String>,
pub content_type: Option<String>,
}
Loading