Skip to content

Commit d835f61

Browse files
committed
Add async persistence logic in MonitorUpdatingPersister
In the next commit we'll add the ability to use an async `KVStore` as the backing for a `ChainMonitor`. Here we tee this up by adding an async API to `MonitorUpdatingPersisterAsync`. Its not intended for public use and is thus only `pub(crate)` but allows spawning all operations via a generic `FutureSpawner` trait, initiating the write via the `KVStore` before any `await`s (or async functions). Because we aren't going to make the `ChannelManager` (or `ChainMonitor`) fully async, we need a way to alert the `ChainMonitor` when a persistence completes, but we leave that for the next commit.
1 parent 6e3ba86 commit d835f61

File tree

1 file changed

+168
-42
lines changed

1 file changed

+168
-42
lines changed

lightning/src/util/persist.rs

Lines changed: 168 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use bitcoin::hashes::hex::FromHex;
1717
use bitcoin::{BlockHash, Txid};
1818

1919
use core::future::Future;
20+
use core::mem;
2021
use core::ops::Deref;
2122
use core::pin::Pin;
2223
use core::str::FromStr;
@@ -32,8 +33,10 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
3233
use crate::chain::transaction::OutPoint;
3334
use crate::ln::types::ChannelId;
3435
use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
35-
use crate::util::async_poll::dummy_waker;
36+
use crate::sync::Mutex;
37+
use crate::util::async_poll::{dummy_waker, MaybeSend, MaybeSync};
3638
use crate::util::logger::Logger;
39+
use crate::util::native_async::FutureSpawner;
3740
use crate::util::ser::{Readable, ReadableArgs, Writeable};
3841

3942
/// The alphabet of characters allowed for namespaces and keys.
@@ -409,6 +412,13 @@ where
409412
Ok(res)
410413
}
411414

