Skip to content

Commit 493692a

Browse files
committed
Avoid a storage RTT when loading ChannelMonitors without updates
When reading `ChannelMonitor`s from a `MonitorUpdatingPersister` on startup, we have to make sure to load any `ChannelMonitorUpdate`s and re-apply them as well. For users of async persistence who don't have any `ChannelMonitorUpdate`s (e.g. because they set `maximum_pending_updates` to 0 or, in the future, we avoid persisting updates for small `ChannelMonitor`s), this means two round-trips to the storage backend, one to load the `ChannelMonitor` and one to try to read the next `ChannelMonitorUpdate` only to have it fail. Instead, here, we use `KVStore::list` to fetch the list of stored `ChannelMonitorUpdate`s, which for async `KVStore` users allows us to parallelize the list of update fetching and the `ChannelMonitor` loading itself. Then we know exactly when to stop reading `ChannelMonitorUpdate`s, including reading none if there are none to read. This also avoids relying on `KVStore::read` correctly returning `NotFound` in order to correctly discover when to stop reading `ChannelMonitorUpdate`s.
1 parent e673574 commit 493692a

File tree

2 files changed

+92
-61
lines changed

2 files changed

+92
-61
lines changed

lightning/src/util/async_poll.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,69 @@ pub(crate) enum ResultFuture<F: Future<Output = O> + Unpin, O> {
2121
Ready(O),
2222
}
2323

24+
pub(crate) struct TwoFutureJoiner<AO, BO, AF: Future<Output = AO> + Unpin, BF: Future<Output = BO> + Unpin> {
25+
a: Option<ResultFuture<AF, AO>>,
26+
b: Option<ResultFuture<BF, BO>>,
27+
}
28+
29+
impl<AO, BO, AF: Future<Output = AO> + Unpin, BF: Future<Output = BO> + Unpin> TwoFutureJoiner<AO, BO, AF, BF> {
30+
pub fn new(future_a: AF, future_b: BF) -> Self {
31+
Self {
32+
a: Some(ResultFuture::Pending(future_a)),
33+
b: Some(ResultFuture::Pending(future_b)),
34+
}
35+
}
36+
}
37+
38+
impl<AO, BO, AF: Future<Output = AO> + Unpin, BF: Future<Output = BO> + Unpin> Future for TwoFutureJoiner<AO, BO, AF, BF> {
39+
type Output = (AO, BO);
40+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(AO, BO)> {
41+
let mut have_pending_futures = false;
42+
// SAFETY: While we are pinned, we can't get direct access to our internal state because we
43+
// aren't `Unpin`. However, we don't actually need the `Pin` - we only use it below on the
44+
// `Future` in the `ResultFuture::Pending` case, and the `Future` is bound by `Unpin`.
45+
// Thus, the `Pin` is not actually used, and its safe to bypass it and access the inner
46+
// reference directly.
47+
let state = unsafe { &mut self.get_unchecked_mut() };
48+
macro_rules! poll_future {
49+
($future: ident) => {
50+
match state.$future {
51+
Some(ResultFuture::Pending(ref mut fut)) => match Pin::new(fut).poll(cx) {
52+
Poll::Ready(res) => {
53+
state.$future = Some(ResultFuture::Ready(res));
54+
},
55+
Poll::Pending => {
56+
have_pending_futures = true;
57+
},
58+
},
59+
Some(ResultFuture::Ready(_)) => {},
60+
None => {
61+
debug_assert!(false, "Future polled after Ready");
62+
return Poll::Pending;
63+
},
64+
}
65+
};
66+
}
67+
poll_future!(a);
68+
poll_future!(b);
69+
70+
if have_pending_futures {
71+
Poll::Pending
72+
} else {
73+
Poll::Ready((
74+
match state.a.take() {
75+
Some(ResultFuture::Ready(a)) => a,
76+
_ => unreachable!(),
77+
},
78+
match state.b.take() {
79+
Some(ResultFuture::Ready(b)) => b,
80+
_ => unreachable!(),
81+
}
82+
))
83+
}
84+
}
85+
}
86+
2487
pub(crate) struct MultiResultFuturePoller<F: Future<Output = O> + Unpin, O> {
2588
futures_state: Vec<ResultFuture<F, O>>,
2689
}

lightning/src/util/persist.rs

Lines changed: 29 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::ln::types::ChannelId;
3636
use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
3737
use crate::sync::Mutex;
3838
use crate::util::async_poll::{
39-
dummy_waker, MaybeSend, MaybeSync, MultiResultFuturePoller, ResultFuture,
39+
dummy_waker, MaybeSend, MaybeSync, MultiResultFuturePoller, ResultFuture, TwoFutureJoiner,
4040
};
4141
use crate::util::logger::Logger;
4242
use crate::util::native_async::FutureSpawner;
@@ -493,15 +493,6 @@ fn poll_sync_future<F: Future>(future: F) -> F::Output {
493493
/// list channel monitors themselves and load channels individually using
494494
/// [`MonitorUpdatingPersister::read_channel_monitor_with_updates`].
495495
///
496-
/// ## EXTREMELY IMPORTANT
497-
///
498-
/// It is extremely important that your [`KVStoreSync::read`] implementation uses the
499-
/// [`io::ErrorKind::NotFound`] variant correctly: that is, when a file is not found, and _only_ in
500-
/// that circumstance (not when there is really a permissions error, for example). This is because
501-
/// neither channel monitor reading function lists updates. Instead, either reads the monitor, and
502-
/// using its stored `update_id`, synthesizes update storage keys, and tries them in sequence until
503-
/// one is not found. All _other_ errors will be bubbled up in the function's [`Result`].
504-
///
505496
/// # Pruning stale channel updates
506497
///
507498
/// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`.
@@ -569,10 +560,6 @@ where
569560
}
570561

