Skip to content

Commit 41832db

Browse files
authored
Merge pull request #106 from ryanoneill/fix/subscription-extraction
Extract subscription module into submodules
2 parents 27232c4 + f26e224 commit 41832db

File tree

7 files changed

+1038
-981
lines changed

7 files changed

+1038
-981
lines changed

src/app/subscription/batch.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use std::pin::Pin;
2+
3+
use tokio_stream::Stream;
4+
use tokio_util::sync::CancellationToken;
5+
6+
use super::{BoxedSubscription, Subscription};
7+
8+
/// A batch of subscriptions combined into one.
9+
pub struct BatchSubscription<M> {
10+
pub(crate) subscriptions: Vec<BoxedSubscription<M>>,
11+
}
12+
13+
impl<M> BatchSubscription<M> {
14+
/// Creates a batch of subscriptions.
15+
pub fn new(subscriptions: Vec<BoxedSubscription<M>>) -> Self {
16+
Self { subscriptions }
17+
}
18+
}
19+
20+
impl<M: Send + 'static> Subscription<M> for BatchSubscription<M> {
21+
fn into_stream(
22+
self: Box<Self>,
23+
cancel: CancellationToken,
24+
) -> Pin<Box<dyn Stream<Item = M> + Send>> {
25+
use futures_util::stream::SelectAll;
26+
use tokio_stream::StreamExt;
27+
28+
let mut select_all = SelectAll::new();
29+
for sub in self.subscriptions {
30+
select_all.push(sub.into_stream(cancel.clone()));
31+
}
32+
33+
Box::pin(async_stream::stream! {
34+
while let Some(msg) = select_all.next().await {
35+
yield msg;
36+
}
37+
})
38+
}
39+
}
40+
41+
/// Combines multiple subscriptions into one.
42+
pub fn batch<M: Send + 'static>(subscriptions: Vec<BoxedSubscription<M>>) -> BatchSubscription<M> {
43+
BatchSubscription::new(subscriptions)
44+
}

0 commit comments

Comments
 (0)