diff --git a/lightning/src/lib.rs b/lightning/src/lib.rs index 5c608d9607a..5fd0b5e5076 100644 --- a/lightning/src/lib.rs +++ b/lightning/src/lib.rs @@ -30,7 +30,7 @@ //! * `grind_signatures` #![cfg_attr(not(any(test, fuzzing, feature = "_test_utils")), deny(missing_docs))] -#![cfg_attr(not(any(test, feature = "_test_utils")), forbid(unsafe_code))] +#![cfg_attr(not(any(test, feature = "_test_utils")), deny(unsafe_code))] #![deny(rustdoc::broken_intra_doc_links)] #![deny(rustdoc::private_intra_doc_links)] diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 6009a276976..176a973bcdf 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -1576,7 +1576,7 @@ where /// /// See the trait-level documentation of [`EventsProvider`] for requirements. pub async fn process_pending_events_async< - Future: core::future::Future> + core::marker::Unpin, + Future: core::future::Future>, H: Fn(Event) -> Future, >( &self, handler: H, diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs index c18ada73a47..dcb06867c70 100644 --- a/lightning/src/util/async_poll.rs +++ b/lightning/src/util/async_poll.rs @@ -9,68 +9,108 @@ //! Some utilities to make working with the standard library's [`Future`]s easier +// The `unsafe` in this module is required to support `!Unpin` futures without +// lots of extra unnecessary boxing. +#![allow(unsafe_code)] + use crate::prelude::*; use core::future::Future; use core::marker::Unpin; +use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -pub(crate) enum ResultFuture>, E: Copy + Unpin> { - Pending(F), +pub(crate) enum ResultFuture>, E: Unpin> { + Pending(/* #[pin] */ F), Ready(Result<(), E>), } -pub(crate) struct MultiResultFuturePoller< - F: Future> + Unpin, - E: Copy + Unpin, -> { - futures_state: Vec>, +/// A future that polls a set of futures concurrently and returns their results. +/// +/// This implementation is effectively `futures::future::join_all` with no "Big" +/// set optimization: +/// +pub(crate) struct MultiResultFuturePoller>, E: Unpin> { + // Use a pinned boxed slice instead of a Vec to make it harder to accidentally + // move the inner values. Someone could easily resize the Vec, thus moving all + // the inner values and violating the Pin contract. + futures_state: Pin]>>, } -impl> + Unpin, E: Copy + Unpin> MultiResultFuturePoller { +impl>, E: Unpin> MultiResultFuturePoller { pub fn new(futures_state: Vec>) -> Self { - Self { futures_state } + Self { futures_state: Box::into_pin(futures_state.into_boxed_slice()) } } } -impl> + Unpin, E: Copy + Unpin> Future - for MultiResultFuturePoller -{ +impl>, E: Unpin> Future for MultiResultFuturePoller { type Output = Vec>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - let mut have_pending_futures = false; - let futures_state = &mut self.get_mut().futures_state; - for state in futures_state.iter_mut() { - match state { - ResultFuture::Pending(ref mut fut) => match Pin::new(fut).poll(cx) { - Poll::Ready(res) => { - *state = ResultFuture::Ready(res); - }, - Poll::Pending => { - have_pending_futures = true; - }, - }, - ResultFuture::Ready(_) => continue, + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + let mut any_pending_futures = false; + let futures_state: &mut Pin> = &mut self.futures_state; + + // Poll all the inner futures in order. + for state in iter_pin_mut(futures_state.as_mut()) { + if state.poll(cx).is_pending() { + any_pending_futures = true; } } - if have_pending_futures { - Poll::Pending - } else { - let results = futures_state - .drain(..) - .filter_map(|e| match e { - ResultFuture::Ready(res) => Some(res), + if !any_pending_futures { + // Reuse the Box<[_]> allocation for the output Vec<_>. + let results: Pin> = mem::replace(futures_state, Box::pin([])); + // SAFETY: all the inner values are simple `Ready(Result<(), E>)` + // values, which are `Unpin`. + let results: Box<[_]> = unsafe { Pin::into_inner_unchecked(results) }; + let results = Vec::from(results); + + let result = results + .into_iter() + .map(|state| match state { + ResultFuture::Ready(res) => res, ResultFuture::Pending(_) => { - debug_assert!( - false, - "All futures are expected to be ready if none are pending" - ); - None + unreachable!("All futures are expected to be ready if none are pending") }, }) .collect(); - Poll::Ready(results) + + Poll::Ready(result) + } else { + Poll::Pending + } + } +} + +impl>, E: Unpin> Future for ResultFuture { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // SAFETY: just a standard enum pin-project, which is safe as we don't + // move anything during Pin<&mut Self> -> &mut Self -> Pin<&mut F>. + let this: &mut Self = unsafe { self.as_mut().get_unchecked_mut() }; + match this { + ResultFuture::Pending(fut) => { + let fut: Pin<&mut F> = unsafe { Pin::new_unchecked(fut) }; + match fut.poll(cx) { + Poll::Ready(res) => { + self.set(ResultFuture::Ready(res)); + Poll::Ready(()) + }, + Poll::Pending => Poll::Pending, + } + }, + ResultFuture::Ready(_) => Poll::Ready(()), } } } + +// Pin project from a pinned mut slice into an iterator of pinned mut entries. +fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator> { + // quoted from `futures::future::join_all`: + // > SAFETY: `std` _could_ make this unsound if it were to decide Pin's + // > invariants aren't required to transmit through slices. Otherwise this has + // > the same safety as a normal field pin projection. + let slice: &mut [T] = unsafe { Pin::get_unchecked_mut(slice) }; + slice.iter_mut().map(|x| unsafe { Pin::new_unchecked(x) }) +}