571562
/// Reads all stored channel monitors, along with any stored updates for them.
572-
///
573-
/// It is extremely important that your [`KVStoreSync::read`] implementation uses the
574-
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
575-
/// documentation for [`MonitorUpdatingPersister`].
576563
pub fn read_all_channel_monitors_with_updates(
577564
&self,
578565
) -> Result<
@@ -584,10 +571,6 @@ where
584571

585572
/// Read a single channel monitor, along with any stored updates for it.
586573
///
587-
/// It is extremely important that your [`KVStoreSync::read`] implementation uses the
588-
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
589-
/// documentation for [`MonitorUpdatingPersister`].
590-
///
591574
/// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an
592575
/// underscore `_` between txid and index for v1 channels. For example, given:
593576
///
@@ -781,10 +764,6 @@ where
781764
/// While the reads themselves are performend in parallel, deserializing the
782765
/// [`ChannelMonitor`]s is not. For large [`ChannelMonitor`]s actively used for forwarding,
783766
/// this may substantially limit the parallelism of this method.
784-
///
785-
/// It is extremely important that your [`KVStore::read`] implementation uses the
786-
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
787-
/// documentation for [`MonitorUpdatingPersister`].
788767
pub async fn read_all_channel_monitors_with_updates(
789768
&self,
790769
) -> Result<
@@ -819,10 +798,6 @@ where
819798
/// Because [`FutureSpawner`] requires that the spawned future be `'static` (matching `tokio`
820799
/// and other multi-threaded runtime requirements), this method requires that `self` be an
821800
/// `Arc` that can live for `'static` and be sent and accessed across threads.
822-
///
823-
/// It is extremely important that your [`KVStore::read`] implementation uses the
824-
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
825-
/// documentation for [`MonitorUpdatingPersister`].
826801
pub async fn read_all_channel_monitors_with_updates_parallel(
827802
self: &Arc<Self>,
828803
) -> Result<
@@ -862,10 +837,6 @@ where
862837

863838
/// Read a single channel monitor, along with any stored updates for it.
864839
///
865-
/// It is extremely important that your [`KVStoreSync::read`] implementation uses the
866-
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
867-
/// documentation for [`MonitorUpdatingPersister`].
868-
///
869840
/// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an
870841
/// underscore `_` between txid and index for v1 channels. For example, given:
871842
///
@@ -1011,40 +982,37 @@ where
1011982
io::Error,
1012983
> {
1013984
let monitor_name = MonitorName::from_str(monitor_key)?;
1014-
let read_res = self.maybe_read_monitor(&monitor_name, monitor_key).await?;
1015-
let (block_hash, monitor) = match read_res {
985+
// TODO: After an MSRV bump we should be able to use the pin macro rather than Box::pin
986+
let read_future = Box::pin(self.maybe_read_monitor(&monitor_name, monitor_key));
987+
let list_future =
988+
Box::pin(self.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key));
989+
let (read_res, list_res) = TwoFutureJoiner::new(read_future, list_future).await;
990+
let (block_hash, monitor) = match read_res? {
1016991
Some(res) => res,
1017992
None => return Ok(None),
1018993
};
1019994
let mut current_update_id = monitor.get_latest_update_id();
1020-
// TODO: Parallelize this loop by speculatively reading a batch of updates
1021-
loop {
1022-
current_update_id = match current_update_id.checked_add(1) {
1023-
Some(next_update_id) => next_update_id,
1024-
None => break,
1025-
};
1026-
let update_name = UpdateName::from(current_update_id);
1027-
let update = match self.read_monitor_update(monitor_key, &update_name).await {
1028-
Ok(update) => update,
1029-
Err(err) if err.kind() == io::ErrorKind::NotFound => {
1030-
// We can't find any more updates, so we are done.
1031-
break;
1032-
},
1033-
Err(err) => return Err(err),
1034-
};
1035-
1036-
monitor
1037-
.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
1038-
.map_err(|e| {
1039-
log_error!(
1040-
self.logger,
1041-
"Monitor update failed. monitor: {} update: {} reason: {:?}",
1042-
monitor_key,
1043-
update_name.as_str(),
1044-
e
1045-
);
1046-
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
1047-
})?;
995+
let updates: Result<Vec<_>, _> =
996+
list_res?.into_iter().map(|name| UpdateName::new(name)).collect();
997+
let mut updates = updates?;
998+
updates.sort_unstable();
999+
// TODO: Parallelize this loop
1000+
for update_name in updates {
1001+
if update_name.0 > current_update_id {
1002+
let update = self.read_monitor_update(monitor_key, &update_name).await?;
1003+
monitor
1004+
.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
1005+
.map_err(|e| {
1006+
log_error!(
1007+
self.logger,
1008+
"Monitor update failed. monitor: {} update: {} reason: {:?}",
1009+
monitor_key,
1010+
update_name.as_str(),
1011+
e
1012+
);
1013+
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
1014+
})?;
1015+
}
10481016
}
10491017
Ok(Some((block_hash, monitor)))
10501018
}
@@ -1416,7 +1384,7 @@ impl core::fmt::Display for MonitorName {
14161384
/// let monitor_name = "some_monitor_name";
14171385
/// let storage_key = format!("channel_monitor_updates/{}/{}", monitor_name, update_name.as_str());
14181386
/// ```
1419-
#[derive(Debug)]
1387+
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
14201388
pub struct UpdateName(pub u64, String);
14211389

14221390
impl UpdateName {

0 commit comments

Comments
 (0)