@@ -78,26 +78,48 @@ impl MonitorUpdateId {
7878/// `Persist` defines behavior for persisting channel monitors: this could mean
7979/// writing once to disk, and/or uploading to one or more backup services.
8080///
81- /// Each method can return two possible values:
82- /// * If persistence (including any relevant `fsync()` calls) happens immediately, the
83- /// implementation should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal
84- /// channel operation should continue.
81+ /// Persistence can happen in one of two ways - synchronously completing before the trait method
82+ /// calls return or asynchronously in the background.
8583///
86- /// * If persistence happens asynchronously, implementations can return
87- /// [`ChannelMonitorUpdateStatus::InProgress`] while the update continues in the background.
88- /// Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be called with
89- /// the corresponding [`MonitorUpdateId`].
84+ /// # For those implementing synchronous persistence
9085///
91- /// Note that unlike the direct [`chain::Watch`] interface,
92- /// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
86+ /// * If persistence completes fully (including any relevant `fsync()` calls), the implementation
87+ /// should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal channel operation
88+ /// should continue.
9389///
94- /// If persistence fails for some reason, implementations should still return
95- /// [`ChannelMonitorUpdateStatus::InProgress`] and attempt to shut down or otherwise resolve the
96- /// situation ASAP.
90+ /// * If persistence fails for some reason, implementations should consider returning
91+ /// [`ChannelMonitorUpdateStatus::InProgress`] and retry all pending persistence operations in
92+ /// the background with [`ChainMonitor::list_pending_monitor_updates`] and
93+ /// [`ChainMonitor::get_monitor`].
9794///
98- /// Third-party watchtowers may be built as a part of an implementation of this trait, with the
99- /// advantage that you can control whether to resume channel operation depending on if an update
100- /// has been persisted to a watchtower. For this, you may find the following methods useful:
95+ /// Once a full [`ChannelMonitor`] has been persisted, all pending updates for that channel can
96+ /// be marked as complete via [`ChainMonitor::channel_monitor_updated`].
97+ ///
98+ /// If at some point no further progress can be made towards persisting the pending updates, the
99+ /// node should simply shut down.
100+ ///
101+ /// * If the persistence has failed and cannot be retried,
102+ /// [`ChannelMonitorUpdateStatus::UnrecoverableError`] can be used, though this will result in
103+ /// an immediate panic and future operations in LDK generally failing.
104+ ///
105+ /// # For those implementing asynchronous persistence
106+ ///
107+ /// All calls should generally spawn a background task and immediately return
108+ /// [`ChannelMonitorUpdateStatus::InProgress`]. Once the update completes,
109+ /// [`ChainMonitor::channel_monitor_updated`] should be called with the corresponding
110+ /// [`MonitorUpdateId`].
111+ ///
112+ /// Note that unlike the direct [`chain::Watch`] interface,
113+ /// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
114+ ///
115+ /// If at some point no further progress can be made towards persisting a pending update, the node
116+ /// should simply shut down.
117+ ///
118+ /// # Using remote watchtowers
119+ ///
120+ /// Watchtowers may be updated as a part of an implementation of this trait, utilizing the async
121+ /// update process described above while the watchtower is being updated. The following methods are
122+ /// provided for building transactions for a watchtower:
101123/// [`ChannelMonitor::initial_counterparty_commitment_tx`],
102124/// [`ChannelMonitor::counterparty_commitment_txs_from_update`],
103125/// [`ChannelMonitor::sign_to_local_justice_tx`], [`TrustedCommitmentTransaction::revokeable_output_index`],
@@ -279,19 +301,31 @@ where C::Target: chain::Filter,
279301 where
280302 FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs >
281303 {
304+ let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down." ;
282305 let funding_outpoints: HashSet < OutPoint > = HashSet :: from_iter ( self . monitors . read ( ) . unwrap ( ) . keys ( ) . cloned ( ) ) ;
283306 for funding_outpoint in funding_outpoints. iter ( ) {
284307 let monitor_lock = self . monitors . read ( ) . unwrap ( ) ;
285308 if let Some ( monitor_state) = monitor_lock. get ( funding_outpoint) {
286- self . update_monitor_with_chain_data ( header, best_height, txdata, & process, funding_outpoint, & monitor_state) ;
309+ if self . update_monitor_with_chain_data ( header, best_height, txdata, & process, funding_outpoint, & monitor_state) . is_err ( ) {
310+ // Take the monitors lock for writing so that we poison it and any future
311+ // operations going forward fail immediately.
312+ core:: mem:: drop ( monitor_state) ;
313+ core:: mem:: drop ( monitor_lock) ;
314+ let _poison = self . monitors . write ( ) . unwrap ( ) ;
315+ log_error ! ( self . logger, "{}" , err_str) ;
316+ panic ! ( "{}" , err_str) ;
317+ }
287318 }
288319 }
289320
290321 // do some followup cleanup if any funding outpoints were added in between iterations
291322 let monitor_states = self . monitors . write ( ) . unwrap ( ) ;
292323 for ( funding_outpoint, monitor_state) in monitor_states. iter ( ) {
293324 if !funding_outpoints. contains ( funding_outpoint) {
294- self . update_monitor_with_chain_data ( header, best_height, txdata, & process, funding_outpoint, & monitor_state) ;
325+ if self . update_monitor_with_chain_data ( header, best_height, txdata, & process, funding_outpoint, & monitor_state) . is_err ( ) {
326+ log_error ! ( self . logger, "{}" , err_str) ;
327+ panic ! ( "{}" , err_str) ;
328+ }
295329 }
296330 }
297331
@@ -306,7 +340,10 @@ where C::Target: chain::Filter,
306340 }
307341 }
308342
309- fn update_monitor_with_chain_data < FN > ( & self , header : & BlockHeader , best_height : Option < u32 > , txdata : & TransactionData , process : FN , funding_outpoint : & OutPoint , monitor_state : & MonitorHolder < ChannelSigner > ) where FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs > {
343+ fn update_monitor_with_chain_data < FN > (
344+ & self , header : & BlockHeader , best_height : Option < u32 > , txdata : & TransactionData ,
345+ process : FN , funding_outpoint : & OutPoint , monitor_state : & MonitorHolder < ChannelSigner >
346+ ) -> Result < ( ) , ( ) > where FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs > {
310347 let monitor = & monitor_state. monitor ;
311348 let mut txn_outputs;
312349 {
@@ -331,7 +368,10 @@ where C::Target: chain::Filter,
331368 ChannelMonitorUpdateStatus :: InProgress => {
332369 log_debug ! ( self . logger, "Channel Monitor sync for channel {} in progress, holding events until completion!" , log_funding_info!( monitor) ) ;
333370 pending_monitor_updates. push ( update_id) ;
334- }
371+ } ,
372+ ChannelMonitorUpdateStatus :: UnrecoverableError => {
373+ return Err ( ( ) ) ;
374+ } ,
335375 }
336376 }
337377
@@ -351,6 +391,7 @@ where C::Target: chain::Filter,
351391 }
352392 }
353393 }
394+ Ok ( ( ) )
354395 }
355396
356397 /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
@@ -674,7 +715,12 @@ where C::Target: chain::Filter,
674715 } ,
675716 ChannelMonitorUpdateStatus :: Completed => {
676717 log_info ! ( self . logger, "Persistence of new ChannelMonitor for channel {} completed" , log_funding_info!( monitor) ) ;
677- }
718+ } ,
719+ ChannelMonitorUpdateStatus :: UnrecoverableError => {
720+ let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down." ;
721+ log_error ! ( self . logger, "{}" , err_str) ;
722+ panic ! ( "{}" , err_str) ;
723+ } ,
678724 }
679725 if let Some ( ref chain_source) = self . chain_source {
680726 monitor. load_outputs_to_watch ( chain_source) ;
@@ -690,7 +736,7 @@ where C::Target: chain::Filter,
690736 fn update_channel ( & self , funding_txo : OutPoint , update : & ChannelMonitorUpdate ) -> ChannelMonitorUpdateStatus {
691737 // Update the monitor that watches the channel referred to by the given outpoint.
692738 let monitors = self . monitors . read ( ) . unwrap ( ) ;
693- match monitors. get ( & funding_txo) {
739+ let ret = match monitors. get ( & funding_txo) {
694740 None => {
695741 log_error ! ( self . logger, "Failed to update channel monitor: no such monitor registered" ) ;
696742
@@ -722,14 +768,25 @@ where C::Target: chain::Filter,
722768 ChannelMonitorUpdateStatus :: Completed => {
723769 log_debug ! ( self . logger, "Persistence of ChannelMonitorUpdate for channel {} completed" , log_funding_info!( monitor) ) ;
724770 } ,
771+ ChannelMonitorUpdateStatus :: UnrecoverableError => { /* we'll panic in a moment */ } ,
725772 }
726773 if update_res. is_err ( ) {
727774 ChannelMonitorUpdateStatus :: InProgress
728775 } else {
729776 persist_res
730777 }
731778 }
779+ } ;
780+ if let ChannelMonitorUpdateStatus :: UnrecoverableError = ret {
781+ // Take the monitors lock for writing so that we poison it and any future
782+ // operations going forward fail immediately.
783+ core:: mem:: drop ( monitors) ;
784+ let _poison = self . monitors . write ( ) . unwrap ( ) ;
785+ let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down." ;
786+ log_error ! ( self . logger, "{}" , err_str) ;
787+ panic ! ( "{}" , err_str) ;
732788 }
789+ ret
733790 }
734791
735792 fn release_pending_monitor_events ( & self ) -> Vec < ( OutPoint , Vec < MonitorEvent > , Option < PublicKey > ) > {
@@ -973,4 +1030,25 @@ mod tests {
9731030 do_chainsync_pauses_events ( false ) ;
9741031 do_chainsync_pauses_events ( true ) ;
9751032 }
1033+
1034+ #[ test]
1035+ fn update_during_chainsync_poisons_channel ( ) {
1036+ let chanmon_cfgs = create_chanmon_cfgs ( 2 ) ;
1037+ let node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
1038+ let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
1039+ let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
1040+ create_announced_chan_between_nodes ( & nodes, 0 , 1 ) ;
1041+
1042+ chanmon_cfgs[ 0 ] . persister . chain_sync_monitor_persistences . lock ( ) . unwrap ( ) . clear ( ) ;
1043+ chanmon_cfgs[ 0 ] . persister . set_update_ret ( ChannelMonitorUpdateStatus :: UnrecoverableError ) ;
1044+
1045+ assert ! ( std:: panic:: catch_unwind( || {
1046+ // Returning an UnrecoverableError should always panic immediately
1047+ connect_blocks( & nodes[ 0 ] , 1 ) ;
1048+ } ) . is_err( ) ) ;
1049+ assert ! ( std:: panic:: catch_unwind( || {
1050+ // ...and also poison our locks causing later use to panic as well
1051+ core:: mem:: drop( nodes) ;
1052+ } ) . is_err( ) ) ;
1053+ }
9761054}
0 commit comments