Skip to content

Commit 2f8ecee

Browse files
committed
Add support for native async KVStore persist to ChainMonitor
This finally adds support for full native Rust `async` persistence to `ChainMonitor`. Way back when, before we had any other persistence, we added the `Persist` trait to persist `ChannelMonitor`s. It eventualy grew homegrown async persistence support via a simple immediate return and callback upon completion. We later added a persistence trait in `lightning-background-processor` to persist the few fields that it needed to drive writes for. Over time, we found more places where persistence was useful, and we eventually added a generic `KVStore` trait. In dc75436 we removed the `lightning-background-processor` `Persister` in favor of simply using the native `KVStore` directly. Here we continue that trend, building native `async` `ChannelMonitor` persistence on top of our native `KVStore` rather than hacking support for it into the `chain::Persist` trait. Because `MonitorUpdatingPersister` already exists as a common way to wrap a `KVStore` into a `ChannelMonitor` persister, we build exclusively on that (though note that the "monitor updating" part is now optional), utilizing its new async option as our native async driver. Thus, we end up with a `ChainMonitor::new_async_beta` which takes a `MonitorUpdatingPersisterAsync` rather than a classic `chain::Persist` and then operates the same as a normal `ChainMonitor`. While the requirement that users now use a `MonitorUpdatingPersister` to wrap their `KVStore` before providing it to `ChainMonitor` is somewhat awkward, as we move towards a `KVStore`-only world it seems like `MonitorUpdatingPersister` should eventually merge into `ChainMonitor`.
1 parent d835f61 commit 2f8ecee

File tree

2 files changed

+187
-5
lines changed

2 files changed

+187
-5
lines changed

lightning/src/chain/chainmonitor.rs

Lines changed: 162 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,14 @@ use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHol
4646
use crate::ln::types::ChannelId;
4747
use crate::prelude::*;
4848
use crate::sign::ecdsa::EcdsaChannelSigner;
49-
use crate::sign::{EntropySource, PeerStorageKey};
49+
use crate::sign::{EntropySource, PeerStorageKey, SignerProvider};
5050
use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard};
5151
use crate::types::features::{InitFeatures, NodeFeatures};
52+
use crate::util::async_poll::{MaybeSend, MaybeSync};
5253
use crate::util::errors::APIError;
5354
use crate::util::logger::{Logger, WithContext};
54-
use crate::util::persist::MonitorName;
55+
use crate::util::native_async::FutureSpawner;
56+
use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync};
5557
#[cfg(peer_storage)]
5658
use crate::util::ser::{VecWriter, Writeable};
5759
use crate::util::wakers::{Future, Notifier};
@@ -192,6 +194,17 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
192194
/// restart, this method must in that case be idempotent, ensuring it can handle scenarios where
193195
/// the monitor already exists in the archive.
194196
fn archive_persisted_channel(&self, monitor_name: MonitorName);
197+
198+
/// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with
199+
/// [`Self::update_persisted_channel`], which have completed.
200+
///
201+
/// Returning an update here is equivalent to calling
202+
/// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and
203+
/// hidden in the docs.
204+
#[doc(hidden)]
205+
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
206+
Vec::new()
207+
}
195208
}
196209

197210
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
@@ -235,6 +248,93 @@ impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, Chann
235248
}
236249
}
237250