415+
struct PanicingSpawner;
416+
impl FutureSpawner for PanicingSpawner {
417+
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, _: T) {
418+
unreachable!();
419+
}
420+
}
421+
412422
fn poll_sync_future<F: Future>(future: F) -> F::Output {
413423
let mut waker = dummy_waker();
414424
let mut ctx = task::Context::from_waker(&mut waker);
@@ -507,7 +517,7 @@ fn poll_sync_future<F: Future>(future: F) -> F::Output {
507517
/// would like to get rid of them, consider using the
508518
/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
509519
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>(
510-
MonitorUpdatingPersisterAsync<KVStoreSyncWrapper<K>, L, ES, SP, BI, FE>,
520+
MonitorUpdatingPersisterAsync<KVStoreSyncWrapper<K>, PanicingSpawner, L, ES, SP, BI, FE>,
511521
)
512522
where
513523
K::Target: KVStoreSync,
@@ -553,6 +563,7 @@ where
553563
) -> Self {
554564
MonitorUpdatingPersister(MonitorUpdatingPersisterAsync::new(
555565
KVStoreSyncWrapper(kv_store),
566+
PanicingSpawner,
556567
logger,
557568
maximum_pending_updates,
558569
entropy_source,
@@ -665,8 +676,8 @@ where
665676
&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
666677
monitor: &ChannelMonitor<ChannelSigner>,
667678
) -> chain::ChannelMonitorUpdateStatus {
668-
let res =
669-
poll_sync_future(self.0 .0.update_persisted_channel(monitor_name, update, monitor));
679+
let inner = Arc::clone(&self.0 .0);
680+
let res = poll_sync_future(inner.update_persisted_channel(monitor_name, update, monitor));
670681
match res {
671682
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
672683
Err(e) => {
@@ -691,14 +702,20 @@ where
691702
/// async versions of the public accessors.
692703
///
693704
/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
705+
///
706+
/// Unlike [`MonitorUpdatingPersister`], this does not implement [`Persist`], but is instead used
707+
/// directly by the [`ChainMonitor`].
708+
///
709+
/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
694710
pub struct MonitorUpdatingPersisterAsync<
695711
K: Deref,
712+
S: FutureSpawner,
696713
L: Deref,
697714
ES: Deref,
698715
SP: Deref,
699716
BI: Deref,
700717
FE: Deref,
701-
>(Arc<MonitorUpdatingPersisterAsyncInner<K, L, ES, SP, BI, FE>>)
718+
>(Arc<MonitorUpdatingPersisterAsyncInner<K, S, L, ES, SP, BI, FE>>)
702719
where
703720
K::Target: KVStore,
704721
L::Target: Logger,
@@ -709,6 +726,7 @@ where
709726

710727
struct MonitorUpdatingPersisterAsyncInner<
711728
K: Deref,
729+
S: FutureSpawner,
712730
L: Deref,
713731
ES: Deref,
714732
SP: Deref,
@@ -723,6 +741,7 @@ struct MonitorUpdatingPersisterAsyncInner<
723741
FE::Target: FeeEstimator,
724742
{
725743
kv_store: K,
744+
future_spawner: S,
726745
logger: L,
727746
maximum_pending_updates: u64,
728747
entropy_source: ES,
@@ -731,8 +750,8 @@ struct MonitorUpdatingPersisterAsyncInner<
731750
fee_estimator: FE,
732751
}
733752

734-
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
735-
MonitorUpdatingPersisterAsync<K, L, ES, SP, BI, FE>
753+
impl<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
754+
MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>
736755
where
737756
K::Target: KVStore,
738757
L::Target: Logger,
@@ -745,11 +764,12 @@ where
745764
///
746765
/// See [`MonitorUpdatingPersister::new`] for more info.
747766
pub fn new(
748-
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
749-
signer_provider: SP, broadcaster: BI, fee_estimator: FE,
767+
kv_store: K, future_spawner: S, logger: L, maximum_pending_updates: u64,
768+
entropy_source: ES, signer_provider: SP, broadcaster: BI, fee_estimator: FE,
750769
) -> Self {
751770
MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner {
752771
kv_store,
772+
future_spawner,
753773
logger,
754774
maximum_pending_updates,
755775
entropy_source,
@@ -818,8 +838,75 @@ where
818838
}
819839
}
820840

821-
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
822-
MonitorUpdatingPersisterAsyncInner<K, L, ES, SP, BI, FE>
841+
impl<
842+
K: Deref + MaybeSend + MaybeSync + 'static,
843+
S: FutureSpawner,
844+
L: Deref + MaybeSend + MaybeSync + 'static,
845+
ES: Deref + MaybeSend + MaybeSync + 'static,
846+
SP: Deref + MaybeSend + MaybeSync + 'static,
847+
BI: Deref + MaybeSend + MaybeSync + 'static,
848+
FE: Deref + MaybeSend + MaybeSync + 'static,
849+
> MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>
850+
where
851+
K::Target: KVStore + MaybeSync,
852+
L::Target: Logger,
853+
ES::Target: EntropySource + Sized,
854+
SP::Target: SignerProvider + Sized,
855+
BI::Target: BroadcasterInterface,
856+
FE::Target: FeeEstimator,
857+
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
858+
{
859+
pub(crate) fn spawn_async_persist_new_channel(
860+
&self, monitor_name: MonitorName,
861+
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
862+
) {
863+
let inner = Arc::clone(&self.0);
864+
let future = inner.persist_new_channel(monitor_name, monitor);
865+
let channel_id = monitor.channel_id();
866+
self.0.future_spawner.spawn(async move {
867+
match future.await {
868+
Ok(()) => {}, // TODO: expose completions
869+
Err(e) => {
870+
log_error!(
871+
inner.logger,
872+
"Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.",
873+
);
874+
},
875+
}
876+
});
877+
}
878+
879+
pub(crate) fn spawn_async_update_persisted_channel(
880+
&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
881+
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
882+
) {
883+
let inner = Arc::clone(&self.0);
884+
let future = inner.update_persisted_channel(monitor_name, update, monitor);
885+
let channel_id = monitor.channel_id();
886+
let inner = Arc::clone(&self.0);
887+
self.0.future_spawner.spawn(async move {
888+
match future.await {
889+
Ok(()) => {}, // TODO: expose completions
890+
Err(e) => {
891+
log_error!(
892+
inner.logger,
893+
"Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.",
894+
);
895+
},
896+
}
897+
});
898+
}
899+
900+
pub(crate) fn spawn_async_archive_persisted_channel(&self, monitor_name: MonitorName) {
901+
let inner = Arc::clone(&self.0);
902+
self.0.future_spawner.spawn(async move {
903+
inner.archive_persisted_channel(monitor_name).await;
904+
});
905+
}
906+
}
907+
908+
impl<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
909+
MonitorUpdatingPersisterAsyncInner<K, S, L, ES, SP, BI, FE>
823910
where
824911
K::Target: KVStore,
825912
L::Target: Logger,
@@ -938,7 +1025,7 @@ where
9381025
let monitor_name = MonitorName::from_str(&monitor_key)?;
9391026
let (_, current_monitor) = self.read_monitor(&monitor_name, &monitor_key).await?;
9401027
let latest_update_id = current_monitor.get_latest_update_id();
941-
self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, lazy).await;
1028+
self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, lazy).await?;
9421029
}
9431030
Ok(())
9441031
}
@@ -958,9 +1045,9 @@ where
9581045
Ok(())
9591046
}
9601047

961-
async fn persist_new_channel<ChannelSigner: EcdsaChannelSigner>(
1048+
fn persist_new_channel<ChannelSigner: EcdsaChannelSigner>(
9621049
&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
963-
) -> Result<(), io::Error> {
1050+
) -> impl Future<Output = Result<(), io::Error>> {
9641051
// Determine the proper key for this monitor
9651052
let monitor_key = monitor_name.to_string();
9661053
// Serialize and write the new monitor
@@ -974,16 +1061,25 @@ where
9741061
monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL);
9751062
}
9761063
monitor.write(&mut monitor_bytes).unwrap();
1064+
// Note that this is NOT an async function, but rather calls the *sync* KVStore write
1065+
// method, allowing it to do its queueing immediately, and then return a future for the
1066+
// completion of the write. This ensures monitor persistence ordering is preserved.
9771067
let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
9781068
let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
979-
self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes).await
1069+
self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes)
9801070
}
9811071

