Skip to content

Commit 31fbd16

Browse files
committed
Parallelize ChannelMonitor loading from async KVStores
Reading `ChannelMonitor`s on startup is one of the slowest parts of LDK initialization. Now that we have an async `KVStore`, there's no need for that, we can simply paralellize their loading, which we do here. Sadly, because Rust futures are pretty unergonomic, we have to add some `unsafe {}` here, but arguing its fine is relatively straightforward.
1 parent 2b29c85 commit 31fbd16

File tree

3 files changed

+29
-17
lines changed

3 files changed

+29
-17
lines changed

lightning/src/chain/channelmonitor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6684,8 +6684,8 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
66846684
// return Ok(None) to allow it to be skipped and not loaded.
66856685
return Ok(None);
66866686
} else {
6687-
panic!("Found monitor for channel {channel_id} with no updates since v0.0.118.\
6688-
These monitors are no longer supported.\
6687+
panic!("Found monitor for channel {channel_id} with no updates since v0.0.118. \
6688+
These monitors are no longer supported. \
66896689
To continue, run a v0.1 release, send/route a payment over the channel or close it.");
66906690
}
66916691
}

lightning/src/util/async_poll.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,31 @@ use core::marker::Unpin;
1616
use core::pin::Pin;
1717
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1818

19-
pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> {
19+
pub(crate) enum ResultFuture<F: Future<Output = Result<O, E>> + Unpin, O, E> {
2020
Pending(F),
21-
Ready(Result<(), E>),
21+
Ready(Result<O, E>),
2222
}
2323

24-
pub(crate) struct MultiResultFuturePoller<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> {
25-
futures_state: Vec<ResultFuture<F, E>>,
24+
pub(crate) struct MultiResultFuturePoller<F: Future<Output = Result<O, E>> + Unpin, O, E> {
25+
futures_state: Vec<ResultFuture<F, O, E>>,
2626
}
2727

28-
impl<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> MultiResultFuturePoller<F, E> {
29-
pub fn new(futures_state: Vec<ResultFuture<F, E>>) -> Self {
28+
impl<F: Future<Output = Result<O, E>> + Unpin, O, E> MultiResultFuturePoller<F, O, E> {
29+
pub fn new(futures_state: Vec<ResultFuture<F, O, E>>) -> Self {
3030
Self { futures_state }
3131
}
3232
}
3333

34-
impl<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> Future for MultiResultFuturePoller<F, E> {
35-
type Output = Vec<Result<(), E>>;
36-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<(), E>>> {
34+
impl<F: Future<Output = Result<O, E>> + Unpin, O, E> Future for MultiResultFuturePoller<F, O, E> {
35+
type Output = Vec<Result<O, E>>;
36+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<O, E>>> {
3737
let mut have_pending_futures = false;
38-
let futures_state = &mut self.get_mut().futures_state;
38+
// SAFETY: While we are pinned, we can't get direct access to `futures_state` because we
39+
// aren't `Unpin`. However, we don't actually need the `Pin` - we only use it below on the
40+
// `Future` in the `ResultFuture::Pending` case, and the `Future` is bound by `Unpin`.
41+
// Thus, the `Pin` is not actually used, and its safe to bypass it and access the inner
42+
// reference directly.
43+
let futures_state = unsafe { &mut self.get_unchecked_mut().futures_state };
3944
for state in futures_state.iter_mut() {
4045
match state {
4146
ResultFuture::Pending(ref mut fut) => match Pin::new(fut).poll(cx) {

lightning/src/util/persist.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ use crate::chain::transaction::OutPoint;
3434
use crate::ln::types::ChannelId;
3535
use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
3636
use crate::sync::Mutex;
37-
use crate::util::async_poll::{dummy_waker, MaybeSend, MaybeSync};
37+
use crate::util::async_poll::{
38+
dummy_waker, MaybeSend, MaybeSync, MultiResultFuturePoller, ResultFuture,
39+
};
3840
use crate::util::logger::Logger;
3941
use crate::util::native_async::FutureSpawner;
4042
use crate::util::ser::{Readable, ReadableArgs, Writeable};
@@ -783,11 +785,16 @@ where
783785
let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
784786
let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
785787
let monitor_list = self.0.kv_store.list(primary, secondary).await?;
786-
let mut res = Vec::with_capacity(monitor_list.len());
788+
let mut futures = Vec::with_capacity(monitor_list.len());
787789
for monitor_key in monitor_list {
788-
let result =
789-
self.0.maybe_read_channel_monitor_with_updates(monitor_key.as_str()).await?;
790-
if let Some(read_res) = result {
790+
futures.push(ResultFuture::Pending(Box::pin(async move {
791+
self.0.maybe_read_channel_monitor_with_updates(monitor_key.as_str()).await
792+
})));
793+
}
794+
let future_results = MultiResultFuturePoller::new(futures).await;
795+
let mut res = Vec::with_capacity(future_results.len());
796+
for result in future_results {
797+
if let Some(read_res) = result? {
791798
res.push(read_res);
792799
}
793800
}

0 commit comments

Comments
 (0)