@@ -46,12 +46,14 @@ use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHol
4646use crate :: ln:: types:: ChannelId ;
4747use crate :: prelude:: * ;
4848use crate :: sign:: ecdsa:: EcdsaChannelSigner ;
49- use crate :: sign:: { EntropySource , PeerStorageKey } ;
49+ use crate :: sign:: { EntropySource , PeerStorageKey , SignerProvider } ;
5050use crate :: sync:: { Mutex , MutexGuard , RwLock , RwLockReadGuard } ;
5151use crate :: types:: features:: { InitFeatures , NodeFeatures } ;
52+ use crate :: util:: async_poll:: { MaybeSend , MaybeSync } ;
5253use crate :: util:: errors:: APIError ;
5354use crate :: util:: logger:: { Logger , WithContext } ;
54- use crate :: util:: persist:: MonitorName ;
55+ use crate :: util:: native_async:: FutureSpawner ;
56+ use crate :: util:: persist:: { KVStore , MonitorName , MonitorUpdatingPersisterAsync } ;
5557#[ cfg( peer_storage) ]
5658use crate :: util:: ser:: { VecWriter , Writeable } ;
5759use crate :: util:: wakers:: { Future , Notifier } ;
@@ -192,6 +194,17 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
192194 /// restart, this method must in that case be idempotent, ensuring it can handle scenarios where
193195 /// the monitor already exists in the archive.
194196 fn archive_persisted_channel ( & self , monitor_name : MonitorName ) ;
197+
198+ /// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with
199+ /// [`Self::update_persisted_channel`], which have completed.
200+ ///
201+ /// Returning an update here is equivalent to calling
202+ /// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and
203+ /// hidden in the docs.
204+ #[ doc( hidden) ]
205+ fn get_and_clear_completed_updates ( & self ) -> Vec < ( ChannelId , u64 ) > {
206+ Vec :: new ( )
207+ }
195208}
196209
197210struct MonitorHolder < ChannelSigner : EcdsaChannelSigner > {
@@ -235,6 +248,93 @@ impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, Chann
235248 }
236249}
237250
251+ /// An unconstructable [`Persist`]er which is used under the hood when you call
252+ /// [`ChainMonitor::new_async_beta`].
253+ pub struct AsyncPersister <
254+ K : Deref + MaybeSend + MaybeSync + ' static ,
255+ S : FutureSpawner ,
256+ L : Deref + MaybeSend + MaybeSync + ' static ,
257+ ES : Deref + MaybeSend + MaybeSync + ' static ,
258+ SP : Deref + MaybeSend + MaybeSync + ' static ,
259+ BI : Deref + MaybeSend + MaybeSync + ' static ,
260+ FE : Deref + MaybeSend + MaybeSync + ' static ,
261+ > where
262+ K :: Target : KVStore + MaybeSync ,
263+ L :: Target : Logger ,
264+ ES :: Target : EntropySource + Sized ,
265+ SP :: Target : SignerProvider + Sized ,
266+ BI :: Target : BroadcasterInterface ,
267+ FE :: Target : FeeEstimator ,
268+ {
269+ persister : MonitorUpdatingPersisterAsync < K , S , L , ES , SP , BI , FE > ,
270+ }
271+
272+ impl <
273+ K : Deref + MaybeSend + MaybeSync + ' static ,
274+ S : FutureSpawner ,
275+ L : Deref + MaybeSend + MaybeSync + ' static ,
276+ ES : Deref + MaybeSend + MaybeSync + ' static ,
277+ SP : Deref + MaybeSend + MaybeSync + ' static ,
278+ BI : Deref + MaybeSend + MaybeSync + ' static ,
279+ FE : Deref + MaybeSend + MaybeSync + ' static ,
280+ > Deref for AsyncPersister < K , S , L , ES , SP , BI , FE >
281+ where
282+ K :: Target : KVStore + MaybeSync ,
283+ L :: Target : Logger ,
284+ ES :: Target : EntropySource + Sized ,
285+ SP :: Target : SignerProvider + Sized ,
286+ BI :: Target : BroadcasterInterface ,
287+ FE :: Target : FeeEstimator ,
288+ {
289+ type Target = Self ;
290+ fn deref ( & self ) -> & Self {
291+ self
292+ }
293+ }
294+
295+ impl <
296+ K : Deref + MaybeSend + MaybeSync + ' static ,
297+ S : FutureSpawner ,
298+ L : Deref + MaybeSend + MaybeSync + ' static ,
299+ ES : Deref + MaybeSend + MaybeSync + ' static ,
300+ SP : Deref + MaybeSend + MaybeSync + ' static ,
301+ BI : Deref + MaybeSend + MaybeSync + ' static ,
302+ FE : Deref + MaybeSend + MaybeSync + ' static ,
303+ > Persist < <SP :: Target as SignerProvider >:: EcdsaSigner > for AsyncPersister < K , S , L , ES , SP , BI , FE >
304+ where
305+ K :: Target : KVStore + MaybeSync ,
306+ L :: Target : Logger ,
307+ ES :: Target : EntropySource + Sized ,
308+ SP :: Target : SignerProvider + Sized ,
309+ BI :: Target : BroadcasterInterface ,
310+ FE :: Target : FeeEstimator ,
311+ <SP :: Target as SignerProvider >:: EcdsaSigner : MaybeSend + ' static ,
312+ {
313+ fn persist_new_channel (
314+ & self , monitor_name : MonitorName ,
315+ monitor : & ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ,
316+ ) -> ChannelMonitorUpdateStatus {
317+ self . persister . spawn_async_persist_new_channel ( monitor_name, monitor) ;
318+ ChannelMonitorUpdateStatus :: InProgress
319+ }
320+
321+ fn update_persisted_channel (
322+ & self , monitor_name : MonitorName , monitor_update : Option < & ChannelMonitorUpdate > ,
323+ monitor : & ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ,
324+ ) -> ChannelMonitorUpdateStatus {
325+ self . persister . spawn_async_update_persisted_channel ( monitor_name, monitor_update, monitor) ;
326+ ChannelMonitorUpdateStatus :: InProgress
327+ }
328+
329+ fn archive_persisted_channel ( & self , monitor_name : MonitorName ) {
330+ self . persister . spawn_async_archive_persisted_channel ( monitor_name) ;
331+ }
332+
333+ fn get_and_clear_completed_updates ( & self ) -> Vec < ( ChannelId , u64 ) > {
334+ self . persister . get_and_clear_completed_updates ( )
335+ }
336+ }
337+
238338/// An implementation of [`chain::Watch`] for monitoring channels.
239339///
240340/// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by
@@ -291,6 +391,63 @@ pub struct ChainMonitor<
291391 our_peerstorage_encryption_key : PeerStorageKey ,
292392}
293393
394+ impl <
395+ K : Deref + MaybeSend + MaybeSync + ' static ,
396+ S : FutureSpawner ,
397+ SP : Deref + MaybeSend + MaybeSync + ' static ,
398+ C : Deref ,
399+ T : Deref + MaybeSend + MaybeSync + ' static ,
400+ F : Deref + MaybeSend + MaybeSync + ' static ,
401+ L : Deref + MaybeSend + MaybeSync + ' static ,
402+ ES : Deref + MaybeSend + MaybeSync + ' static ,
403+ >
404+ ChainMonitor <
405+ <SP :: Target as SignerProvider >:: EcdsaSigner ,
406+ C ,
407+ T ,
408+ F ,
409+ L ,
410+ AsyncPersister < K , S , L , ES , SP , T , F > ,
411+ ES ,
412+ > where
413+ K :: Target : KVStore + MaybeSync ,
414+ SP :: Target : SignerProvider + Sized ,
415+ C :: Target : chain:: Filter ,
416+ T :: Target : BroadcasterInterface ,
417+ F :: Target : FeeEstimator ,
418+ L :: Target : Logger ,
419+ ES :: Target : EntropySource + Sized ,
420+ <SP :: Target as SignerProvider >:: EcdsaSigner : MaybeSend + ' static ,
421+ {
422+ /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
423+ ///
424+ /// This behaves the same as [`ChainMonitor::new`] except that it relies on
425+ /// [`MonitorUpdatingPersisterAsync`] and thus allows persistence to be completed async.
426+ ///
427+ /// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
428+ pub fn new_async_beta (
429+ chain_source : Option < C > , broadcaster : T , logger : L , feeest : F ,
430+ persister : MonitorUpdatingPersisterAsync < K , S , L , ES , SP , T , F > , _entropy_source : ES ,
431+ _our_peerstorage_encryption_key : PeerStorageKey ,
432+ ) -> Self {
433+ Self {
434+ monitors : RwLock :: new ( new_hash_map ( ) ) ,
435+ chain_source,
436+ broadcaster,
437+ logger,
438+ fee_estimator : feeest,
439+ persister : AsyncPersister { persister } ,
440+ _entropy_source,
441+ pending_monitor_events : Mutex :: new ( Vec :: new ( ) ) ,
442+ highest_chain_height : AtomicUsize :: new ( 0 ) ,
443+ event_notifier : Notifier :: new ( ) ,
444+ pending_send_only_events : Mutex :: new ( Vec :: new ( ) ) ,
445+ #[ cfg( peer_storage) ]
446+ our_peerstorage_encryption_key : _our_peerstorage_encryption_key,
447+ }
448+ }
449+ }
450+
294451impl <
295452 ChannelSigner : EcdsaChannelSigner ,
296453 C : Deref ,
@@ -1357,6 +1514,9 @@ where
13571514 fn release_pending_monitor_events (
13581515 & self ,
13591516 ) -> Vec < ( OutPoint , ChannelId , Vec < MonitorEvent > , PublicKey ) > {
1517+ for ( channel_id, update_id) in self . persister . get_and_clear_completed_updates ( ) {
1518+ let _ = self . channel_monitor_updated ( channel_id, update_id) ;
1519+ }
13601520 let mut pending_monitor_events = self . pending_monitor_events . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
13611521 for monitor_state in self . monitors . read ( ) . unwrap ( ) . values ( ) {
13621522 let monitor_events = monitor_state. monitor . get_and_clear_pending_monitor_events ( ) ;
0 commit comments