Skip to content

Commit c9f022b

Browse files
authored
Merge pull request #4147 from TheBlueMatt/2025-10-parallel-reads
Parallelize `ChannelMonitor` loading from async `KVStore`s
2 parents f1755b1 + 6ef22ba commit c9f022b

File tree

4 files changed

+304
-72
lines changed

4 files changed

+304
-72
lines changed

lightning-block-sync/src/gossip.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,12 @@ pub trait UtxoSource: BlockSource + 'static {
4747
pub struct TokioSpawner;
4848
#[cfg(feature = "tokio")]
4949
impl FutureSpawner for TokioSpawner {
50-
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
51-
tokio::spawn(future);
50+
type E = tokio::task::JoinError;
51+
type SpawnedFutureResult<O> = tokio::task::JoinHandle<O>;
52+
fn spawn<O: Send + 'static, F: Future<Output = O> + Send + 'static>(
53+
&self, future: F,
54+
) -> Self::SpawnedFutureResult<O> {
55+
tokio::spawn(future)
5256
}
5357
}
5458

@@ -254,7 +258,7 @@ where
254258
let fut = res.clone();
255259
let source = self.source.clone();
256260
let block_cache = Arc::clone(&self.block_cache);
257-
self.spawn.spawn(async move {
261+
let _not_polled = self.spawn.spawn(async move {
258262
let res = Self::retrieve_utxo(source, block_cache, scid).await;
259263
fut.resolve(res);
260264
});

lightning/src/util/async_poll.rs

Lines changed: 84 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,100 @@ use core::marker::Unpin;
1515
use core::pin::Pin;
1616
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1717

18-
pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> {
18+
pub(crate) enum ResultFuture<F: Future<Output = O> + Unpin, O> {
1919
Pending(F),
20-
Ready(Result<(), E>),
20+
Ready(O),
2121
}
2222

23-
pub(crate) struct MultiResultFuturePoller<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> {
24-
futures_state: Vec<ResultFuture<F, E>>,
23+
pub(crate) struct TwoFutureJoiner<
24+
AO,
25+
BO,
26+
AF: Future<Output = AO> + Unpin,
27+
BF: Future<Output = BO> + Unpin,
28+
> {
29+
a: Option<ResultFuture<AF, AO>>,
30+
b: Option<ResultFuture<BF, BO>>,
2531
}
2632

27-
impl<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> MultiResultFuturePoller<F, E> {
28-
pub fn new(futures_state: Vec<ResultFuture<F, E>>) -> Self {
33+
impl<AO, BO, AF: Future<Output = AO> + Unpin, BF: Future<Output = BO> + Unpin>
34+
TwoFutureJoiner<AO, BO, AF, BF>
35+
{
36+
pub fn new(future_a: AF, future_b: BF) -> Self {
37+
Self { a: Some(ResultFuture::Pending(future_a)), b: Some(ResultFuture::Pending(future_b)) }
38+
}
39+
}
40+
41+
impl<AO, BO, AF: Future<Output = AO> + Unpin, BF: Future<Output = BO> + Unpin> Future
42+
for TwoFutureJoiner<AO, BO, AF, BF>
43+
{
44+
type Output = (AO, BO);
45+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(AO, BO)> {
46+
let mut have_pending_futures = false;
47+
// SAFETY: While we are pinned, we can't get direct access to our internal state because we
48+
// aren't `Unpin`. However, we don't actually need the `Pin` - we only use it below on the
49+
// `Future` in the `ResultFuture::Pending` case, and the `Future` is bound by `Unpin`.
50+
// Thus, the `Pin` is not actually used, and its safe to bypass it and access the inner
51+
// reference directly.
52+
let state = unsafe { &mut self.get_unchecked_mut() };
53+
macro_rules! poll_future {
54+
($future: ident) => {
55+
match state.$future {
56+
Some(ResultFuture::Pending(ref mut fut)) => match Pin::new(fut).poll(cx) {
57+
Poll::Ready(res) => {
58+
state.$future = Some(ResultFuture::Ready(res));
59+
},
60+
Poll::Pending => {
61+
have_pending_futures = true;
62+
},
63+
},
64+
Some(ResultFuture::Ready(_)) => {},
65+
None => {
66+
debug_assert!(false, "Future polled after Ready");
67+
return Poll::Pending;
68+
},
69+
}
70+
};
71+
}
72+
poll_future!(a);
73+
poll_future!(b);
74+
75+
if have_pending_futures {
76+
Poll::Pending
77+
} else {
78+
Poll::Ready((
79+
match state.a.take() {
80+
Some(ResultFuture::Ready(a)) => a,
81+
_ => unreachable!(),
82+
},
83+
match state.b.take() {
84+
Some(ResultFuture::Ready(b)) => b,
85+
_ => unreachable!(),
86+
},
87+
))
88+
}
89+
}
90+
}
91+
92+
pub(crate) struct MultiResultFuturePoller<F: Future<Output = O> + Unpin, O> {
93+
futures_state: Vec<ResultFuture<F, O>>,
94+
}
95+
96+
impl<F: Future<Output = O> + Unpin, O> MultiResultFuturePoller<F, O> {
97+
pub fn new(futures_state: Vec<ResultFuture<F, O>>) -> Self {
2998
Self { futures_state }
3099
}
31100
}
32101

33-
impl<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> Future for MultiResultFuturePoller<F, E> {
34-
type Output = Vec<Result<(), E>>;
35-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<(), E>>> {
102+
impl<F: Future<Output = O> + Unpin, O> Future for MultiResultFuturePoller<F, O> {
103+
type Output = Vec<O>;
104+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<O>> {
36105
let mut have_pending_futures = false;
37-
let futures_state = &mut self.get_mut().futures_state;
106+
// SAFETY: While we are pinned, we can't get direct access to `futures_state` because we
107+
// aren't `Unpin`. However, we don't actually need the `Pin` - we only use it below on the
108+
// `Future` in the `ResultFuture::Pending` case, and the `Future` is bound by `Unpin`.
109+
// Thus, the `Pin` is not actually used, and its safe to bypass it and access the inner
110+
// reference directly.
111+
let futures_state = unsafe { &mut self.get_unchecked_mut().futures_state };
38112
for state in futures_state.iter_mut() {
39113
match state {
40114
ResultFuture::Pending(ref mut fut) => match Pin::new(fut).poll(cx) {

lightning/src/util/native_async.rs

Lines changed: 117 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,44 @@
88
//! environment.
99
1010
#[cfg(all(test, feature = "std"))]
11-
use crate::sync::Mutex;
11+
use crate::sync::{Arc, Mutex};
1212
use crate::util::async_poll::{MaybeSend, MaybeSync};
1313

14+
#[cfg(all(test, not(feature = "std")))]
15+
use alloc::rc::Rc;
16+
1417
#[cfg(all(test, not(feature = "std")))]
1518
use core::cell::RefCell;
19+
#[cfg(test)]
20+
use core::convert::Infallible;
1621
use core::future::Future;
1722
#[cfg(test)]
1823
use core::pin::Pin;
24+
#[cfg(test)]
25+
use core::task::{Context, Poll};
1926

20-
/// A generic trait which is able to spawn futures in the background.
27+
/// A generic trait which is able to spawn futures to be polled in the background.
28+
///
29+
/// When the spawned future completes, the returned [`Self::SpawnedFutureResult`] should resolve
30+
/// with the output of the spawned future.
31+
///
32+
/// Spawned futures must be polled independently in the background even if the returned
33+
/// [`Self::SpawnedFutureResult`] is dropped without being polled. This matches the semantics of
34+
/// `tokio::spawn`.
2135
///
2236
/// This is not exported to bindings users as async is only supported in Rust.
2337
pub trait FutureSpawner: MaybeSend + MaybeSync + 'static {
38+
/// The error type of [`Self::SpawnedFutureResult`]. This can be used to indicate that the
39+
/// spawned future was cancelled or panicked.
40+
type E;
41+
/// The result of [`Self::spawn`], a future which completes when the spawned future completes.
42+
type SpawnedFutureResult<O>: Future<Output = Result<O, Self::E>> + Unpin;
2443
/// Spawns the given future as a background task.
2544
///
2645
/// This method MUST NOT block on the given future immediately.
27-
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, future: T);
46+
fn spawn<O: MaybeSend + 'static, T: Future<Output = O> + MaybeSend + 'static>(
47+
&self, future: T,
48+
) -> Self::SpawnedFutureResult<O>;
2849
}
2950

3051
#[cfg(test)]
@@ -39,6 +60,77 @@ pub(crate) struct FutureQueue(Mutex<Vec<Pin<Box<dyn MaybeSendableFuture>>>>);
3960
#[cfg(all(test, not(feature = "std")))]
4061
pub(crate) struct FutureQueue(RefCell<Vec<Pin<Box<dyn MaybeSendableFuture>>>>);
4162

63+
/// A simple future which can be completed later. Used to implement [`FutureQueue`].
64+
#[cfg(all(test, feature = "std"))]
65+
pub struct FutureQueueCompletion<O>(Arc<Mutex<Option<O>>>);
66+
#[cfg(all(test, not(feature = "std")))]
67+
pub struct FutureQueueCompletion<O>(Rc<RefCell<Option<O>>>);
68+
69+
#[cfg(all(test, feature = "std"))]
70+
impl<O> FutureQueueCompletion<O> {
71+
fn new() -> Self {
72+
Self(Arc::new(Mutex::new(None)))
73+
}
74+
75+
fn complete(&self, o: O) {
76+
*self.0.lock().unwrap() = Some(o);
77+
}
78+
}
79+
80+
#[cfg(all(test, feature = "std"))]
81+
impl<O> Clone for FutureQueueCompletion<O> {
82+
fn clone(&self) -> Self {
83+
#[cfg(all(test, feature = "std"))]
84+
{
85+
Self(Arc::clone(&self.0))
86+
}
87+
#[cfg(all(test, not(feature = "std")))]
88+
{
89+
Self(Rc::clone(&self.0))
90+
}
91+
}
92+
}
93+
94+
#[cfg(all(test, not(feature = "std")))]
95+
impl<O> FutureQueueCompletion<O> {
96+
fn new() -> Self {
97+
Self(Rc::new(RefCell::new(None)))
98+
}
99+
100+
fn complete(&self, o: O) {
101+
*self.0.borrow_mut() = Some(o);
102+
}
103+
}
104+
105+
#[cfg(all(test, not(feature = "std")))]
106+
impl<O> Clone for FutureQueueCompletion<O> {
107+
fn clone(&self) -> Self {
108+
Self(self.0.clone())
109+
}
110+
}
111+
112+
#[cfg(all(test, feature = "std"))]
113+
impl<O> Future for FutureQueueCompletion<O> {
114+
type Output = Result<O, Infallible>;
115+
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<O, Infallible>> {
116+
match Pin::into_inner(self).0.lock().unwrap().take() {
117+
None => Poll::Pending,
118+
Some(o) => Poll::Ready(Ok(o)),
119+
}
120+
}
121+
}
122+
123+
#[cfg(all(test, not(feature = "std")))]
124+
impl<O> Future for FutureQueueCompletion<O> {
125+
type Output = Result<O, Infallible>;
126+
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<O, Infallible>> {
127+
match Pin::into_inner(self).0.borrow_mut().take() {
128+
None => Poll::Pending,
129+
Some(o) => Poll::Ready(Ok(o)),
130+
}
131+
}
132+
}
133+
42134
#[cfg(test)]
43135
impl FutureQueue {
44136
pub(crate) fn new() -> Self {
@@ -74,7 +166,6 @@ impl FutureQueue {
74166
futures = self.0.borrow_mut();
75167
}
76168
futures.retain_mut(|fut| {
77-
use core::task::{Context, Poll};
78169
let waker = crate::util::async_poll::dummy_waker();
79170
match fut.as_mut().poll(&mut Context::from_waker(&waker)) {
80171
Poll::Ready(()) => false,
@@ -86,7 +177,16 @@ impl FutureQueue {
86177

87178
#[cfg(test)]
88179
impl FutureSpawner for FutureQueue {
89-
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, future: T) {
180+
type E = Infallible;
181+
type SpawnedFutureResult<O> = FutureQueueCompletion<O>;
182+
fn spawn<O: MaybeSend + 'static, F: Future<Output = O> + MaybeSend + 'static>(
183+
&self, f: F,
184+
) -> FutureQueueCompletion<O> {
185+
let completion = FutureQueueCompletion::new();
186+
let compl_ref = completion.clone();
187+
let future = async move {
188+
compl_ref.complete(f.await);
189+
};
90190
#[cfg(feature = "std")]
91191
{
92192
self.0.lock().unwrap().push(Box::pin(future));
@@ -95,14 +195,24 @@ impl FutureSpawner for FutureQueue {
95195
{
96196
self.0.borrow_mut().push(Box::pin(future));
97197
}
198+
completion
98199
}
99200
}
100201

101202
#[cfg(test)]
102203
impl<D: core::ops::Deref<Target = FutureQueue> + MaybeSend + MaybeSync + 'static> FutureSpawner
103204
for D
104205
{
105-
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, future: T) {
206+
type E = Infallible;
207+
type SpawnedFutureResult<O> = FutureQueueCompletion<O>;
208+
fn spawn<O: MaybeSend + 'static, F: Future<Output = O> + MaybeSend + 'static>(
209+
&self, f: F,
210+
) -> FutureQueueCompletion<O> {
211+
let completion = FutureQueueCompletion::new();
212+
let compl_ref = completion.clone();
213+
let future = async move {
214+
compl_ref.complete(f.await);
215+
};
106216
#[cfg(feature = "std")]
107217
{
108218
self.0.lock().unwrap().push(Box::pin(future));
@@ -111,5 +221,6 @@ impl<D: core::ops::Deref<Target = FutureQueue> + MaybeSend + MaybeSync + 'static
111221
{
112222
self.0.borrow_mut().push(Box::pin(future));
113223
}
224+
completion
114225
}
115226
}

0 commit comments

Comments
 (0)