Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lightning/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
//! * `grind_signatures`
#![cfg_attr(not(any(test, fuzzing, feature = "_test_utils")), deny(missing_docs))]
#![cfg_attr(not(any(test, feature = "_test_utils")), forbid(unsafe_code))]
#![cfg_attr(not(any(test, feature = "_test_utils")), deny(unsafe_code))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm really not convinced we want to go down this path. Could you expand on why boxing the event handler futures is that painful to you that it warrants these rewrites using unsafe code that introduces additional complexity and assumptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. It would fix an irritating API inconsistency, where some process_events_async require Unpin and some don't.
  2. The unsafe code is relatively low risk and self-contained. I've been using the futures crate (which this impl is taken from) for years and never had an issue. It has 13M+ dl's on crates.io, etc.

Copy link
Contributor

@tnull tnull Apr 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. It would fix an irritating API inconsistency, where some process_events_async require Unpin and some don't.

Hmm, okay, so this is just about an API inconsistency? What is the impact it has on your code in practice?

  1. The unsafe code is relatively low risk and self-contained. I've been using the futures crate (which this impl is taken from) for years and never had an issue. It has 13M+ dl's on crates.io, etc.

Well, there are good reasons why avoid/forbade unsafe code. IIRC, you historically used to complain about the existence about some unsafe code in lightning-net-tokio, so what changed your opinion on the topic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, there are good reasons why avoid/forbade unsafe code. IIRC, you historically used to complain about the existence about some unsafe code in lightning-net-tokio, so what changed your opinion on the topic?

100% and definitely a fair callout 😅. I guess I just trust the futures-rs impl that's run for however many billion CPU hours w/o issue to be functionally safe at this point.

Hmm, okay, so this is just about an API inconsistency? What is the impact it has on your code in practice?

Well it would have required me to box some of the handler futures but not others, which was mildly annoying and probably a very minor perf hit.

Anyway, I found a much simpler approach that doesn't require handling the OnionMessenger events async, so I don't actually need this anymore. I'll close the PR -- and thanks again for the review!


#![deny(rustdoc::broken_intra_doc_links)]
#![deny(rustdoc::private_intra_doc_links)]
Expand Down
2 changes: 1 addition & 1 deletion lightning/src/onion_message/messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1576,7 +1576,7 @@ where
///
/// See the trait-level documentation of [`EventsProvider`] for requirements.
pub async fn process_pending_events_async<
Future: core::future::Future<Output = Result<(), ReplayEvent>> + core::marker::Unpin,
Future: core::future::Future<Output = Result<(), ReplayEvent>>,
H: Fn(Event) -> Future,
>(
&self, handler: H,
Expand Down
118 changes: 79 additions & 39 deletions lightning/src/util/async_poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,68 +9,108 @@

//! Some utilities to make working with the standard library's [`Future`]s easier

// The `unsafe` in this module is required to support `!Unpin` futures without
// lots of extra unnecessary boxing.
#![allow(unsafe_code)]

use crate::prelude::*;
use core::future::Future;
use core::marker::Unpin;
use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll};

pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Copy + Unpin> {
Pending(F),
pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> {
Pending(/* #[pin] */ F),
Ready(Result<(), E>),
}

pub(crate) struct MultiResultFuturePoller<
F: Future<Output = Result<(), E>> + Unpin,
E: Copy + Unpin,
> {
futures_state: Vec<ResultFuture<F, E>>,
/// A future that polls a set of futures concurrently and returns their results.
///
/// This implementation is effectively `futures::future::join_all` with no "Big"
/// set optimization:
/// <https://github.com/rust-lang/futures-rs/blob/6f9a15f6e30cb3a2a79aabb9386dfaf282ef174d/futures-util/src/future/join_all.rs>
pub(crate) struct MultiResultFuturePoller<F: Future<Output = Result<(), E>>, E: Unpin> {
// Use a pinned boxed slice instead of a Vec to make it harder to accidentally
// move the inner values. Someone could easily resize the Vec, thus moving all
// the inner values and violating the Pin contract.
futures_state: Pin<Box<[ResultFuture<F, E>]>>,
}

impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> MultiResultFuturePoller<F, E> {
impl<F: Future<Output = Result<(), E>>, E: Unpin> MultiResultFuturePoller<F, E> {
pub fn new(futures_state: Vec<ResultFuture<F, E>>) -> Self {
Self { futures_state }
Self { futures_state: Box::into_pin(futures_state.into_boxed_slice()) }
}
}

impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> Future
for MultiResultFuturePoller<F, E>
{
impl<F: Future<Output = Result<(), E>>, E: Unpin> Future for MultiResultFuturePoller<F, E> {
type Output = Vec<Result<(), E>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<(), E>>> {
let mut have_pending_futures = false;
let futures_state = &mut self.get_mut().futures_state;
for state in futures_state.iter_mut() {
match state {
ResultFuture::Pending(ref mut fut) => match Pin::new(fut).poll(cx) {
Poll::Ready(res) => {
*state = ResultFuture::Ready(res);
},
Poll::Pending => {
have_pending_futures = true;
},
},
ResultFuture::Ready(_) => continue,

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<(), E>>> {
let mut any_pending_futures = false;
let futures_state: &mut Pin<Box<[_]>> = &mut self.futures_state;

// Poll all the inner futures in order.
for state in iter_pin_mut(futures_state.as_mut()) {
if state.poll(cx).is_pending() {
any_pending_futures = true;
}
}

if have_pending_futures {
Poll::Pending
} else {
let results = futures_state
.drain(..)
.filter_map(|e| match e {
ResultFuture::Ready(res) => Some(res),
if !any_pending_futures {
// Reuse the Box<[_]> allocation for the output Vec<_>.
let results: Pin<Box<[_]>> = mem::replace(futures_state, Box::pin([]));
// SAFETY: all the inner values are simple `Ready(Result<(), E>)`
// values, which are `Unpin`.
let results: Box<[_]> = unsafe { Pin::into_inner_unchecked(results) };
let results = Vec::from(results);

let result = results
.into_iter()
.map(|state| match state {
ResultFuture::Ready(res) => res,
ResultFuture::Pending(_) => {
debug_assert!(
false,
"All futures are expected to be ready if none are pending"
);
None
unreachable!("All futures are expected to be ready if none are pending")
},
})
.collect();
Poll::Ready(results)

Poll::Ready(result)
} else {
Poll::Pending
}
}
}

impl<F: Future<Output = Result<(), E>>, E: Unpin> Future for ResultFuture<F, E> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// SAFETY: just a standard enum pin-project, which is safe as we don't
// move anything during Pin<&mut Self> -> &mut Self -> Pin<&mut F>.
let this: &mut Self = unsafe { self.as_mut().get_unchecked_mut() };
match this {
ResultFuture::Pending(fut) => {
let fut: Pin<&mut F> = unsafe { Pin::new_unchecked(fut) };
match fut.poll(cx) {
Poll::Ready(res) => {
self.set(ResultFuture::Ready(res));
Poll::Ready(())
},
Poll::Pending => Poll::Pending,
}
},
ResultFuture::Ready(_) => Poll::Ready(()),
}
}
}

// Pin project from a pinned mut slice into an iterator of pinned mut entries.
fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
// quoted from `futures::future::join_all`:
// > SAFETY: `std` _could_ make this unsound if it were to decide Pin's
// > invariants aren't required to transmit through slices. Otherwise this has
// > the same safety as a normal field pin projection.
let slice: &mut [T] = unsafe { Pin::get_unchecked_mut(slice) };
slice.iter_mut().map(|x| unsafe { Pin::new_unchecked(x) })
}
Loading