982-
async fn update_persisted_channel<ChannelSigner: EcdsaChannelSigner>(
983-
&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
1072+
fn update_persisted_channel<'a, ChannelSigner: EcdsaChannelSigner + 'a>(
1073+
self: Arc<Self>, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
9841074
monitor: &ChannelMonitor<ChannelSigner>,
985-
) -> Result<(), io::Error> {
1075+
) -> impl Future<Output = Result<(), io::Error>> + 'a
1076+
where
1077+
Self: 'a,
1078+
{
9861079
const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;
1080+
let mut res_a = None;
1081+
let mut res_b = None;
1082+
let mut res_c = None;
9871083
if let Some(update) = update {
9881084
let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
9891085
&& self.maximum_pending_updates != 0
@@ -992,37 +1088,67 @@ where
9921088
let monitor_key = monitor_name.to_string();
9931089
let update_name = UpdateName::from(update.update_id);
9941090
let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
995-
self.kv_store
996-
.write(primary, &monitor_key, update_name.as_str(), update.encode())
997-
.await
1091+
// Note that this is NOT an async function, but rather calls the *sync* KVStore
1092+
// write method, allowing it to do its queueing immediately, and then return a
1093+
// future for the completion of the write. This ensures monitor persistence
1094+
// ordering is preserved.
1095+
res_a = Some(self.kv_store.write(
1096+
primary,
1097+
&monitor_key,
1098+
update_name.as_str(),
1099+
update.encode(),
1100+
));
9981101
} else {
9991102
// We could write this update, but it meets criteria of our design that calls for a full monitor write.
1000-
let write_status = self.persist_new_channel(monitor_name, monitor).await;
1001-
1002-
if let Ok(()) = write_status {
1003-
let channel_closed_legacy =
1004-
monitor.get_latest_update_id() == LEGACY_CLOSED_CHANNEL_UPDATE_ID;
1005-
let latest_update_id = monitor.get_latest_update_id();
1006-
if channel_closed_legacy {
1007-
let monitor_key = monitor_name.to_string();
1008-
self.cleanup_stale_updates_for_monitor_to(
1009-
&monitor_key,
1010-
latest_update_id,
1011-
true,
1012-
)
1013-
.await;
1014-
} else {
1015-
let end = latest_update_id;
1016-
let start = end.saturating_sub(self.maximum_pending_updates);
1017-
self.cleanup_in_range(monitor_name, start, end).await;
1103+
// Note that this is NOT an async function, but rather calls the *sync* KVStore
1104+
// write method, allowing it to do its queueing immediately, and then return a
1105+
// future for the completion of the write. This ensures monitor persistence
1106+
// ordering is preserved. This, thus, must happen before any await we do below.
1107+
let write_fut = self.persist_new_channel(monitor_name, monitor);
1108+
let latest_update_id = monitor.get_latest_update_id();
1109+
1110+
res_b = Some(async move {
1111+
let write_status = write_fut.await;
1112+
if let Ok(()) = write_status {
1113+
if latest_update_id == LEGACY_CLOSED_CHANNEL_UPDATE_ID {
1114+
let monitor_key = monitor_name.to_string();
1115+
self.cleanup_stale_updates_for_monitor_to(
1116+
&monitor_key,
1117+
latest_update_id,
1118+
true,
1119+
)
1120+
.await?;
1121+
} else {
1122+
let end = latest_update_id;
1123+
let start = end.saturating_sub(self.maximum_pending_updates);
1124+
self.cleanup_in_range(monitor_name, start, end).await;
1125+
}
10181126
}
1019-
}
10201127

1021-
write_status
1128+
write_status
1129+
});
10221130
}
10231131
} else {
10241132
// There is no update given, so we must persist a new monitor.
1025-
self.persist_new_channel(monitor_name, monitor).await
1133+
// Note that this is NOT an async function, but rather calls the *sync* KVStore write
1134+
// method, allowing it to do its queueing immediately, and then return a future for the
1135+
// completion of the write. This ensures monitor persistence ordering is preserved.
1136+
res_c = Some(self.persist_new_channel(monitor_name, monitor));
1137+
}
1138+
async move {
1139+
// Complete any pending future(s). Note that to keep one return type we have to end
1140+
// with a single async move block that we return, rather than trying to return the
1141+
// individual futures themselves.
1142+
if let Some(a) = res_a {
1143+
a.await?;
1144+
}
1145+
if let Some(b) = res_b {
1146+
b.await?;
1147+
}
1148+
if let Some(c) = res_c {
1149+
c.await?;
1150+
}
1151+
Ok(())
10261152
}
10271153
}
10281154

0 commit comments

Comments
 (0)