|
1 | 1 | use std::pin::Pin;
|
2 | 2 |
|
| 3 | +use crate::prelude::*; |
| 4 | +use crate::stream::stream::map::Map; |
3 | 5 | use crate::stream::{IntoStream, Stream};
|
4 | 6 | use crate::task::{Context, Poll};
|
5 | 7 |
|
| 8 | +#[allow(missing_debug_implementations)] |
| 9 | +pub struct FlatMap<S: Stream, U: IntoStream, F> { |
| 10 | + inner: FlattenCompat<Map<S, F, S::Item, U>, U>, |
| 11 | +} |
| 12 | + |
| 13 | +impl<S, U, F> FlatMap<S, U, F> |
| 14 | +where |
| 15 | + S: Stream, |
| 16 | + U: IntoStream, |
| 17 | + F: FnMut(S::Item) -> U, |
| 18 | +{ |
| 19 | + pin_utils::unsafe_pinned!(inner: FlattenCompat<Map<S, F, S::Item, U>, U>); |
| 20 | + |
| 21 | + pub fn new(stream: S, f: F) -> FlatMap<S, U, F> { |
| 22 | + FlatMap { |
| 23 | + inner: FlattenCompat::new(stream.map(f)), |
| 24 | + } |
| 25 | + } |
| 26 | +} |
| 27 | + |
| 28 | +impl<S, U, F> Stream for FlatMap<S, U, F> |
| 29 | +where |
| 30 | + S: Stream<Item: IntoStream<IntoStream = U, Item = U::Item>> + std::marker::Unpin, |
| 31 | + S::Item: std::marker::Unpin, |
| 32 | + U: Stream + std::marker::Unpin, |
| 33 | + F: FnMut(S::Item) -> U + std::marker::Unpin, |
| 34 | +{ |
| 35 | + type Item = U::Item; |
| 36 | + |
| 37 | + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 38 | + self.as_mut().inner().poll_next(cx) |
| 39 | + } |
| 40 | +} |
| 41 | + |
6 | 42 | /// Real logic of both `Flatten` and `FlatMap` which simply delegate to
|
7 | 43 | /// this type.
|
8 | 44 | #[derive(Clone, Debug)]
|
9 | 45 | struct FlattenCompat<S, U> {
|
10 | 46 | stream: S,
|
11 | 47 | frontiter: Option<U>,
|
12 | 48 | }
|
| 49 | + |
13 | 50 | impl<S, U> FlattenCompat<S, U> {
|
14 | 51 | pin_utils::unsafe_unpinned!(stream: S);
|
15 | 52 | pin_utils::unsafe_unpinned!(frontiter: Option<U>);
|
|
0 commit comments