Skip to content

Commit 41584c8

Browse files
committed
Add an option to deserialize monitors in parallel in async load
`MonitorUpdatingPersister::read_all_channel_monitors_with_updates` was made to do the IO operations in parallel in a previous commit, however in practice this doesn't provide material parallelism for large routing nodes. Because deserializing `ChannelMonitor`s is the bulk of the work (when IO operations are sufficiently fast), we end up blocked in single-threaded work nearly the entire time. Here, we add an alternative option - a new `read_all_channel_monitors_with_updates_parallel` method which uses the `FutureSpawner` to cause the deserialization operations to proceed in parallel.
1 parent c5d82e9 commit 41584c8

File tree

2 files changed

+55
-2
lines changed

2 files changed

+55
-2
lines changed

lightning/src/util/native_async.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ use core::task::{Context, Poll};
3030
/// [`Self::SpawnedFutureResult`] is dropped without being polled. This matches the semantics of
3131
/// `tokio::spawn`.
3232
pub trait FutureSpawner: MaybeSend + MaybeSync + 'static {
33-
/// The error type of [`Self::SpawnedFutureResult`]. This can be used to indicate that the
34-
/// spawned future was cancelled or panicked.
33+
/// The error type of [`Self::SpawnedFutureResult`].
3534
type E;
3635
/// The result of [`Self::spawn`], a future which completes when the spawned future completes.
3736
type SpawnedFutureResult<O>: Future<Output = Result<O, Self::E>> + Unpin;

lightning/src/util/persist.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,10 @@ where
778778

779779
/// Reads all stored channel monitors, along with any stored updates for them.
780780
///
781+
/// While the reads themselves are performend in parallel, deserializing the
782+
/// [`ChannelMonitor`]s is not. For large [`ChannelMonitor`]s actively used for forwarding,
783+
/// this may substantially limit the parallelism of this method.
784+
///
781785
/// It is extremely important that your [`KVStore::read`] implementation uses the
782786
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
783787
/// documentation for [`MonitorUpdatingPersister`].
@@ -806,6 +810,56 @@ where
806810
Ok(res)
807811
}
808812

813+
/// Reads all stored channel monitors, along with any stored updates for them, in parallel.
814+
///
815+
/// Because deserializing large [`ChannelMonitor`]s from forwarding nodes is often CPU-bound,
816+
/// this version of [`Self::read_all_channel_monitors_with_updates`] uses the [`FutureSpawner`]
817+
/// to parallelize deserialization as well as the IO operations.
818+
///
819+
/// Because [`FutureSpawner`] requires that the spawned future be `'static` (matching `tokio`
820+
/// and other multi-threaded runtime requirements), this method requires that `self` be an
821+
/// `Arc` that can live for `'static` and be sent and accessed across threads.
822+
///
823+
/// It is extremely important that your [`KVStore::read`] implementation uses the
824+
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
825+
/// documentation for [`MonitorUpdatingPersister`].
826+
pub async fn read_all_channel_monitors_with_updates_parallel(
827+
self: &Arc<Self>,
828+
) -> Result<
829+
Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
830+
io::Error,
831+
> where
832+
K: MaybeSend + MaybeSync + 'static,
833+
L: MaybeSend + MaybeSync + 'static,
834+
ES: MaybeSend + MaybeSync + 'static,
835+
SP: MaybeSend + MaybeSync + 'static,
836+
BI: MaybeSend + MaybeSync + 'static,
837+
FE: MaybeSend + MaybeSync + 'static,
838+
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend,
839+
{
840+
let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
841+
let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
842+
let monitor_list = self.0.kv_store.list(primary, secondary).await?;
843+
let mut futures = Vec::with_capacity(monitor_list.len());
844+
for monitor_key in monitor_list {
845+
let us = Arc::clone(&self);
846+
futures.push(ResultFuture::Pending(self.0.future_spawner.spawn(async move {
847+
us.0.maybe_read_channel_monitor_with_updates(monitor_key.as_str()).await
848+
})));
849+
}
850+
let future_results = MultiResultFuturePoller::new(futures).await;
851+
let mut res = Vec::with_capacity(future_results.len());
852+
for result in future_results {
853+
match result {
854+
Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "Future was cancelled")),
855+
Ok(Err(e)) => return Err(e),
856+
Ok(Ok(Some(read_res))) => res.push(read_res),
857+
Ok(Ok(None)) => {},
858+
}
859+
}
860+
Ok(res)
861+
}
862+
809863
/// Read a single channel monitor, along with any stored updates for it.
810864
///
811865
/// It is extremely important that your [`KVStoreSync::read`] implementation uses the

0 commit comments

Comments
 (0)