251+
/// An unconstructable [`Persist`]er which is used under the hood when you call
252+
/// [`ChainMonitor::new_async_beta`].
253+
pub struct AsyncPersister<
254+
K: Deref + MaybeSend + MaybeSync + 'static,
255+
S: FutureSpawner,
256+
L: Deref + MaybeSend + MaybeSync + 'static,
257+
ES: Deref + MaybeSend + MaybeSync + 'static,
258+
SP: Deref + MaybeSend + MaybeSync + 'static,
259+
BI: Deref + MaybeSend + MaybeSync + 'static,
260+
FE: Deref + MaybeSend + MaybeSync + 'static,
261+
> where
262+
K::Target: KVStore + MaybeSync,
263+
L::Target: Logger,
264+
ES::Target: EntropySource + Sized,
265+
SP::Target: SignerProvider + Sized,
266+
BI::Target: BroadcasterInterface,
267+
FE::Target: FeeEstimator,
268+
{
269+
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
270+
}
271+
272+
impl<
273+
K: Deref + MaybeSend + MaybeSync + 'static,
274+
S: FutureSpawner,
275+
L: Deref + MaybeSend + MaybeSync + 'static,
276+
ES: Deref + MaybeSend + MaybeSync + 'static,
277+
SP: Deref + MaybeSend + MaybeSync + 'static,
278+
BI: Deref + MaybeSend + MaybeSync + 'static,
279+
FE: Deref + MaybeSend + MaybeSync + 'static,
280+
> Deref for AsyncPersister<K, S, L, ES, SP, BI, FE>
281+
where
282+
K::Target: KVStore + MaybeSync,
283+
L::Target: Logger,
284+
ES::Target: EntropySource + Sized,
285+
SP::Target: SignerProvider + Sized,
286+
BI::Target: BroadcasterInterface,
287+
FE::Target: FeeEstimator,
288+
{
289+
type Target = Self;
290+
fn deref(&self) -> &Self {
291+
self
292+
}
293+
}
294+
295+
impl<
296+
K: Deref + MaybeSend + MaybeSync + 'static,
297+
S: FutureSpawner,
298+
L: Deref + MaybeSend + MaybeSync + 'static,
299+
ES: Deref + MaybeSend + MaybeSync + 'static,
300+
SP: Deref + MaybeSend + MaybeSync + 'static,
301+
BI: Deref + MaybeSend + MaybeSync + 'static,
302+
FE: Deref + MaybeSend + MaybeSync + 'static,
303+
> Persist<<SP::Target as SignerProvider>::EcdsaSigner> for AsyncPersister<K, S, L, ES, SP, BI, FE>
304+
where
305+
K::Target: KVStore + MaybeSync,
306+
L::Target: Logger,
307+
ES::Target: EntropySource + Sized,
308+
SP::Target: SignerProvider + Sized,
309+
BI::Target: BroadcasterInterface,
310+
FE::Target: FeeEstimator,
311+
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
312+
{
313+
fn persist_new_channel(
314+
&self, monitor_name: MonitorName,
315+
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
316+
) -> ChannelMonitorUpdateStatus {
317+
self.persister.spawn_async_persist_new_channel(monitor_name, monitor);
318+
ChannelMonitorUpdateStatus::InProgress
319+
}
320+
321+
fn update_persisted_channel(
322+
&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
323+
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
324+
) -> ChannelMonitorUpdateStatus {
325+
self.persister.spawn_async_update_persisted_channel(monitor_name, monitor_update, monitor);
326+
ChannelMonitorUpdateStatus::InProgress
327+
}
328+
329+
fn archive_persisted_channel(&self, monitor_name: MonitorName) {
330+
self.persister.spawn_async_archive_persisted_channel(monitor_name);
331+
}
332+
333+
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
334+
self.persister.get_and_clear_completed_updates()
335+
}
336+
}
337+
238338
/// An implementation of [`chain::Watch`] for monitoring channels.
239339
///
240340
/// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by
@@ -291,6 +391,63 @@ pub struct ChainMonitor<
291391
our_peerstorage_encryption_key: PeerStorageKey,
292392
}
293393

