Skip to content

Commit cd71ff1

Browse files
committed
Add an option to deserialize monitors in parallel in async load
`MonitorUpdatingPersister::read_all_channel_monitors_with_updates` was made to do the IO operations in parallel in a previous commit, however in practice this doesn't provide material parallelism for large routing nodes. Because deserializing `ChannelMonitor`s is the bulk of the work (when IO operations are sufficiently fast), we end up blocked in single-threaded work nearly the entire time. Here, we add an alternative option - a new `read_all_channel_monitors_with_updates_parallel` method which uses the `FutureSpawner` to cause the deserialization operations to proceed in parallel.
1 parent bd466df commit cd71ff1

File tree

1 file changed

+59
-0
lines changed

1 file changed

+59
-0
lines changed

lightning/src/util/persist.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -870,6 +870,14 @@ where
870870

871871
/// Reads all stored channel monitors, along with any stored updates for them.
872872
///
873+
/// While the reads themselves are performend in parallel, deserializing the
874+
/// [`ChannelMonitor`]s is not. For large [`ChannelMonitor`]s actively used for forwarding,
875+
/// this may substantially limit the parallelism of this method.
876+
///
877+
/// If you can move this object into an `Arc`, consider using
878+
/// [`Self::read_all_channel_monitors_with_updates_parallel`] to parallelize the CPU-bound
879+
/// deserialization as well.
880+
///
873881
/// It is extremely important that your [`KVStore::read`] implementation uses the
874882
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
875883
/// documentation for [`MonitorUpdatingPersister`].
@@ -898,6 +906,57 @@ where
898906
Ok(res)
899907
}
900908

909+
/// Reads all stored channel monitors, along with any stored updates for them, in parallel.
910+
///
911+
/// Because deserializing large [`ChannelMonitor`]s from forwarding nodes is often CPU-bound,
912+
/// this version of [`Self::read_all_channel_monitors_with_updates`] uses the [`FutureSpawner`]
913+
/// to parallelize deserialization as well as the IO operations.
914+
///
915+
/// Because [`FutureSpawner`] requires that the spawned future be `'static` (matching `tokio`
916+
/// and other multi-threaded runtime requirements), this method requires that `self` be an
917+
/// `Arc` that can live for `'static` and be sent and accessed across threads.
918+
///
919+
/// It is extremely important that your [`KVStore::read`] implementation uses the
920+
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
921+
/// documentation for [`MonitorUpdatingPersister`].
922+
pub async fn read_all_channel_monitors_with_updates_parallel(
923+
self: &Arc<Self>,
924+
) -> Result<
925+
Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
926+
io::Error,
927+
>
928+
where
929+
K: MaybeSend + MaybeSync + 'static,
930+
L: MaybeSend + MaybeSync + 'static,
931+
ES: MaybeSend + MaybeSync + 'static,
932+
SP: MaybeSend + MaybeSync + 'static,
933+
BI: MaybeSend + MaybeSync + 'static,
934+
FE: MaybeSend + MaybeSync + 'static,
935+
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend,
936+
{
937+
let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
938+
let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
939+
let monitor_list = self.0.kv_store.list(primary, secondary).await?;
940+
let mut futures = Vec::with_capacity(monitor_list.len());
941+
for monitor_key in monitor_list {
942+
let us = Arc::clone(&self);
943+
futures.push(ResultFuture::Pending(self.0.future_spawner.spawn(async move {
944+
us.0.maybe_read_channel_monitor_with_updates(monitor_key.as_str()).await
945+
})));
946+
}
947+
let future_results = MultiResultFuturePoller::new(futures).await;
948+
let mut res = Vec::with_capacity(future_results.len());
949+
for result in future_results {
950+
match result {
951+
Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "Future was cancelled")),
952+
Ok(Err(e)) => return Err(e),
953+
Ok(Ok(Some(read_res))) => res.push(read_res),
954+
Ok(Ok(None)) => {},
955+
}
956+
}
957+
Ok(res)
958+
}
959+
901960
/// Read a single channel monitor, along with any stored updates for it.
902961
///
903962
/// It is extremely important that your [`KVStoreSync::read`] implementation uses the

0 commit comments

Comments
 (0)