Skip to content

Commit 4bdc8da

Browse files
committed
Implement FromConcurrentStream for Result<Vec<T>, E>
1 parent a79d3df commit 4bdc8da

File tree

1 file changed

+86
-0
lines changed

1 file changed

+86
-0
lines changed

src/concurrent_stream/from_concurrent_stream.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,18 @@ impl<T> FromConcurrentStream<T> for Vec<T> {
2828
}
2929
}
3030

31+
impl<T, E> FromConcurrentStream<Result<T, E>> for Result<Vec<T>, E> {
32+
async fn from_concurrent_stream<S>(iter: S) -> Self
33+
where
34+
S: IntoConcurrentStream<Item = Result<T, E>>,
35+
{
36+
let stream = iter.into_co_stream();
37+
let mut output = Ok(Vec::with_capacity(stream.size_hint().1.unwrap_or_default()));
38+
stream.drive(ResultVecConsumer::new(&mut output)).await;
39+
output
40+
}
41+
}
42+
3143
// TODO: replace this with a generalized `fold` operation
3244
#[pin_project]
3345
pub(crate) struct VecConsumer<'a, Fut: Future> {
@@ -73,6 +85,60 @@ where
7385
}
7486
}
7587

88+
#[pin_project]
89+
pub(crate) struct ResultVecConsumer<'a, Fut: Future, T, E> {
90+
#[pin]
91+
group: FuturesUnordered<Fut>,
92+
output: &'a mut Result<Vec<T>, E>,
93+
}
94+
95+
impl<'a, Fut: Future, T, E> ResultVecConsumer<'a, Fut, T, E> {
96+
pub(crate) fn new(output: &'a mut Result<Vec<T>, E>) -> Self {
97+
Self {
98+
group: FuturesUnordered::new(),
99+
output,
100+
}
101+
}
102+
}
103+
104+
impl<'a, Fut, T, E> Consumer<Result<T, E>, Fut> for ResultVecConsumer<'a, Fut, T, E>
105+
where
106+
Fut: Future<Output = Result<T, E>>,
107+
{
108+
type Output = ();
109+
110+
async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
111+
let mut this = self.project();
112+
// unbounded concurrency, so we just goooo
113+
this.group.as_mut().push(future);
114+
ConsumerState::Continue
115+
}
116+
117+
async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
118+
let mut this = self.project();
119+
120+
while let Some(item) = this.group.next().await {
121+
match item {
122+
Ok(item) => {
123+
let Ok(items) = this.output else {
124+
panic!("progress called after returning ConsumerState::Break");
125+
};
126+
items.push(item);
127+
}
128+
Err(e) => {
129+
**this.output = Err(e);
130+
return ConsumerState::Break;
131+
}
132+
}
133+
}
134+
ConsumerState::Empty
135+
}
136+
137+
async fn flush(self: Pin<&mut Self>) -> Self::Output {
138+
self.progress().await;
139+
}
140+
}
141+
76142
#[cfg(test)]
77143
mod test {
78144
use crate::prelude::*;
@@ -85,4 +151,24 @@ mod test {
85151
assert_eq!(v, &[1, 1, 1, 1, 1]);
86152
});
87153
}
154+
155+
#[test]
156+
fn collect_to_result_ok() {
157+
futures_lite::future::block_on(async {
158+
let v: Result<Vec<_>, ()> = stream::repeat(Ok(1)).co().take(5).collect().await;
159+
assert_eq!(v, Ok(vec![1, 1, 1, 1, 1]));
160+
});
161+
}
162+
163+
#[test]
164+
fn collect_to_result_err() {
165+
futures_lite::future::block_on(async {
166+
let v: Result<Vec<_>, _> = stream::repeat(Err::<u8, _>(()))
167+
.co()
168+
.take(5)
169+
.collect()
170+
.await;
171+
assert_eq!(v, Err(()));
172+
});
173+
}
88174
}

0 commit comments

Comments
 (0)