Skip to content

Commit 745bb53

Browse files
committed
start pinning APIs
1 parent 49b91e9 commit 745bb53

File tree

8 files changed

+116
-73
lines changed

8 files changed

+116
-73
lines changed

src/concurrent_stream/enumerate.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use pin_project::pin_project;
2+
13
use super::{ConcurrentStream, Consumer};
24
use core::future::Future;
35
use core::num::NonZeroUsize;
@@ -47,7 +49,9 @@ impl<CS: ConcurrentStream> ConcurrentStream for Enumerate<CS> {
4749
}
4850
}
4951

52+
#[pin_project]
5053
struct EnumerateConsumer<C> {
54+
#[pin]
5155
inner: C,
5256
count: usize,
5357
}
@@ -58,18 +62,21 @@ where
5862
{
5963
type Output = C::Output;
6064

61-
async fn send(&mut self, future: Fut) -> super::ConsumerState {
62-
let count = self.count;
63-
self.count += 1;
64-
self.inner.send(EnumerateFuture::new(future, count)).await
65+
async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
66+
let this = self.project();
67+
let count = *this.count;
68+
*this.count += 1;
69+
this.inner.send(EnumerateFuture::new(future, count)).await
6570
}
6671

67-
async fn progress(&mut self) -> super::ConsumerState {
68-
self.inner.progress().await
72+
async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
73+
let this = self.project();
74+
this.inner.progress().await
6975
}
7076

71-
async fn flush(&mut self) -> Self::Output {
72-
self.inner.flush().await
77+
async fn flush(self: Pin<&mut Self>) -> Self::Output {
78+
let this = self.project();
79+
this.inner.flush().await
7380
}
7481
}
7582

src/concurrent_stream/for_each.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ where
2323
// NOTE: we can remove the `Arc` here if we're willing to make this struct self-referential
2424
count: Arc<AtomicUsize>,
2525
#[pin]
26-
group: Pin<Box<FutureGroup<ForEachFut<F, FutT, T, FutB>>>>,
26+
group: FutureGroup<ForEachFut<F, FutT, T, FutB>>,
2727
limit: usize,
2828
f: F,
2929
_phantom: PhantomData<(T, FutB)>,
@@ -45,7 +45,7 @@ where
4545
f,
4646
_phantom: PhantomData,
4747
count: Arc::new(AtomicUsize::new(0)),
48-
group: Box::pin(FutureGroup::new()),
48+
group: FutureGroup::new(),
4949
}
5050
}
5151
}
@@ -60,30 +60,33 @@ where
6060
{
6161
type Output = ();
6262

63-
async fn send(&mut self, future: FutT) -> super::ConsumerState {
63+
async fn send(mut self: Pin<&mut Self>, future: FutT) -> super::ConsumerState {
64+
let mut this = self.project();
6465
// If we have no space, we're going to provide backpressure until we have space
65-
while self.count.load(Ordering::Relaxed) >= self.limit {
66-
self.group.next().await;
66+
while this.count.load(Ordering::Relaxed) >= *this.limit {
67+
this.group.next().await;
6768
}
6869

6970
// Space was available! - insert the item for posterity
70-
self.count.fetch_add(1, Ordering::Relaxed);
71-
let fut = ForEachFut::new(self.f.clone(), future, self.count.clone());
72-
self.group.as_mut().insert_pinned(fut);
71+
this.count.fetch_add(1, Ordering::Relaxed);
72+
let fut = ForEachFut::new(this.f.clone(), future, this.count.clone());
73+
this.group.as_mut().insert_pinned(fut);
7374

7475
ConsumerState::Continue
7576
}
7677

77-
async fn progress(&mut self) -> super::ConsumerState {
78-
while let Some(_) = self.group.next().await {}
78+
async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
79+
let mut this = self.project();
80+
while let Some(_) = this.group.next().await {}
7981
ConsumerState::Empty
8082
}
8183

82-
async fn flush(&mut self) -> Self::Output {
84+
async fn flush(self: Pin<&mut Self>) -> Self::Output {
85+
let mut this = self.project();
8386
// 4. We will no longer receive any additional futures from the
8487
// underlying stream; wait until all the futures in the group have
8588
// resolved.
86-
while let Some(_) = self.group.next().await {}
89+
while let Some(_) = this.group.next().await {}
8790
}
8891
}
8992

src/concurrent_stream/from_concurrent_stream.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use alloc::vec::Vec;
55
use core::future::Future;
66
use core::pin::Pin;
77
use futures_lite::StreamExt;
8+
use pin_project::pin_project;
89

910
/// Conversion from a [`ConcurrentStream`]
1011
#[allow(async_fn_in_trait)]
@@ -28,15 +29,17 @@ impl<T> FromConcurrentStream<T> for Vec<T> {
2829
}
2930

3031
// TODO: replace this with a generalized `fold` operation
32+
#[pin_project]
3133
pub(crate) struct VecConsumer<'a, Fut: Future> {
32-
group: Pin<Box<FutureGroup<Fut>>>,
34+
#[pin]
35+
group: FutureGroup<Fut>,
3336
output: &'a mut Vec<Fut::Output>,
3437
}
3538

3639
impl<'a, Fut: Future> VecConsumer<'a, Fut> {
3740
pub(crate) fn new(output: &'a mut Vec<Fut::Output>) -> Self {
3841
Self {
39-
group: Box::pin(FutureGroup::new()),
42+
group: FutureGroup::new(),
4043
output,
4144
}
4245
}
@@ -48,21 +51,24 @@ where
4851
{
4952
type Output = ();
5053

51-
async fn send(&mut self, future: Fut) -> super::ConsumerState {
54+
async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
55+
let mut this = self.project();
5256
// unbounded concurrency, so we just goooo
53-
self.group.as_mut().insert_pinned(future);
57+
this.group.as_mut().insert_pinned(future);
5458
ConsumerState::Continue
5559
}
5660

57-
async fn progress(&mut self) -> super::ConsumerState {
58-
while let Some(item) = self.group.next().await {
59-
self.output.push(item);
61+
async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
62+
let mut this = self.project();
63+
while let Some(item) = this.group.next().await {
64+
this.output.push(item);
6065
}
6166
ConsumerState::Empty
6267
}
63-
async fn flush(&mut self) -> Self::Output {
64-
while let Some(item) = self.group.next().await {
65-
self.output.push(item);
68+
async fn flush(self: Pin<&mut Self>) -> Self::Output {
69+
let mut this = self.project();
70+
while let Some(item) = this.group.next().await {
71+
this.output.push(item);
6672
}
6773
}
6874
}

src/concurrent_stream/limit.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
use pin_project::pin_project;
2+
13
use super::{ConcurrentStream, Consumer};
24
use core::future::Future;
35
use core::num::NonZeroUsize;
6+
use core::pin::Pin;
47

58
/// A concurrent iterator that limits the amount of concurrency applied.
69
///
@@ -43,7 +46,9 @@ impl<CS: ConcurrentStream> ConcurrentStream for Limit<CS> {
4346
}
4447
}
4548

49+
#[pin_project]
4650
struct LimitConsumer<C> {
51+
#[pin]
4752
inner: C,
4853
}
4954
impl<C, Item, Fut> Consumer<Item, Fut> for LimitConsumer<C>
@@ -53,15 +58,18 @@ where
5358
{
5459
type Output = C::Output;
5560

56-
async fn send(&mut self, future: Fut) -> super::ConsumerState {
57-
self.inner.send(future).await
61+
async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
62+
let this = self.project();
63+
this.inner.send(future).await
5864
}
5965

60-
async fn progress(&mut self) -> super::ConsumerState {
61-
self.inner.progress().await
66+
async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
67+
let this = self.project();
68+
this.inner.progress().await
6269
}
6370

64-
async fn flush(&mut self) -> Self::Output {
65-
self.inner.flush().await
71+
async fn flush(self: Pin<&mut Self>) -> Self::Output {
72+
let this = self.project();
73+
this.inner.flush().await
6674
}
6775
}

src/concurrent_stream/map.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use pin_project::pin_project;
2+
13
use super::{ConcurrentStream, Consumer};
24
use core::num::NonZeroUsize;
35
use core::{
@@ -71,7 +73,7 @@ where
7173
}
7274
}
7375

74-
// OK: validated! - all bounds should check out
76+
#[pin_project]
7577
pub struct MapConsumer<C, F, FutT, T, FutB, B>
7678
where
7779
FutT: Future<Output = T>,
@@ -80,6 +82,7 @@ where
8082
F: Clone,
8183
FutB: Future<Output = B>,
8284
{
85+
#[pin]
8386
inner: C,
8487
f: F,
8588
_phantom: PhantomData<(FutT, T, FutB, B)>,
@@ -95,17 +98,20 @@ where
9598
{
9699
type Output = C::Output;
97100

98-
async fn progress(&mut self) -> super::ConsumerState {
99-
self.inner.progress().await
101+
async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
102+
let this = self.project();
103+
this.inner.progress().await
100104
}
101105

102-
async fn send(&mut self, future: FutT) -> super::ConsumerState {
103-
let fut = MapFuture::new(self.f.clone(), future);
104-
self.inner.send(fut).await
106+
async fn send(self: Pin<&mut Self>, future: FutT) -> super::ConsumerState {
107+
let this = self.project();
108+
let fut = MapFuture::new(this.f.clone(), future);
109+
this.inner.send(fut).await
105110
}
106111

107-
async fn flush(&mut self) -> Self::Output {
108-
self.inner.flush().await
112+
async fn flush(self: Pin<&mut Self>) -> Self::Output {
113+
let this = self.project();
114+
this.inner.flush().await
109115
}
110116
}
111117

src/concurrent_stream/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ mod try_for_each;
1212

1313
use core::future::Future;
1414
use core::num::NonZeroUsize;
15+
use core::pin::Pin;
1516
use for_each::ForEachConsumer;
1617
use try_for_each::TryForEachConsumer;
1718

@@ -37,18 +38,18 @@ where
3738
type Output;
3839

3940
/// Send an item down to the next step in the processing queue.
40-
async fn send(&mut self, fut: Fut) -> ConsumerState;
41+
async fn send(self: Pin<&mut Self>, fut: Fut) -> ConsumerState;
4142

4243
/// Make progress on the consumer while doing something else.
4344
///
4445
/// It should always be possible to drop the future returned by this
4546
/// function. This is solely intended to keep work going on the `Consumer`
4647
/// while doing e.g. waiting for new futures from a stream.
47-
async fn progress(&mut self) -> ConsumerState;
48+
async fn progress(self: Pin<&mut Self>) -> ConsumerState;
4849

4950
/// We have no more data left to send to the `Consumer`; wait for its
5051
/// output.
51-
async fn flush(&mut self) -> Self::Output;
52+
async fn flush(self: Pin<&mut Self>) -> Self::Output;
5253
}
5354

5455
/// Concurrently operate over items in a stream

src/concurrent_stream/take.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
use pin_project::pin_project;
2+
13
use super::{ConcurrentStream, Consumer, ConsumerState};
24
use core::future::Future;
35
use core::num::NonZeroUsize;
6+
use core::pin::Pin;
47

58
/// A concurrent iterator that only iterates over the first `n` iterations of `iter`.
69
///
@@ -49,7 +52,9 @@ impl<CS: ConcurrentStream> ConcurrentStream for Take<CS> {
4952
}
5053
}
5154

55+
#[pin_project]
5256
struct TakeConsumer<C> {
57+
#[pin]
5358
inner: C,
5459
count: usize,
5560
limit: usize,
@@ -61,22 +66,25 @@ where
6166
{
6267
type Output = C::Output;
6368

64-
async fn send(&mut self, future: Fut) -> ConsumerState {
65-
self.count += 1;
66-
let state = self.inner.send(future).await;
67-
if self.count >= self.limit {
69+
async fn send(self: Pin<&mut Self>, future: Fut) -> ConsumerState {
70+
let this = self.project();
71+
*this.count += 1;
72+
let state = this.inner.send(future).await;
73+
if this.count >= this.limit {
6874
ConsumerState::Break
6975
} else {
7076
state
7177
}
7278
}
7379

74-
async fn progress(&mut self) -> ConsumerState {
75-
self.inner.progress().await
80+
async fn progress(self: Pin<&mut Self>) -> ConsumerState {
81+
let this = self.project();
82+
this.inner.progress().await
7683
}
7784

78-
async fn flush(&mut self) -> Self::Output {
79-
self.inner.flush().await
85+
async fn flush(self: Pin<&mut Self>) -> Self::Output {
86+
let this = self.project();
87+
this.inner.flush().await
8088
}
8189
}
8290

0 commit comments

Comments
 (0)