Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions lightning-block-sync/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ pub trait UtxoSource: BlockSource + 'static {
pub struct TokioSpawner;
#[cfg(feature = "tokio")]
impl FutureSpawner for TokioSpawner {
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
tokio::spawn(future);
type E = tokio::task::JoinError;
type SpawnedFutureResult<O> = tokio::task::JoinHandle<O>;
fn spawn<O: Send + 'static, F: Future<Output = O> + Send + 'static>(
&self, future: F,
) -> Self::SpawnedFutureResult<O> {
tokio::spawn(future)
}
}

Expand Down Expand Up @@ -273,7 +277,7 @@ where
let gossiper = Arc::clone(&self.gossiper);
let block_cache = Arc::clone(&self.block_cache);
let pmw = Arc::clone(&self.peer_manager_wake);
self.spawn.spawn(async move {
let _ = self.spawn.spawn(async move {
let res = Self::retrieve_utxo(source, block_cache, short_channel_id).await;
fut.resolve(gossiper.network_graph(), &*gossiper, res);
(pmw)();
Expand Down
88 changes: 78 additions & 10 deletions lightning/src/util/async_poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,94 @@ use core::marker::Unpin;
use core::pin::Pin;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> {
pub(crate) enum ResultFuture<F: Future<Output = O> + Unpin, O> {
Pending(F),
Ready(Result<(), E>),
Ready(O),
}

pub(crate) struct MultiResultFuturePoller<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> {
futures_state: Vec<ResultFuture<F, E>>,
pub(crate) struct TwoFutureJoiner<AO, BO, AF: Future<Output = AO> + Unpin, BF: Future<Output = BO> + Unpin> {
a: Option<ResultFuture<AF, AO>>,
b: Option<ResultFuture<BF, BO>>,
}

impl<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> MultiResultFuturePoller<F, E> {
pub fn new(futures_state: Vec<ResultFuture<F, E>>) -> Self {
impl<AO, BO, AF: Future<Output = AO> + Unpin, BF: Future<Output = BO> + Unpin> TwoFutureJoiner<AO, BO, AF, BF> {
pub fn new(future_a: AF, future_b: BF) -> Self {
Self {
a: Some(ResultFuture::Pending(future_a)),
b: Some(ResultFuture::Pending(future_b)),
}
}
}

impl<AO, BO, AF: Future<Output = AO> + Unpin, BF: Future<Output = BO> + Unpin> Future for TwoFutureJoiner<AO, BO, AF, BF> {
type Output = (AO, BO);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(AO, BO)> {
let mut have_pending_futures = false;
// SAFETY: While we are pinned, we can't get direct access to our internal state because we
// aren't `Unpin`. However, we don't actually need the `Pin` - we only use it below on the
// `Future` in the `ResultFuture::Pending` case, and the `Future` is bound by `Unpin`.
// Thus, the `Pin` is not actually used, and its safe to bypass it and access the inner
// reference directly.
let state = unsafe { &mut self.get_unchecked_mut() };
macro_rules! poll_future {
($future: ident) => {
match state.$future {
Some(ResultFuture::Pending(ref mut fut)) => match Pin::new(fut).poll(cx) {
Poll::Ready(res) => {
state.$future = Some(ResultFuture::Ready(res));
},
Poll::Pending => {
have_pending_futures = true;
},
},
Some(ResultFuture::Ready(_)) => {},
None => {
debug_assert!(false, "Future polled after Ready");
return Poll::Pending;
},
}
};
}
poll_future!(a);
poll_future!(b);

if have_pending_futures {
Poll::Pending
} else {
Poll::Ready((
match state.a.take() {
Some(ResultFuture::Ready(a)) => a,
_ => unreachable!(),
},
match state.b.take() {
Some(ResultFuture::Ready(b)) => b,
_ => unreachable!(),
}
))
}
}
}

pub(crate) struct MultiResultFuturePoller<F: Future<Output = O> + Unpin, O> {
futures_state: Vec<ResultFuture<F, O>>,
}

impl<F: Future<Output = O> + Unpin, O> MultiResultFuturePoller<F, O> {
pub fn new(futures_state: Vec<ResultFuture<F, O>>) -> Self {
Self { futures_state }
}
}

impl<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> Future for MultiResultFuturePoller<F, E> {
type Output = Vec<Result<(), E>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<(), E>>> {
impl<F: Future<Output = O> + Unpin, O> Future for MultiResultFuturePoller<F, O> {
type Output = Vec<O>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<O>> {
let mut have_pending_futures = false;
let futures_state = &mut self.get_mut().futures_state;
// SAFETY: While we are pinned, we can't get direct access to `futures_state` because we
// aren't `Unpin`. However, we don't actually need the `Pin` - we only use it below on the
// `Future` in the `ResultFuture::Pending` case, and the `Future` is bound by `Unpin`.
// Thus, the `Pin` is not actually used, and its safe to bypass it and access the inner
// reference directly.
let futures_state = unsafe { &mut self.get_unchecked_mut().futures_state };
for state in futures_state.iter_mut() {
match state {
ResultFuture::Pending(ref mut fut) => match Pin::new(fut).poll(cx) {
Expand Down
112 changes: 106 additions & 6 deletions lightning/src/util/native_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,38 @@
//! environment.

#[cfg(all(test, feature = "std"))]
use crate::sync::Mutex;
use crate::sync::{Arc, Mutex};
use crate::util::async_poll::{MaybeSend, MaybeSync};

#[cfg(all(test, not(feature = "std")))]
use core::cell::RefCell;
use core::cell::{Rc, RefCell};
#[cfg(test)]
use core::convert::Infallible;
use core::future::Future;
#[cfg(test)]
use core::pin::Pin;
#[cfg(test)]
use core::task::{Context, Poll};

/// A generic trait which is able to spawn futures in the background.
/// A generic trait which is able to spawn futures to be polled in the background.
///
/// When the spawned future completes, the returned [`Self::SpawnedFutureResult`] should resolve
/// with the output of the spawned future.
///
/// Spawned futures must be polled independently in the background even if the returned
/// [`Self::SpawnedFutureResult`] is dropped without being polled. This matches the semantics of
/// `tokio::spawn`.
pub trait FutureSpawner: MaybeSend + MaybeSync + 'static {
/// The error type of [`Self::SpawnedFutureResult`].
type E;
/// The result of [`Self::spawn`], a future which completes when the spawned future completes.
type SpawnedFutureResult<O>: Future<Output = Result<O, Self::E>> + Unpin;
/// Spawns the given future as a background task.
///
/// This method MUST NOT block on the given future immediately.
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, future: T);
fn spawn<O: MaybeSend + 'static, T: Future<Output = O> + MaybeSend + 'static>(
&self, future: T,
) -> Self::SpawnedFutureResult<O>;
}

#[cfg(test)]
Expand All @@ -37,6 +54,69 @@ pub(crate) struct FutureQueue(Mutex<Vec<Pin<Box<dyn MaybeSendableFuture>>>>);
#[cfg(all(test, not(feature = "std")))]
pub(crate) struct FutureQueue(RefCell<Vec<Pin<Box<dyn MaybeSendableFuture>>>>);

#[cfg(all(test, feature = "std"))]
pub struct FutureQueueCompletion<O>(Arc<Mutex<Option<O>>>);
#[cfg(all(test, not(feature = "std")))]
pub struct FutureQueueCompletion<O>(Rc<RefCell<Option<O>>>);

#[cfg(all(test, feature = "std"))]
impl<O> FutureQueueCompletion<O> {
fn new() -> Self {
Self(Arc::new(Mutex::new(None)))
}

fn complete(&self, o: O) {
*self.0.lock().unwrap() = Some(o);
}
}

#[cfg(all(test, feature = "std"))]
impl<O> Clone for FutureQueueCompletion<O> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

#[cfg(all(test, not(feature = "std")))]
impl<O> FutureQueueCompletion<O> {
fn new() -> Self {
Self(Rc::new(Mutex::new(None)))
}

fn complete(&self, o: O) {
*self.0.lock().unwrap() = Some(o);
}
}

#[cfg(all(test, not(feature = "std")))]
impl<O> Clone for FutureQueueCompletion<O> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

#[cfg(all(test, feature = "std"))]
impl<O> Future for FutureQueueCompletion<O> {
type Output = Result<O, Infallible>;
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<O, Infallible>> {
match Pin::into_inner(self).0.lock().unwrap().take() {
None => Poll::Pending,
Some(o) => Poll::Ready(Ok(o)),
}
}
}

#[cfg(all(test, not(feature = "std")))]
impl<O> Future for FutureQueueCompletion<O> {
type Output = Result<O, Infallible>;
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<O, Infallible>> {
match Pin::into_inner(self).0.get_mut().take() {
None => Poll::Pending,
Some(o) => Poll::Ready(Ok(o)),
}
}
}

#[cfg(test)]
impl FutureQueue {
pub(crate) fn new() -> Self {
Expand Down Expand Up @@ -84,7 +164,16 @@ impl FutureQueue {

#[cfg(test)]
impl FutureSpawner for FutureQueue {
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, future: T) {
type E = Infallible;
type SpawnedFutureResult<O> = FutureQueueCompletion<O>;
fn spawn<O: MaybeSend + 'static, F: Future<Output = O> + MaybeSend + 'static>(
&self, f: F,
) -> FutureQueueCompletion<O> {
let completion = FutureQueueCompletion::new();
let compl_ref = completion.clone();
let future = async move {
compl_ref.complete(f.await);
};
#[cfg(feature = "std")]
{
self.0.lock().unwrap().push(Box::pin(future));
Expand All @@ -93,14 +182,24 @@ impl FutureSpawner for FutureQueue {
{
self.0.borrow_mut().push(Box::pin(future));
}
completion
}
}

#[cfg(test)]
impl<D: core::ops::Deref<Target = FutureQueue> + MaybeSend + MaybeSync + 'static> FutureSpawner
for D
{
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, future: T) {
type E = Infallible;
type SpawnedFutureResult<O> = FutureQueueCompletion<O>;
fn spawn<O: MaybeSend + 'static, F: Future<Output = O> + MaybeSend + 'static>(
&self, f: F,
) -> FutureQueueCompletion<O> {
let completion = FutureQueueCompletion::new();
let compl_ref = completion.clone();
let future = async move {
compl_ref.complete(f.await);
};
#[cfg(feature = "std")]
{
self.0.lock().unwrap().push(Box::pin(future));
Expand All @@ -109,5 +208,6 @@ impl<D: core::ops::Deref<Target = FutureQueue> + MaybeSend + MaybeSync + 'static
{
self.0.borrow_mut().push(Box::pin(future));
}
completion
}
}
Loading
Loading