|
9 | 9 |
|
10 | 10 | //! Some utilities to make working with the standard library's [`Future`]s easier |
11 | 11 |
|
| 12 | +// The `unsafe` in this module is required to support `!Unpin` futures without |
| 13 | +// lots of extra unnecessary boxing. |
| 14 | +#![allow(unsafe_code)] |
| 15 | + |
12 | 16 | use crate::prelude::*; |
13 | 17 | use core::future::Future; |
14 | 18 | use core::marker::Unpin; |
| 19 | +use core::mem; |
15 | 20 | use core::pin::Pin; |
16 | 21 | use core::task::{Context, Poll}; |
17 | 22 |
|
18 | | -pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Copy + Unpin> { |
19 | | - Pending(F), |
| 23 | +pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> { |
| 24 | + Pending(/* #[pin] */ F), |
20 | 25 | Ready(Result<(), E>), |
21 | 26 | } |
22 | 27 |
|
23 | | -pub(crate) struct MultiResultFuturePoller< |
24 | | - F: Future<Output = Result<(), E>> + Unpin, |
25 | | - E: Copy + Unpin, |
26 | | -> { |
27 | | - futures_state: Vec<ResultFuture<F, E>>, |
| 28 | +/// A future that polls a set of futures concurrently and returns their results. |
| 29 | +/// |
| 30 | +/// This implementation is effectively `futures::future::join_all` with no "Big" |
| 31 | +/// set optimization: |
| 32 | +/// <https://github.com/rust-lang/futures-rs/blob/6f9a15f6e30cb3a2a79aabb9386dfaf282ef174d/futures-util/src/future/join_all.rs> |
| 33 | +pub(crate) struct MultiResultFuturePoller<F: Future<Output = Result<(), E>>, E: Unpin> { |
| 34 | + // Use a pinned boxed slice instead of a Vec to make it harder to accidentally |
| 35 | + // move the inner values. Someone could easily resize the Vec, thus moving all |
| 36 | + // the inner values and violating the Pin contract. |
| 37 | + futures_state: Pin<Box<[ResultFuture<F, E>]>>, |
28 | 38 | } |
29 | 39 |
|
30 | | -impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> MultiResultFuturePoller<F, E> { |
| 40 | +impl<F: Future<Output = Result<(), E>>, E: Unpin> MultiResultFuturePoller<F, E> { |
31 | 41 | pub fn new(futures_state: Vec<ResultFuture<F, E>>) -> Self { |
32 | | - Self { futures_state } |
| 42 | + Self { futures_state: Box::into_pin(futures_state.into_boxed_slice()) } |
33 | 43 | } |
34 | 44 | } |
35 | 45 |
|
36 | | -impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> Future |
37 | | - for MultiResultFuturePoller<F, E> |
38 | | -{ |
| 46 | +impl<F: Future<Output = Result<(), E>>, E: Unpin> Future for MultiResultFuturePoller<F, E> { |
39 | 47 | type Output = Vec<Result<(), E>>; |
40 | | - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<(), E>>> { |
41 | | - let mut have_pending_futures = false; |
42 | | - let futures_state = &mut self.get_mut().futures_state; |
43 | | - for state in futures_state.iter_mut() { |
44 | | - match state { |
45 | | - ResultFuture::Pending(ref mut fut) => match Pin::new(fut).poll(cx) { |
46 | | - Poll::Ready(res) => { |
47 | | - *state = ResultFuture::Ready(res); |
48 | | - }, |
49 | | - Poll::Pending => { |
50 | | - have_pending_futures = true; |
51 | | - }, |
52 | | - }, |
53 | | - ResultFuture::Ready(_) => continue, |
| 48 | + |
| 49 | + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<(), E>>> { |
| 50 | + let mut any_pending_futures = false; |
| 51 | + let futures_state: &mut Pin<Box<[_]>> = &mut self.futures_state; |
| 52 | + |
| 53 | + // Poll all the inner futures in order. |
| 54 | + for state in iter_pin_mut(futures_state.as_mut()) { |
| 55 | + if state.poll(cx).is_pending() { |
| 56 | + any_pending_futures = true; |
54 | 57 | } |
55 | 58 | } |
56 | 59 |
|
57 | | - if have_pending_futures { |
58 | | - Poll::Pending |
59 | | - } else { |
60 | | - let results = futures_state |
61 | | - .drain(..) |
62 | | - .filter_map(|e| match e { |
63 | | - ResultFuture::Ready(res) => Some(res), |
| 60 | + if !any_pending_futures { |
| 61 | + // Reuse the Box<[_]> allocation for the output Vec<_>. |
| 62 | + let results: Pin<Box<[_]>> = mem::replace(futures_state, Box::pin([])); |
| 63 | + // SAFETY: all the inner values are simple `Ready(Result<(), E>)` |
| 64 | + // values, which are `Unpin`. |
| 65 | + let results: Box<[_]> = unsafe { Pin::into_inner_unchecked(results) }; |
| 66 | + let results = Vec::from(results); |
| 67 | + |
| 68 | + let result = results |
| 69 | + .into_iter() |
| 70 | + .map(|state| match state { |
| 71 | + ResultFuture::Ready(res) => res, |
64 | 72 | ResultFuture::Pending(_) => { |
65 | | - debug_assert!( |
66 | | - false, |
67 | | - "All futures are expected to be ready if none are pending" |
68 | | - ); |
69 | | - None |
| 73 | + unreachable!("All futures are expected to be ready if none are pending") |
70 | 74 | }, |
71 | 75 | }) |
72 | 76 | .collect(); |
73 | | - Poll::Ready(results) |
| 77 | + |
| 78 | + Poll::Ready(result) |
| 79 | + } else { |
| 80 | + Poll::Pending |
| 81 | + } |
| 82 | + } |
| 83 | +} |
| 84 | + |
| 85 | +impl<F: Future<Output = Result<(), E>>, E: Unpin> Future for ResultFuture<F, E> { |
| 86 | + type Output = (); |
| 87 | + |
| 88 | + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 89 | + // SAFETY: just a standard enum pin-project, which is safe as we don't |
| 90 | + // move anything during Pin<&mut Self> -> &mut Self -> Pin<&mut F>. |
| 91 | + let this: &mut Self = unsafe { self.as_mut().get_unchecked_mut() }; |
| 92 | + match this { |
| 93 | + ResultFuture::Pending(fut) => { |
| 94 | + let fut: Pin<&mut F> = unsafe { Pin::new_unchecked(fut) }; |
| 95 | + match fut.poll(cx) { |
| 96 | + Poll::Ready(res) => { |
| 97 | + self.set(ResultFuture::Ready(res)); |
| 98 | + Poll::Ready(()) |
| 99 | + }, |
| 100 | + Poll::Pending => Poll::Pending, |
| 101 | + } |
| 102 | + }, |
| 103 | + ResultFuture::Ready(_) => Poll::Ready(()), |
74 | 104 | } |
75 | 105 | } |
76 | 106 | } |
| 107 | + |
| 108 | +// Pin project from a pinned mut slice into an iterator of pinned mut entries. |
| 109 | +fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { |
| 110 | + // quoted from `futures::future::join_all`: |
| 111 | + // > SAFETY: `std` _could_ make this unsound if it were to decide Pin's |
| 112 | + // > invariants aren't required to transmit through slices. Otherwise this has |
| 113 | + // > the same safety as a normal field pin projection. |
| 114 | + let slice: &mut [T] = unsafe { Pin::get_unchecked_mut(slice) }; |
| 115 | + slice.iter_mut().map(|x| unsafe { Pin::new_unchecked(x) }) |
| 116 | +} |
0 commit comments