Skip to content

Commit 271b6f4

Browse files
committed
fix: Using pin_project!
1 parent 3297a0f commit 271b6f4

File tree

1 file changed

+44
-41
lines changed

1 file changed

+44
-41
lines changed

src/stream/stream/flatten.rs

Lines changed: 44 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
use std::pin::Pin;
2+
use pin_project_lite::pin_project;
23

34
use crate::prelude::*;
45
use crate::stream::stream::map::Map;
56
use crate::stream::{IntoStream, Stream};
67
use crate::task::{Context, Poll};
78

8-
/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its
9-
/// documentation for more.
10-
///
11-
/// [`flat_map`]: trait.Stream.html#method.flat_map
12-
/// [`Stream`]: trait.Stream.html
13-
#[allow(missing_debug_implementations)]
14-
pub struct FlatMap<S: Stream, U: IntoStream, F> {
15-
inner: FlattenCompat<Map<S, F, S::Item, U>, U>,
9+
pin_project! {
10+
/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its
11+
/// documentation for more.
12+
///
13+
/// [`flat_map`]: trait.Stream.html#method.flat_map
14+
/// [`Stream`]: trait.Stream.html
15+
#[allow(missing_debug_implementations)]
16+
pub struct FlatMap<S: Stream, U: IntoStream, F> {
17+
#[pin]
18+
inner: FlattenCompat<Map<S, F, S::Item, U>, U>,
19+
}
1620
}
1721

1822
impl<S, U, F> FlatMap<S, U, F>
@@ -21,7 +25,6 @@ where
2125
U: IntoStream,
2226
F: FnMut(S::Item) -> U,
2327
{
24-
pin_utils::unsafe_pinned!(inner: FlattenCompat<Map<S, F, S::Item, U>, U>);
2528

2629
pub fn new(stream: S, f: F) -> FlatMap<S, U, F> {
2730
FlatMap {
@@ -33,33 +36,33 @@ where
3336
impl<S, U, F> Stream for FlatMap<S, U, F>
3437
where
3538
S: Stream<Item: IntoStream<IntoStream = U, Item = U::Item>> + std::marker::Unpin,
36-
S::Item: std::marker::Unpin,
3739
U: Stream + std::marker::Unpin,
38-
F: FnMut(S::Item) -> U + std::marker::Unpin,
40+
F: FnMut(S::Item) -> U,
3941
{
4042
type Item = U::Item;
4143

42-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43-
self.as_mut().inner().poll_next(cx)
44+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
45+
self.project().inner.poll_next(cx)
4446
}
4547
}
4648

47-
/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its
48-
/// documentation for more.
49-
///
50-
/// [`flatten`]: trait.Stream.html#method.flatten
51-
/// [`Stream`]: trait.Stream.html
52-
#[allow(missing_debug_implementations)]
53-
pub struct Flatten<S: Stream>
54-
where
55-
S::Item: IntoStream,
56-
{
57-
inner: FlattenCompat<S, <S::Item as IntoStream>::IntoStream>,
49+
pin_project!{
50+
/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its
51+
/// documentation for more.
52+
///
53+
/// [`flatten`]: trait.Stream.html#method.flatten
54+
/// [`Stream`]: trait.Stream.html
55+
#[allow(missing_debug_implementations)]
56+
pub struct Flatten<S: Stream>
57+
where
58+
S::Item: IntoStream,
59+
{
60+
#[pin]
61+
inner: FlattenCompat<S, <S::Item as IntoStream>::IntoStream>,
62+
}
5863
}
5964

6065
impl<S: Stream<Item: IntoStream>> Flatten<S> {
61-
pin_utils::unsafe_pinned!(inner: FlattenCompat<S, <S::Item as IntoStream>::IntoStream>);
62-
6366
pub fn new(stream: S) -> Flatten<S> {
6467
Flatten { inner: FlattenCompat::new(stream) }
6568
}
@@ -72,24 +75,23 @@ where
7275
{
7376
type Item = U::Item;
7477

75-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76-
self.as_mut().inner().poll_next(cx)
78+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79+
self.project().inner.poll_next(cx)
7780
}
7881
}
7982

8083

81-
/// Real logic of both `Flatten` and `FlatMap` which simply delegate to
82-
/// this type.
83-
#[derive(Clone, Debug)]
84-
struct FlattenCompat<S, U> {
85-
stream: S,
86-
frontiter: Option<U>,
84+
pin_project! {
85+
/// Real logic of both `Flatten` and `FlatMap` which simply delegate to
86+
/// this type.
87+
#[derive(Clone, Debug)]
88+
struct FlattenCompat<S, U> {
89+
stream: S,
90+
frontiter: Option<U>,
91+
}
8792
}
8893

8994
impl<S, U> FlattenCompat<S, U> {
90-
pin_utils::unsafe_unpinned!(stream: S);
91-
pin_utils::unsafe_unpinned!(frontiter: Option<U>);
92-
9395
/// Adapts an iterator by flattening it, for use in `flatten()` and `flat_map()`.
9496
pub fn new(stream: S) -> FlattenCompat<S, U> {
9597
FlattenCompat {
@@ -106,17 +108,18 @@ where
106108
{
107109
type Item = U::Item;
108110

109-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
111+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112+
let mut this = self.project();
110113
loop {
111-
if let Some(ref mut inner) = self.as_mut().frontiter() {
114+
if let Some(inner) = this.frontiter {
112115
if let item @ Some(_) = futures_core::ready!(Pin::new(inner).poll_next(cx)) {
113116
return Poll::Ready(item);
114117
}
115118
}
116119

117-
match futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)) {
120+
match futures_core::ready!(Pin::new(&mut this.stream).poll_next(cx)) {
118121
None => return Poll::Ready(None),
119-
Some(inner) => *self.as_mut().frontiter() = Some(inner.into_stream()),
122+
Some(inner) => *this.frontiter = Some(inner.into_stream()),
120123
}
121124
}
122125
}

0 commit comments

Comments
 (0)