Skip to content

Commit 327db34

Browse files
downlaoder: Add global event streams
1 parent 7c2d4cd commit 327db34

File tree

5 files changed

+39
-1
lines changed

5 files changed

+39
-1
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ thiserror = "2.0.15"
2929
tracing = "0.1.41"
3030
tonic = "0.14"
3131
tokio = { version = "1.47", features = ["full"] }
32+
tokio-stream = { version = "0.1.17", features = ["full"] }
3233
tokio-util = { version = "0.7", features = ["full"] }
3334
prost = "0.14"
3435
reqwest = { version = "0.12", features = ["stream"] }

crates/download-manager/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ path = "src/download_manager.rs"
99
[dependencies]
1010
reqwest = { workspace = true }
1111
tokio = { workspace = true }
12+
tokio-stream = { workspace = true }
1213
tokio-util = { workspace = true }
1314
futures-core = { workspace = true }
1415
futures-util = { workspace = true }

crates/download-manager/src/download.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use crate::{DownloadError, DownloadEvent, DownloadID};
2+
use futures_core::Stream;
23
use std::path::PathBuf;
34
use tokio::sync::{broadcast, oneshot};
5+
use tokio_stream::wrappers::BroadcastStream;
46
use tokio_util::sync::CancellationToken;
57

68
pub struct Download {
@@ -33,6 +35,26 @@ impl Download {
3335
pub fn cancel(&self) {
3436
self.cancel_token.cancel();
3537
}
38+
39+
pub fn events(&self) -> impl Stream<Item = DownloadEvent> + 'static {
40+
use tokio_stream::StreamExt as _;
41+
42+
let download_id = self.id;
43+
BroadcastStream::new(self.events.resubscribe())
44+
.filter_map(|res| res.ok())
45+
.filter(move |event| {
46+
let matches = match event {
47+
DownloadEvent::Queued { id, .. }
48+
| DownloadEvent::Started { id, .. }
49+
| DownloadEvent::Retrying { id, .. }
50+
| DownloadEvent::Completed { id, .. }
51+
| DownloadEvent::Failed { id, .. }
52+
| DownloadEvent::Cancelled { id, .. } => *id == download_id,
53+
};
54+
55+
matches
56+
})
57+
}
3658
}
3759

3860
impl std::future::Future for Download {

crates/download-manager/src/download_manager.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ pub use crate::{
1313
events::{DownloadEvent, Progress},
1414
request::Request,
1515
};
16+
use futures_core::Stream;
1617
use reqwest::Url;
1718
use std::{
1819
path::Path,
1920
sync::{Arc, atomic::Ordering},
2021
};
21-
use tokio::sync::mpsc;
22+
use tokio::sync::{broadcast, mpsc};
23+
use tokio_stream::wrappers::BroadcastStream;
2224
use tokio_util::{sync::CancellationToken, task::TaskTracker};
2325

2426
pub struct DownloadManager {
@@ -80,6 +82,16 @@ impl DownloadManager {
8082
pub fn child_token(&self) -> CancellationToken {
8183
self.ctx.child_token()
8284
}
85+
86+
pub fn subscribe(&self) -> broadcast::Receiver<DownloadEvent> {
87+
self.ctx.events.subscribe()
88+
}
89+
90+
pub fn events(&self) -> impl Stream<Item = DownloadEvent> + 'static {
91+
use tokio_stream::StreamExt as _;
92+
93+
BroadcastStream::new(self.subscribe()).filter_map(|res| res.ok())
94+
}
8395
}
8496

8597
pub struct DownloadManagerBuilder {

0 commit comments

Comments
 (0)