Skip to content

Commit 6bb4036

Browse files
downloader: Add a EventBus wrapper
1 parent 6476b50 commit 6bb4036

File tree

6 files changed

+109
-46
lines changed

6 files changed

+109
-46
lines changed

crates/download-manager/src/context.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::sync::{
66
use tokio::sync::{Semaphore, broadcast};
77
use tokio_util::sync::CancellationToken;
88

9-
use crate::DownloadEvent;
9+
use crate::{Event, events::EventBus};
1010

1111
/// Unique identifier for a download; monotonically increasing u64.
1212
pub type DownloadID = u64;
@@ -33,7 +33,7 @@ pub(crate) struct Context {
3333
pub max_concurrent: AtomicUsize,
3434

3535
/// Global [DownloadEvent] broadcaster (buffered). Slow subscribers may miss events.
36-
pub events: broadcast::Sender<DownloadEvent>,
36+
pub events: EventBus,
3737
}
3838

3939
impl Context {
@@ -42,15 +42,14 @@ impl Context {
4242
/// - Creates a root [CancellationToken] and a broadcast channel (capacity 1024).
4343
/// - Constructs a shared [reqwest::Client].
4444
pub fn new(max_concurrent: usize) -> Arc<Self> {
45-
let (events, _) = broadcast::channel(1024);
4645
Arc::new(Self {
4746
semaphore: Arc::new(Semaphore::new(max_concurrent)),
4847
max_concurrent: AtomicUsize::new(max_concurrent),
4948
cancel_root: CancellationToken::new(),
5049
active: AtomicUsize::new(0),
5150
id_counter: AtomicU64::new(1),
5251
client: Client::new(),
53-
events,
52+
events: EventBus::new(),
5453
})
5554
}
5655

crates/download-manager/src/download.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{DownloadError, DownloadEvent, DownloadID, Progress};
1+
use crate::{DownloadError, DownloadID, Event, Progress};
22
use futures_core::Stream;
33
use std::path::PathBuf;
44
use tokio::sync::{broadcast, oneshot, watch};
@@ -14,7 +14,7 @@ use tokio_util::sync::CancellationToken;
1414
pub struct Download {
1515
id: DownloadID,
1616
progress: watch::Receiver<Progress>,
17-
events: broadcast::Receiver<DownloadEvent>,
17+
events: broadcast::Receiver<Event>,
1818
result: oneshot::Receiver<Result<DownloadResult, DownloadError>>,
1919

2020
cancel_token: CancellationToken,
@@ -24,7 +24,7 @@ impl Download {
2424
pub(crate) fn new(
2525
id: DownloadID,
2626
progress: watch::Receiver<Progress>,
27-
events: broadcast::Receiver<DownloadEvent>,
27+
events: broadcast::Receiver<Event>,
2828
result: oneshot::Receiver<Result<DownloadResult, DownloadError>>,
2929
cancel_token: CancellationToken,
3030
) -> Self {
@@ -66,21 +66,21 @@ impl Download {
6666
///
6767
/// Backed by a broadcast channel; lagged consumers may drop messages.
6868
/// This stream filters events to those whose id matches this handle.
69-
pub fn events(&self) -> impl Stream<Item = DownloadEvent> + 'static {
69+
pub fn events(&self) -> impl Stream<Item = Event> + 'static {
7070
use tokio_stream::StreamExt as _;
7171

7272
let download_id = self.id;
7373
BroadcastStream::new(self.events.resubscribe())
7474
.filter_map(|res| res.ok())
7575
.filter(move |event| {
7676
let matches = match event {
77-
DownloadEvent::Queued { id, .. }
78-
| DownloadEvent::Probed { id, .. }
79-
| DownloadEvent::Started { id, .. }
80-
| DownloadEvent::Retrying { id, .. }
81-
| DownloadEvent::Completed { id, .. }
82-
| DownloadEvent::Failed { id, .. }
83-
| DownloadEvent::Cancelled { id, .. } => *id == download_id,
77+
Event::Queued { id, .. }
78+
| Event::Probed { id, .. }
79+
| Event::Started { id, .. }
80+
| Event::Retrying { id, .. }
81+
| Event::Completed { id, .. }
82+
| Event::Failed { id, .. }
83+
| Event::Cancelled { id, .. } => *id == download_id,
8484
};
8585

8686
matches

crates/download-manager/src/download_manager.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pub mod prelude {
99
context::DownloadID,
1010
download::{Download, DownloadResult},
1111
error::DownloadError,
12-
events::{DownloadEvent, Progress},
12+
events::{Event, Progress},
1313
request::Request,
1414
};
1515
}
@@ -27,7 +27,6 @@ use std::{
2727
sync::{Arc, atomic::Ordering},
2828
};
2929
use tokio::sync::{broadcast, mpsc};
30-
use tokio_stream::wrappers::BroadcastStream;
3130
use tokio_util::{sync::CancellationToken, task::TaskTracker};
3231

3332
/// Entry point for scheduling, observing, and cancelling downloads.
@@ -116,17 +115,15 @@ impl DownloadManager {
116115
///
117116
/// The underlying broadcast channel has a bounded buffer (1024). Slow consumers may lag and
118117
/// miss events. Consider using [DownloadManager::events()] for a stream that skips lagged messages gracefully.
119-
pub fn subscribe(&self) -> broadcast::Receiver<DownloadEvent> {
118+
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
120119
self.ctx.events.subscribe()
121120
}
122121

123122
/// A fallible-safe stream of global [DownloadEvent] values.
124123
///
125124
/// Internally wraps the broadcast receiver and filters out lagged/closed errors.
126-
pub fn events(&self) -> impl Stream<Item = DownloadEvent> + 'static {
127-
use tokio_stream::StreamExt as _;
128-
129-
BroadcastStream::new(self.subscribe()).filter_map(|res| res.ok())
125+
pub fn events(&self) -> impl Stream<Item = Event> + 'static {
126+
self.ctx.events.events()
130127
}
131128

132129
/// Gracefully stop the manager.

crates/download-manager/src/events.rs

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,36 @@ use crate::download::RemoteInfo;
33
use reqwest::Url;
44
use std::path::PathBuf;
55
use std::time::{Duration, Instant};
6+
use tokio::sync::broadcast;
67

78
#[derive(Debug, Clone)]
8-
pub enum DownloadEvent {
9+
pub(crate) struct EventBus(broadcast::Sender<Event>);
10+
11+
impl EventBus {
12+
pub fn new() -> Self {
13+
let (tx, _rx) = broadcast::channel(1024);
14+
EventBus(tx)
15+
}
16+
17+
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
18+
self.0.subscribe()
19+
}
20+
21+
pub fn events(&self) -> impl tokio_stream::Stream<Item = Event> + 'static {
22+
use tokio_stream::StreamExt as _;
23+
use tokio_stream::wrappers::BroadcastStream;
24+
25+
BroadcastStream::new(self.subscribe()).filter_map(|res| res.ok())
26+
}
27+
28+
pub fn send(&self, event: Event) {
29+
// TODO: Log the error
30+
let _ = self.0.send(event);
31+
}
32+
}
33+
34+
#[derive(Debug, Clone)]
35+
pub enum Event {
936
Queued {
1037
id: DownloadID,
1138
url: Url,
@@ -40,6 +67,48 @@ pub enum DownloadEvent {
4067
},
4168
}
4269

70+
impl std::fmt::Display for Event {
71+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72+
match self {
73+
Event::Queued { id, .. } => write!(f, "[{}] Queued", id),
74+
Event::Probed { id, info } => write!(f, "[{}] Probed: {:?}", id, info),
75+
Event::Failed { id, error } => write!(f, "[{}] Failed: {}", id, error),
76+
Event::Cancelled { id } => write!(f, "[{}] Cancelled", id),
77+
Event::Started {
78+
id, total_bytes, ..
79+
} => {
80+
if let Some(total) = total_bytes {
81+
write!(f, "[{}] Started ({} bytes)", id, total)
82+
} else {
83+
write!(f, "[{}] Started", id)
84+
}
85+
}
86+
Event::Retrying {
87+
id,
88+
attempt,
89+
next_delay_ms,
90+
} => {
91+
write!(
92+
f,
93+
"[{}] Retrying: attempt {} in {} ms",
94+
id, attempt, next_delay_ms
95+
)
96+
}
97+
Event::Completed {
98+
id,
99+
path,
100+
bytes_downloaded,
101+
} => {
102+
write!(
103+
f,
104+
"[{}] Completed: {:?} ({} bytes)",
105+
id, path, bytes_downloaded
106+
)
107+
}
108+
}
109+
}
110+
}
111+
43112
#[derive(Debug, Clone, Copy)]
44113
pub struct Progress {
45114
pub bytes_downloaded: u64,

crates/download-manager/src/request.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::{
2-
Download, DownloadEvent, DownloadID, DownloadManager, Progress, scheduler::SchedulerCmd,
2+
Download, DownloadID, DownloadManager, Event, Progress, events::EventBus,
3+
scheduler::SchedulerCmd,
34
};
45
use derive_builder::Builder;
56
use reqwest::{
@@ -10,7 +11,7 @@ use std::{
1011
path::{Path, PathBuf},
1112
sync::Arc,
1213
};
13-
use tokio::sync::{broadcast, oneshot, watch};
14+
use tokio::sync::{oneshot, watch};
1415
use tokio_util::sync::CancellationToken;
1516

1617
/// Immutable description of a single download request.
@@ -26,10 +27,10 @@ pub struct Request {
2627
config: DownloadConfig,
2728

2829
progress: watch::Sender<Progress>,
29-
events: broadcast::Sender<DownloadEvent>,
30+
events: EventBus,
3031

3132
pub(crate) on_progress: Option<Arc<Box<dyn Fn(Progress) + Send + Sync>>>,
32-
pub(crate) on_event: Option<Arc<Box<dyn Fn(DownloadEvent) + Send + Sync>>>,
33+
pub(crate) on_event: Option<Arc<Box<dyn Fn(Event) + Send + Sync>>>,
3334

3435
pub cancel_token: CancellationToken,
3536
}
@@ -116,9 +117,8 @@ impl Request {
116117
&self.config
117118
}
118119

119-
pub fn emit(&self, event: DownloadEvent) {
120-
// TODO: Log the error
121-
let _ = self.events.send(event.clone());
120+
pub fn emit(&self, event: Event) {
121+
self.events.send(event.clone());
122122
self.on_event.as_ref().map(|cb| cb(event));
123123
}
124124

@@ -138,7 +138,7 @@ pub struct RequestBuilder<'a> {
138138
config: DownloadConfigBuilder,
139139

140140
on_progress: Option<Arc<Box<dyn Fn(Progress) + Send + Sync>>>,
141-
on_event: Option<Arc<Box<dyn Fn(DownloadEvent) + Send + Sync>>>,
141+
on_event: Option<Arc<Box<dyn Fn(Event) + Send + Sync>>>,
142142

143143
manager: &'a DownloadManager,
144144
}
@@ -199,7 +199,7 @@ impl RequestBuilder<'_> {
199199
/// Called for events emitted for this request only.
200200
pub fn on_event<F>(mut self, callback: F) -> Self
201201
where
202-
F: Fn(DownloadEvent) + Send + Sync + 'static,
202+
F: Fn(Event) + Send + Sync + 'static,
203203
{
204204
self.on_event = Some(Arc::new(Box::new(callback)));
205205
self
@@ -220,8 +220,8 @@ impl RequestBuilder<'_> {
220220
let (result_tx, result_rx) = oneshot::channel();
221221
let (progress_tx, progress_rx) = watch::channel(Progress::new(None));
222222
let cancel_token = self.manager.child_token();
223-
let event_tx = self.manager.ctx.events.clone();
224-
let event_rx = event_tx.subscribe();
223+
let event_bus = self.manager.ctx.events.clone();
224+
let event_rx = event_bus.subscribe();
225225

226226
let on_progress = self.on_progress;
227227
let on_event = self.on_event;
@@ -235,7 +235,7 @@ impl RequestBuilder<'_> {
235235
on_progress,
236236
on_event,
237237

238-
events: event_tx,
238+
events: event_bus,
239239
progress: progress_tx,
240240
cancel_token: cancel_token.clone(),
241241
};

crates/download-manager/src/scheduler.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use tokio::{
1414
use tokio_util::{task::TaskTracker, time::DelayQueue};
1515

1616
use crate::{
17-
DownloadError, DownloadEvent, DownloadID, DownloadResult, Progress, Request, context::Context,
17+
DownloadError, DownloadID, DownloadResult, Event, Progress, Request, context::Context,
1818
download::RemoteInfo,
1919
};
2020

@@ -88,7 +88,7 @@ impl Scheduler {
8888
fn schedule(&mut self, job: Job) {
8989
let request = &job.request;
9090
let id = job.id();
91-
request.emit(DownloadEvent::Queued {
91+
request.emit(Event::Queued {
9292
id,
9393
url: request.url().clone(),
9494
destination: request.destination().to_path_buf(),
@@ -233,15 +233,15 @@ impl Job {
233233
}
234234

235235
fn fail(self, error: DownloadError) {
236-
self.request.emit(DownloadEvent::Failed {
236+
self.request.emit(Event::Failed {
237237
id: self.id(),
238238
error: error.to_string(),
239239
});
240240
self.send_result(Err(error));
241241
}
242242

243243
fn finish(self, result: DownloadResult) {
244-
self.request.emit(DownloadEvent::Completed {
244+
self.request.emit(Event::Completed {
245245
id: self.id(),
246246
path: result.path.clone(),
247247
bytes_downloaded: result.bytes_downloaded,
@@ -250,7 +250,7 @@ impl Job {
250250
}
251251

252252
fn retry(&self, delay: Duration) {
253-
self.request.emit(DownloadEvent::Retrying {
253+
self.request.emit(Event::Retrying {
254254
id: self.id(),
255255
attempt: self.attempt,
256256
next_delay_ms: delay.as_millis() as u64,
@@ -259,8 +259,7 @@ impl Job {
259259

260260
fn cancel(self) {
261261
self.request.cancel_token.cancel();
262-
self.request
263-
.emit(DownloadEvent::Cancelled { id: self.id() });
262+
self.request.emit(Event::Cancelled { id: self.id() });
264263
self.send_result(Err(DownloadError::Cancelled))
265264
}
266265
}
@@ -320,7 +319,7 @@ async fn attempt_download(
320319
client: Client,
321320
) -> Result<DownloadResult, DownloadError> {
322321
if let Some(info) = probe_head(request, &client).await {
323-
request.emit(DownloadEvent::Probed {
322+
request.emit(Event::Probed {
324323
id: request.id(),
325324
info,
326325
});
@@ -344,7 +343,7 @@ async fn attempt_download(
344343
let total_bytes = response.content_length();
345344

346345
let mut file = File::create(request.destination()).await?;
347-
request.emit(DownloadEvent::Started {
346+
request.emit(Event::Started {
348347
id: request.id(),
349348
url: request.url().clone(),
350349
destination: request.destination().to_path_buf(),
@@ -364,8 +363,7 @@ async fn attempt_download(
364363
Ok(Some(chunk)) => {
365364
file.write_all(&chunk).await?;
366365
if progress.update(chunk.len() as u64) {
367-
// TODO: Log the error
368-
let _ = request.update_progress(progress);
366+
request.update_progress(progress);
369367
}
370368
}
371369
Ok(None) => break,

0 commit comments

Comments
 (0)