Skip to content

Commit ac9087c

Browse files
committed
stack-pin the FutureGroup
1 parent 745bb53 commit ac9087c

File tree

3 files changed

+6
-7
lines changed

3 files changed

+6
-7
lines changed

src/concurrent_stream/for_each.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use futures_lite::StreamExt;
33
use pin_project::pin_project;
44

55
use super::{Consumer, ConsumerState};
6-
use alloc::boxed::Box;
76
use alloc::sync::Arc;
87
use core::future::Future;
98
use core::marker::PhantomData;
@@ -60,7 +59,7 @@ where
6059
{
6160
type Output = ();
6261

63-
async fn send(mut self: Pin<&mut Self>, future: FutT) -> super::ConsumerState {
62+
async fn send(self: Pin<&mut Self>, future: FutT) -> super::ConsumerState {
6463
let mut this = self.project();
6564
// If we have no space, we're going to provide backpressure until we have space
6665
while this.count.load(Ordering::Relaxed) >= *this.limit {

src/concurrent_stream/from_concurrent_stream.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use super::{ConcurrentStream, Consumer, ConsumerState, IntoConcurrentStream};
22
use crate::future::FutureGroup;
3-
use alloc::boxed::Box;
43
use alloc::vec::Vec;
54
use core::future::Future;
65
use core::pin::Pin;

src/concurrent_stream/from_stream.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ where
3333
C: Consumer<Self::Item, Self::Future>,
3434
{
3535
let mut iter = pin!(self.stream);
36+
let mut consumer = pin!(consumer);
3637

3738
// Concurrently progress the consumer as well as the stream. Whenever
3839
// there is an item from the stream available, we submit it to the
@@ -53,7 +54,7 @@ where
5354

5455
// Drive the consumer forward
5556
let b = async {
56-
let control_flow = consumer.progress().await;
57+
let control_flow = consumer.as_mut().progress().await;
5758
State::Progress(control_flow)
5859
};
5960

@@ -64,14 +65,14 @@ where
6465
ConsumerState::Break => break,
6566
ConsumerState::Continue => continue,
6667
ConsumerState::Empty => match iter.next().await {
67-
Some(item) => match consumer.send(ready(item)).await {
68+
Some(item) => match consumer.as_mut().send(ready(item)).await {
6869
ConsumerState::Break => break,
6970
ConsumerState::Empty | ConsumerState::Continue => continue,
7071
},
7172
None => break,
7273
},
7374
},
74-
State::Item(Some(item)) => match consumer.send(ready(item)).await {
75+
State::Item(Some(item)) => match consumer.as_mut().send(ready(item)).await {
7576
ConsumerState::Break => break,
7677
ConsumerState::Empty | ConsumerState::Continue => continue,
7778
},
@@ -81,7 +82,7 @@ where
8182

8283
// We will no longer receive items from the underlying stream, which
8384
// means we're ready to wait for the consumer to finish up.
84-
consumer.flush().await
85+
consumer.as_mut().flush().await
8586
}
8687

8788
fn concurrency_limit(&self) -> Option<NonZeroUsize> {

0 commit comments

Comments
 (0)