@@ -17,6 +17,7 @@ use bitcoin::hashes::hex::FromHex;
1717use bitcoin:: { BlockHash , Txid } ;
1818
1919use core:: future:: Future ;
20+ use core:: mem;
2021use core:: ops:: Deref ;
2122use core:: pin:: Pin ;
2223use core:: str:: FromStr ;
@@ -32,8 +33,10 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
3233use crate :: chain:: transaction:: OutPoint ;
3334use crate :: ln:: types:: ChannelId ;
3435use crate :: sign:: { ecdsa:: EcdsaChannelSigner , EntropySource , SignerProvider } ;
35- use crate :: util:: async_poll:: dummy_waker;
36+ use crate :: sync:: Mutex ;
37+ use crate :: util:: async_poll:: { dummy_waker, MaybeSend , MaybeSync } ;
3638use crate :: util:: logger:: Logger ;
39+ use crate :: util:: native_async:: FutureSpawner ;
3740use crate :: util:: ser:: { Readable , ReadableArgs , Writeable } ;
3841
3942/// The alphabet of characters allowed for namespaces and keys.
@@ -409,6 +412,13 @@ where
409412 Ok ( res)
410413}
411414
415+ struct PanicingSpawner ;
416+ impl FutureSpawner for PanicingSpawner {
417+ fn spawn < T : Future < Output = ( ) > + MaybeSend + ' static > ( & self , _: T ) {
418+ unreachable ! ( ) ;
419+ }
420+ }
421+
412422fn poll_sync_future < F : Future > ( future : F ) -> F :: Output {
413423 let mut waker = dummy_waker ( ) ;
414424 let mut ctx = task:: Context :: from_waker ( & mut waker) ;
@@ -507,7 +517,7 @@ fn poll_sync_future<F: Future>(future: F) -> F::Output {
507517/// would like to get rid of them, consider using the
508518/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
509519pub struct MonitorUpdatingPersister < K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref > (
510- MonitorUpdatingPersisterAsync < KVStoreSyncWrapper < K > , L , ES , SP , BI , FE > ,
520+ MonitorUpdatingPersisterAsync < KVStoreSyncWrapper < K > , PanicingSpawner , L , ES , SP , BI , FE > ,
511521)
512522where
513523 K :: Target : KVStoreSync ,
@@ -553,6 +563,7 @@ where
553563 ) -> Self {
554564 MonitorUpdatingPersister ( MonitorUpdatingPersisterAsync :: new (
555565 KVStoreSyncWrapper ( kv_store) ,
566+ PanicingSpawner ,
556567 logger,
557568 maximum_pending_updates,
558569 entropy_source,
@@ -665,8 +676,8 @@ where
665676 & self , monitor_name : MonitorName , update : Option < & ChannelMonitorUpdate > ,
666677 monitor : & ChannelMonitor < ChannelSigner > ,
667678 ) -> chain:: ChannelMonitorUpdateStatus {
668- let res =
669- poll_sync_future ( self . 0 . 0 . update_persisted_channel ( monitor_name, update, monitor) ) ;
679+ let inner = Arc :: clone ( & self . 0 . 0 ) ;
680+ let res = poll_sync_future ( inner . update_persisted_channel ( monitor_name, update, monitor) ) ;
670681 match res {
671682 Ok ( ( ) ) => chain:: ChannelMonitorUpdateStatus :: Completed ,
672683 Err ( e) => {
@@ -691,14 +702,20 @@ where
691702/// async versions of the public accessors.
692703///
693704/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
705+ ///
706+ /// Unlike [`MonitorUpdatingPersister`], this does not implement [`Persist`], but is instead used
707+ /// directly by the [`ChainMonitor`].
708+ ///
709+ /// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
694710pub struct MonitorUpdatingPersisterAsync <
695711 K : Deref ,
712+ S : FutureSpawner ,
696713 L : Deref ,
697714 ES : Deref ,
698715 SP : Deref ,
699716 BI : Deref ,
700717 FE : Deref ,
701- > ( Arc < MonitorUpdatingPersisterAsyncInner < K , L , ES , SP , BI , FE > > )
718+ > ( Arc < MonitorUpdatingPersisterAsyncInner < K , S , L , ES , SP , BI , FE > > )
702719where
703720 K :: Target : KVStore ,
704721 L :: Target : Logger ,
@@ -709,6 +726,7 @@ where
709726
710727struct MonitorUpdatingPersisterAsyncInner <
711728 K : Deref ,
729+ S : FutureSpawner ,
712730 L : Deref ,
713731 ES : Deref ,
714732 SP : Deref ,
@@ -723,6 +741,7 @@ struct MonitorUpdatingPersisterAsyncInner<
723741 FE :: Target : FeeEstimator ,
724742{
725743 kv_store : K ,
744+ future_spawner : S ,
726745 logger : L ,
727746 maximum_pending_updates : u64 ,
728747 entropy_source : ES ,
@@ -731,8 +750,8 @@ struct MonitorUpdatingPersisterAsyncInner<
731750 fee_estimator : FE ,
732751}
733752
734- impl < K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
735- MonitorUpdatingPersisterAsync < K , L , ES , SP , BI , FE >
753+ impl < K : Deref , S : FutureSpawner , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
754+ MonitorUpdatingPersisterAsync < K , S , L , ES , SP , BI , FE >
736755where
737756 K :: Target : KVStore ,
738757 L :: Target : Logger ,
@@ -745,11 +764,12 @@ where
745764 ///
746765 /// See [`MonitorUpdatingPersister::new`] for more info.
747766 pub fn new (
748- kv_store : K , logger : L , maximum_pending_updates : u64 , entropy_source : ES ,
749- signer_provider : SP , broadcaster : BI , fee_estimator : FE ,
767+ kv_store : K , future_spawner : S , logger : L , maximum_pending_updates : u64 ,
768+ entropy_source : ES , signer_provider : SP , broadcaster : BI , fee_estimator : FE ,
750769 ) -> Self {
751770 MonitorUpdatingPersisterAsync ( Arc :: new ( MonitorUpdatingPersisterAsyncInner {
752771 kv_store,
772+ future_spawner,
753773 logger,
754774 maximum_pending_updates,
755775 entropy_source,
@@ -818,8 +838,75 @@ where
818838 }
819839}
820840
821- impl < K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
822- MonitorUpdatingPersisterAsyncInner < K , L , ES , SP , BI , FE >
841+ impl <
842+ K : Deref + MaybeSend + MaybeSync + ' static ,
843+ S : FutureSpawner ,
844+ L : Deref + MaybeSend + MaybeSync + ' static ,
845+ ES : Deref + MaybeSend + MaybeSync + ' static ,
846+ SP : Deref + MaybeSend + MaybeSync + ' static ,
847+ BI : Deref + MaybeSend + MaybeSync + ' static ,
848+ FE : Deref + MaybeSend + MaybeSync + ' static ,
849+ > MonitorUpdatingPersisterAsync < K , S , L , ES , SP , BI , FE >
850+ where
851+ K :: Target : KVStore + MaybeSync ,
852+ L :: Target : Logger ,
853+ ES :: Target : EntropySource + Sized ,
854+ SP :: Target : SignerProvider + Sized ,
855+ BI :: Target : BroadcasterInterface ,
856+ FE :: Target : FeeEstimator ,
857+ <SP :: Target as SignerProvider >:: EcdsaSigner : MaybeSend + ' static ,
858+ {
859+ pub ( crate ) fn spawn_async_persist_new_channel (
860+ & self , monitor_name : MonitorName ,
861+ monitor : & ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ,
862+ ) {
863+ let inner = Arc :: clone ( & self . 0 ) ;
864+ let future = inner. persist_new_channel ( monitor_name, monitor) ;
865+ let channel_id = monitor. channel_id ( ) ;
866+ self . 0 . future_spawner . spawn ( async move {
867+ match future. await {
868+ Ok ( ( ) ) => { } , // TODO: expose completions
869+ Err ( e) => {
870+ log_error ! (
871+ inner. logger,
872+ "Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible." ,
873+ ) ;
874+ } ,
875+ }
876+ } ) ;
877+ }
878+
879+ pub ( crate ) fn spawn_async_update_persisted_channel (
880+ & self , monitor_name : MonitorName , update : Option < & ChannelMonitorUpdate > ,
881+ monitor : & ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ,
882+ ) {
883+ let inner = Arc :: clone ( & self . 0 ) ;
884+ let future = inner. update_persisted_channel ( monitor_name, update, monitor) ;
885+ let channel_id = monitor. channel_id ( ) ;
886+ let inner = Arc :: clone ( & self . 0 ) ;
887+ self . 0 . future_spawner . spawn ( async move {
888+ match future. await {
889+ Ok ( ( ) ) => { } , // TODO: expose completions
890+ Err ( e) => {
891+ log_error ! (
892+ inner. logger,
893+ "Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible." ,
894+ ) ;
895+ } ,
896+ }
897+ } ) ;
898+ }
899+
900+ pub ( crate ) fn spawn_async_archive_persisted_channel ( & self , monitor_name : MonitorName ) {
901+ let inner = Arc :: clone ( & self . 0 ) ;
902+ self . 0 . future_spawner . spawn ( async move {
903+ inner. archive_persisted_channel ( monitor_name) . await ;
904+ } ) ;
905+ }
906+ }
907+
908+ impl < K : Deref , S : FutureSpawner , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
909+ MonitorUpdatingPersisterAsyncInner < K , S , L , ES , SP , BI , FE >
823910where
824911 K :: Target : KVStore ,
825912 L :: Target : Logger ,
@@ -938,7 +1025,7 @@ where
9381025 let monitor_name = MonitorName :: from_str ( & monitor_key) ?;
9391026 let ( _, current_monitor) = self . read_monitor ( & monitor_name, & monitor_key) . await ?;
9401027 let latest_update_id = current_monitor. get_latest_update_id ( ) ;
941- self . cleanup_stale_updates_for_monitor_to ( & monitor_key, latest_update_id, lazy) . await ;
1028+ self . cleanup_stale_updates_for_monitor_to ( & monitor_key, latest_update_id, lazy) . await ? ;
9421029 }
9431030 Ok ( ( ) )
9441031 }
@@ -958,9 +1045,9 @@ where
9581045 Ok ( ( ) )
9591046 }
9601047
961- async fn persist_new_channel < ChannelSigner : EcdsaChannelSigner > (
1048+ fn persist_new_channel < ChannelSigner : EcdsaChannelSigner > (
9621049 & self , monitor_name : MonitorName , monitor : & ChannelMonitor < ChannelSigner > ,
963- ) -> Result < ( ) , io:: Error > {
1050+ ) -> impl Future < Output = Result < ( ) , io:: Error > > {
9641051 // Determine the proper key for this monitor
9651052 let monitor_key = monitor_name. to_string ( ) ;
9661053 // Serialize and write the new monitor
@@ -974,16 +1061,25 @@ where
9741061 monitor_bytes. extend_from_slice ( MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL ) ;
9751062 }
9761063 monitor. write ( & mut monitor_bytes) . unwrap ( ) ;
1064+ // Note that this is NOT an async function, but rather calls the *sync* KVStore write
1065+ // method, allowing it to do its queueing immediately, and then return a future for the
1066+ // completion of the write. This ensures monitor persistence ordering is preserved.
9771067 let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ;
9781068 let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ;
979- self . kv_store . write ( primary, secondary, monitor_key. as_str ( ) , monitor_bytes) . await
1069+ self . kv_store . write ( primary, secondary, monitor_key. as_str ( ) , monitor_bytes)
9801070 }
9811071
982- async fn update_persisted_channel < ChannelSigner : EcdsaChannelSigner > (
983- & self , monitor_name : MonitorName , update : Option < & ChannelMonitorUpdate > ,
1072+ fn update_persisted_channel < ' a , ChannelSigner : EcdsaChannelSigner + ' a > (
1073+ self : Arc < Self > , monitor_name : MonitorName , update : Option < & ChannelMonitorUpdate > ,
9841074 monitor : & ChannelMonitor < ChannelSigner > ,
985- ) -> Result < ( ) , io:: Error > {
1075+ ) -> impl Future < Output = Result < ( ) , io:: Error > > + ' a
1076+ where
1077+ Self : ' a ,
1078+ {
9861079 const LEGACY_CLOSED_CHANNEL_UPDATE_ID : u64 = u64:: MAX ;
1080+ let mut res_a = None ;
1081+ let mut res_b = None ;
1082+ let mut res_c = None ;
9871083 if let Some ( update) = update {
9881084 let persist_update = update. update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
9891085 && self . maximum_pending_updates != 0
@@ -992,37 +1088,67 @@ where
9921088 let monitor_key = monitor_name. to_string ( ) ;
9931089 let update_name = UpdateName :: from ( update. update_id ) ;
9941090 let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ;
995- self . kv_store
996- . write ( primary, & monitor_key, update_name. as_str ( ) , update. encode ( ) )
997- . await
1091+ // Note that this is NOT an async function, but rather calls the *sync* KVStore
1092+ // write method, allowing it to do its queueing immediately, and then return a
1093+ // future for the completion of the write. This ensures monitor persistence
1094+ // ordering is preserved.
1095+ res_a = Some ( self . kv_store . write (
1096+ primary,
1097+ & monitor_key,
1098+ update_name. as_str ( ) ,
1099+ update. encode ( ) ,
1100+ ) ) ;
9981101 } else {
9991102 // We could write this update, but it meets criteria of our design that calls for a full monitor write.
1000- let write_status = self . persist_new_channel ( monitor_name, monitor) . await ;
1001-
1002- if let Ok ( ( ) ) = write_status {
1003- let channel_closed_legacy =
1004- monitor. get_latest_update_id ( ) == LEGACY_CLOSED_CHANNEL_UPDATE_ID ;
1005- let latest_update_id = monitor. get_latest_update_id ( ) ;
1006- if channel_closed_legacy {
1007- let monitor_key = monitor_name. to_string ( ) ;
1008- self . cleanup_stale_updates_for_monitor_to (
1009- & monitor_key,
1010- latest_update_id,
1011- true ,
1012- )
1013- . await ;
1014- } else {
1015- let end = latest_update_id;
1016- let start = end. saturating_sub ( self . maximum_pending_updates ) ;
1017- self . cleanup_in_range ( monitor_name, start, end) . await ;
1103+ // Note that this is NOT an async function, but rather calls the *sync* KVStore
1104+ // write method, allowing it to do its queueing immediately, and then return a
1105+ // future for the completion of the write. This ensures monitor persistence
1106+ // ordering is preserved. This, thus, must happen before any await we do below.
1107+ let write_fut = self . persist_new_channel ( monitor_name, monitor) ;
1108+ let latest_update_id = monitor. get_latest_update_id ( ) ;
1109+
1110+ res_b = Some ( async move {
1111+ let write_status = write_fut. await ;
1112+ if let Ok ( ( ) ) = write_status {
1113+ if latest_update_id == LEGACY_CLOSED_CHANNEL_UPDATE_ID {
1114+ let monitor_key = monitor_name. to_string ( ) ;
1115+ self . cleanup_stale_updates_for_monitor_to (
1116+ & monitor_key,
1117+ latest_update_id,
1118+ true ,
1119+ )
1120+ . await ?;
1121+ } else {
1122+ let end = latest_update_id;
1123+ let start = end. saturating_sub ( self . maximum_pending_updates ) ;
1124+ self . cleanup_in_range ( monitor_name, start, end) . await ;
1125+ }
10181126 }
1019- }
10201127
1021- write_status
1128+ write_status
1129+ } ) ;
10221130 }
10231131 } else {
10241132 // There is no update given, so we must persist a new monitor.
1025- self . persist_new_channel ( monitor_name, monitor) . await
1133+ // Note that this is NOT an async function, but rather calls the *sync* KVStore write
1134+ // method, allowing it to do its queueing immediately, and then return a future for the
1135+ // completion of the write. This ensures monitor persistence ordering is preserved.
1136+ res_c = Some ( self . persist_new_channel ( monitor_name, monitor) ) ;
1137+ }
1138+ async move {
1139+ // Complete any pending future(s). Note that to keep one return type we have to end
1140+ // with a single async move block that we return, rather than trying to return the
1141+ // individual futures themselves.
1142+ if let Some ( a) = res_a {
1143+ a. await ?;
1144+ }
1145+ if let Some ( b) = res_b {
1146+ b. await ?;
1147+ }
1148+ if let Some ( c) = res_c {
1149+ c. await ?;
1150+ }
1151+ Ok ( ( ) )
10261152 }
10271153 }
10281154
0 commit comments