Skip to content

Commit 166e70b

Browse files
committed
fix pin violation in much of concurrentstream
1 parent 629ddf3 commit 166e70b

File tree

5 files changed

+13
-43
lines changed

5 files changed

+13
-43
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ futures-lite = "1.12.0"
3838
pin-project = "1.0.8"
3939
slab = { version = "0.4.8", optional = true }
4040
smallvec = { version = "1.11.0", optional = true }
41+
futures-buffered = "0.2.6"
4142

4243
[dev-dependencies]
4344
async-io = "2.3.2"

src/concurrent_stream/for_each.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::{Consumer, ConsumerState};
2-
use crate::future::FutureGroup;
2+
use futures_buffered::FuturesUnordered;
33
use futures_lite::StreamExt;
44
use pin_project::pin_project;
55

@@ -22,7 +22,7 @@ where
2222
// NOTE: we can remove the `Arc` here if we're willing to make this struct self-referential
2323
count: Arc<AtomicUsize>,
2424
#[pin]
25-
group: FutureGroup<ForEachFut<F, FutT, T, FutB>>,
25+
group: FuturesUnordered<ForEachFut<F, FutT, T, FutB>>,
2626
limit: usize,
2727
f: F,
2828
_phantom: PhantomData<(T, FutB)>,
@@ -44,7 +44,7 @@ where
4444
f,
4545
_phantom: PhantomData,
4646
count: Arc::new(AtomicUsize::new(0)),
47-
group: FutureGroup::new(),
47+
group: FuturesUnordered::new(),
4848
}
4949
}
5050
}
@@ -69,7 +69,7 @@ where
6969
// Space was available! - insert the item for posterity
7070
this.count.fetch_add(1, Ordering::Relaxed);
7171
let fut = ForEachFut::new(this.f.clone(), future, this.count.clone());
72-
this.group.as_mut().insert_pinned(fut);
72+
this.group.as_mut().push(fut);
7373

7474
ConsumerState::Continue
7575
}

src/concurrent_stream/from_concurrent_stream.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::{ConcurrentStream, Consumer, ConsumerState, IntoConcurrentStream};
2-
use crate::future::FutureGroup;
2+
use futures_buffered::FuturesUnordered;
33
#[cfg(all(feature = "alloc", not(feature = "std")))]
44
use alloc::vec::Vec;
55
use core::future::Future;
@@ -32,14 +32,14 @@ impl<T> FromConcurrentStream<T> for Vec<T> {
3232
#[pin_project]
3333
pub(crate) struct VecConsumer<'a, Fut: Future> {
3434
#[pin]
35-
group: FutureGroup<Fut>,
35+
group: FuturesUnordered<Fut>,
3636
output: &'a mut Vec<Fut::Output>,
3737
}
3838

3939
impl<'a, Fut: Future> VecConsumer<'a, Fut> {
4040
pub(crate) fn new(output: &'a mut Vec<Fut::Output>) -> Self {
4141
Self {
42-
group: FutureGroup::new(),
42+
group: FuturesUnordered::new(),
4343
output,
4444
}
4545
}
@@ -54,7 +54,7 @@ where
5454
async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
5555
let mut this = self.project();
5656
// unbounded concurrency, so we just goooo
57-
this.group.as_mut().insert_pinned(future);
57+
this.group.as_mut().push(future);
5858
ConsumerState::Continue
5959
}
6060

src/concurrent_stream/try_for_each.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::concurrent_stream::ConsumerState;
2-
use crate::future::FutureGroup;
2+
use futures_buffered::FuturesUnordered;
33
use crate::private::Try;
44
use futures_lite::StreamExt;
55
use pin_project::pin_project;
@@ -26,7 +26,7 @@ where
2626
count: Arc<AtomicUsize>,
2727
// TODO: remove the `Pin<Box>` from this signature by requiring this struct is pinned
2828
#[pin]
29-
group: FutureGroup<TryForEachFut<F, FutT, T, FutB, B>>,
29+
group: FuturesUnordered<TryForEachFut<F, FutT, T, FutB, B>>,
3030
limit: usize,
3131
residual: Option<B::Residual>,
3232
f: F,
@@ -50,7 +50,7 @@ where
5050
f,
5151
residual: None,
5252
count: Arc::new(AtomicUsize::new(0)),
53-
group: FutureGroup::new(),
53+
group: FuturesUnordered::new(),
5454
_phantom: PhantomData,
5555
}
5656
}
@@ -93,7 +93,7 @@ where
9393
// Space was available! - insert the item for posterity
9494
this.count.fetch_add(1, Ordering::Relaxed);
9595
let fut = TryForEachFut::new(this.f.clone(), future, this.count.clone());
96-
this.group.as_mut().insert_pinned(fut);
96+
this.group.as_mut().push(fut);
9797
ConsumerState::Continue
9898
}
9999

src/future/future_group.rs

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -274,37 +274,6 @@ impl<F: Future> FutureGroup<F> {
274274
Key(index)
275275
}
276276

277-
/// Insert a value into a pinned `FutureGroup`
278-
///
279-
/// This method is private because it serves as an implementation detail for
280-
/// `ConcurrentStream`. We should never expose this publicly, as the entire
281-
/// point of this crate is that we abstract the futures poll machinery away
282-
/// from end-users.
283-
pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key
284-
where
285-
F: Future,
286-
{
287-
let mut this = self.project();
288-
// SAFETY: inserting a value into the futures slab does not ever move
289-
// any of the existing values.
290-
let index = unsafe { this.futures.as_mut().get_unchecked_mut() }.insert(future);
291-
this.keys.insert(index);
292-
let key = Key(index);
293-
294-
// If our slab allocated more space we need to
295-
// update our tracking structures along with it.
296-
let max_len = this.futures.as_ref().capacity().max(index);
297-
this.wakers.resize(max_len);
298-
this.states.resize(max_len);
299-
300-
// Set the corresponding state
301-
this.states[index].set_pending();
302-
let mut readiness = this.wakers.readiness();
303-
readiness.set_ready(index);
304-
305-
key
306-
}
307-
308277
/// Create a stream which also yields the key of each item.
309278
///
310279
/// # Example

0 commit comments

Comments
 (0)