394+
impl<
395+
K: Deref + MaybeSend + MaybeSync + 'static,
396+
S: FutureSpawner,
397+
SP: Deref + MaybeSend + MaybeSync + 'static,
398+
C: Deref,
399+
T: Deref + MaybeSend + MaybeSync + 'static,
400+
F: Deref + MaybeSend + MaybeSync + 'static,
401+
L: Deref + MaybeSend + MaybeSync + 'static,
402+
ES: Deref + MaybeSend + MaybeSync + 'static,
403+
>
404+
ChainMonitor<
405+
<SP::Target as SignerProvider>::EcdsaSigner,
406+
C,
407+
T,
408+
F,
409+
L,
410+
AsyncPersister<K, S, L, ES, SP, T, F>,
411+
ES,
412+
> where
413+
K::Target: KVStore + MaybeSync,
414+
SP::Target: SignerProvider + Sized,
415+
C::Target: chain::Filter,
416+
T::Target: BroadcasterInterface,
417+
F::Target: FeeEstimator,
418+
L::Target: Logger,
419+
ES::Target: EntropySource + Sized,
420+
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
421+
{
422+
/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
423+
///
424+
/// This behaves the same as [`ChainMonitor::new`] except that it relies on
425+
/// [`MonitorUpdatingPersisterAsync`] and thus allows persistence to be completed async.
426+
///
427+
/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
428+
pub fn new_async_beta(
429+
chain_source: Option<C>, broadcaster: T, logger: L, feeest: F,
430+
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
431+
_our_peerstorage_encryption_key: PeerStorageKey,
432+
) -> Self {
433+
Self {
434+
monitors: RwLock::new(new_hash_map()),
435+
chain_source,
436+
broadcaster,
437+
logger,
438+
fee_estimator: feeest,
439+
persister: AsyncPersister { persister },
440+
_entropy_source,
441+
pending_monitor_events: Mutex::new(Vec::new()),
442+
highest_chain_height: AtomicUsize::new(0),
443+
event_notifier: Notifier::new(),
444+
pending_send_only_events: Mutex::new(Vec::new()),
445+
#[cfg(peer_storage)]
446+
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
447+
}
448+
}
449+
}
450+
294451
impl<
295452
ChannelSigner: EcdsaChannelSigner,
296453
C: Deref,
@@ -1357,6 +1514,9 @@ where
13571514
fn release_pending_monitor_events(
13581515
&self,
13591516
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
1517+
for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
1518+
let _ = self.channel_monitor_updated(channel_id, update_id);
1519+
}
13601520
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
13611521
for monitor_state in self.monitors.read().unwrap().values() {
13621522
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();

lightning/src/util/persist.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,9 @@ where
561561
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
562562
signer_provider: SP, broadcaster: BI, fee_estimator: FE,
563563
) -> Self {
564+
// Note that calling the spawner only happens in the `pub(crate)` `spawn_*` methods defined
565+
// with additional bounds on `MonitorUpdatingPersisterAsync`. Thus its safe to provide a
566+
// dummy always-panic implementation here.
564567
MonitorUpdatingPersister(MonitorUpdatingPersisterAsync::new(
565568
KVStoreSyncWrapper(kv_store),
566569
PanicingSpawner,
@@ -704,9 +707,10 @@ where
704707
/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
705708
///
706709
/// Unlike [`MonitorUpdatingPersister`], this does not implement [`Persist`], but is instead used
707-
/// directly by the [`ChainMonitor`].
710+
/// directly by the [`ChainMonitor`] via [`ChainMonitor::new_async_beta`].
708711
///
709712
/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
713+
/// [`ChainMonitor::new_async_beta`]: crate::chain::chainmonitor::ChainMonitor::new_async_beta
710714
pub struct MonitorUpdatingPersisterAsync<
711715
K: Deref,
712716
S: FutureSpawner,
@@ -741,6 +745,7 @@ struct MonitorUpdatingPersisterAsyncInner<
741745
FE::Target: FeeEstimator,
742746
{
743747
kv_store: K,
748+
async_completed_updates: Mutex<Vec<(ChannelId, u64)>>,
744749
future_spawner: S,
745750
logger: L,
746751
maximum_pending_updates: u64,
@@ -769,6 +774,7 @@ where
769774
) -> Self {
770775
MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner {
771776
kv_store,
777+
async_completed_updates: Mutex::new(Vec::new()),
772778
future_spawner,
773779
logger,
774780
maximum_pending_updates,
@@ -861,11 +867,14 @@ where
861867
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
862868
) {
863869
let inner = Arc::clone(&self.0);
870+
// Note that `persist_new_channel` is a sync method which calls all the way through to the
871+
// sync KVStore::write method (which returns a future) to ensure writes are well-ordered.
864872
let future = inner.persist_new_channel(monitor_name, monitor);
865873
let channel_id = monitor.channel_id();
874+
let completion = (monitor.channel_id(), monitor.get_latest_update_id());
866875
self.0.future_spawner.spawn(async move {
867876
match future.await {
868-
Ok(()) => {}, // TODO: expose completions
877+
Ok(()) => inner.async_completed_updates.lock().unwrap().push(completion),
869878
Err(e) => {
870879
log_error!(
871880
inner.logger,
@@ -881,12 +890,21 @@ where
881890
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
882891
) {
883892
let inner = Arc::clone(&self.0);
893+
// Note that `update_persisted_channel` is a sync method which calls all the way through to
894+
// the sync KVStore::write method (which returns a future) to ensure writes are well-ordered
884895
let future = inner.update_persisted_channel(monitor_name, update, monitor);
885896
let channel_id = monitor.channel_id();
897+
let completion = if let Some(update) = update {
898+
Some((monitor.channel_id(), update.update_id))
899+
} else {
900+
None
901+
};
886902
let inner = Arc::clone(&self.0);
887903
self.0.future_spawner.spawn(async move {
888904
match future.await {
889-
Ok(()) => {}, // TODO: expose completions
905+
Ok(()) => if let Some(completion) = completion {
906+
inner.async_completed_updates.lock().unwrap().push(completion);
907+
},
890908
Err(e) => {
891909
log_error!(
892910
inner.logger,
@@ -903,6 +921,10 @@ where
903921
inner.archive_persisted_channel(monitor_name).await;
904922
});
905923
}
924+
925+
pub(crate) fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
926+
mem::take(&mut *self.0.async_completed_updates.lock().unwrap())
927+
}
906928
}
907929

908930
impl<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>

0 commit comments

Comments
 (0)