@@ -42,6 +42,7 @@ use crate::ln::channelmanager::ChannelDetails;
4242
4343use crate :: prelude:: * ;
4444use crate :: sync:: { RwLock , RwLockReadGuard , Mutex , MutexGuard } ;
45+ use core:: iter:: FromIterator ;
4546use core:: ops:: Deref ;
4647use core:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
4748use bitcoin:: secp256k1:: PublicKey ;
@@ -285,7 +286,22 @@ where C::Target: chain::Filter,
285286 where
286287 FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs >
287288 {
289+ let funding_outpoints: HashSet < OutPoint > = HashSet :: from_iter ( self . monitors . read ( ) . unwrap ( ) . keys ( ) . cloned ( ) ) ;
290+ for funding_outpoint in funding_outpoints. iter ( ) {
291+ let monitor_lock = self . monitors . read ( ) . unwrap ( ) ;
292+ if let Some ( monitor_state) = monitor_lock. get ( funding_outpoint) {
293+ self . update_monitor_with_chain_data ( header, best_height, txdata, & process, funding_outpoint, & monitor_state) ;
294+ }
295+ }
296+
297+ // do some followup cleanup if any funding outpoints were added in between iterations
288298 let monitor_states = self . monitors . write ( ) . unwrap ( ) ;
299+ for ( funding_outpoint, monitor_state) in monitor_states. iter ( ) {
300+ if !funding_outpoints. contains ( funding_outpoint) {
301+ self . update_monitor_with_chain_data ( header, best_height, txdata, & process, funding_outpoint, & monitor_state) ;
302+ }
303+ }
304+
289305 if let Some ( height) = best_height {
290306 // If the best block height is being updated, update highest_chain_height under the
291307 // monitors write lock.
@@ -295,55 +311,55 @@ where C::Target: chain::Filter,
295311 self . highest_chain_height . store ( new_height, Ordering :: Release ) ;
296312 }
297313 }
314+ }
298315
299- for ( funding_outpoint, monitor_state) in monitor_states. iter ( ) {
300- let monitor = & monitor_state. monitor ;
301- let mut txn_outputs;
302- {
303- txn_outputs = process ( monitor, txdata) ;
304- let update_id = MonitorUpdateId {
305- contents : UpdateOrigin :: ChainSync ( self . sync_persistence_id . get_increment ( ) ) ,
306- } ;
307- let mut pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ;
308- if let Some ( height) = best_height {
309- if !monitor_state. has_pending_chainsync_updates ( & pending_monitor_updates) {
310- // If there are not ChainSync persists awaiting completion, go ahead and
311- // set last_chain_persist_height here - we wouldn't want the first
312- // InProgress to always immediately be considered "overly delayed".
313- monitor_state. last_chain_persist_height . store ( height as usize , Ordering :: Release ) ;
314- }
316+ fn update_monitor_with_chain_data < FN > ( & self , header : & BlockHeader , best_height : Option < u32 > , txdata : & TransactionData , process : FN , funding_outpoint : & OutPoint , monitor_state : & MonitorHolder < ChannelSigner > ) where FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs > {
317+ let monitor = & monitor_state. monitor ;
318+ let mut txn_outputs;
319+ {
320+ txn_outputs = process ( monitor, txdata) ;
321+ let update_id = MonitorUpdateId {
322+ contents : UpdateOrigin :: ChainSync ( self . sync_persistence_id . get_increment ( ) ) ,
323+ } ;
324+ let mut pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ;
325+ if let Some ( height) = best_height {
326+ if !monitor_state. has_pending_chainsync_updates ( & pending_monitor_updates) {
327+ // If there are not ChainSync persists awaiting completion, go ahead and
328+ // set last_chain_persist_height here - we wouldn't want the first
329+ // InProgress to always immediately be considered "overly delayed".
330+ monitor_state. last_chain_persist_height . store ( height as usize , Ordering :: Release ) ;
315331 }
332+ }
316333
317- log_trace ! ( self . logger, "Syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ;
318- match self . persister . update_persisted_channel ( * funding_outpoint, None , monitor, update_id) {
319- ChannelMonitorUpdateStatus :: Completed =>
320- log_trace ! ( self . logger, "Finished syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ,
321- ChannelMonitorUpdateStatus :: PermanentFailure => {
322- monitor_state. channel_perm_failed . store ( true , Ordering :: Release ) ;
323- self . pending_monitor_events . lock ( ) . unwrap ( ) . push ( ( * funding_outpoint, vec ! [ MonitorEvent :: UpdateFailed ( * funding_outpoint) ] , monitor. get_counterparty_node_id ( ) ) ) ;
324- self . event_notifier . notify ( ) ;
325- } ,
326- ChannelMonitorUpdateStatus :: InProgress => {
327- log_debug ! ( self . logger, "Channel Monitor sync for channel {} in progress, holding events until completion!" , log_funding_info!( monitor) ) ;
328- pending_monitor_updates. push ( update_id) ;
329- } ,
334+ log_trace ! ( self . logger, "Syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ;
335+ match self . persister . update_persisted_channel ( * funding_outpoint, None , monitor, update_id) {
336+ ChannelMonitorUpdateStatus :: Completed =>
337+ log_trace ! ( self . logger, "Finished syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ,
338+ ChannelMonitorUpdateStatus :: PermanentFailure => {
339+ monitor_state. channel_perm_failed . store ( true , Ordering :: Release ) ;
340+ self . pending_monitor_events . lock ( ) . unwrap ( ) . push ( ( * funding_outpoint, vec ! [ MonitorEvent :: UpdateFailed ( * funding_outpoint) ] , monitor. get_counterparty_node_id ( ) ) ) ;
341+ self . event_notifier . notify ( ) ;
342+ }
343+ ChannelMonitorUpdateStatus :: InProgress => {
344+ log_debug ! ( self . logger, "Channel Monitor sync for channel {} in progress, holding events until completion!" , log_funding_info!( monitor) ) ;
345+ pending_monitor_updates. push ( update_id) ;
330346 }
331347 }
348+ }
332349
333- // Register any new outputs with the chain source for filtering, storing any dependent
334- // transactions from within the block that previously had not been included in txdata.
335- if let Some ( ref chain_source) = self . chain_source {
336- let block_hash = header. block_hash ( ) ;
337- for ( txid, mut outputs) in txn_outputs. drain ( ..) {
338- for ( idx, output) in outputs. drain ( ..) {
339- // Register any new outputs with the chain source for filtering
340- let output = WatchedOutput {
341- block_hash : Some ( block_hash) ,
342- outpoint : OutPoint { txid, index : idx as u16 } ,
343- script_pubkey : output. script_pubkey ,
344- } ;
345- chain_source. register_output ( output)
346- }
350+ // Register any new outputs with the chain source for filtering, storing any dependent
351+ // transactions from within the block that previously had not been included in txdata.
352+ if let Some ( ref chain_source) = self . chain_source {
353+ let block_hash = header. block_hash ( ) ;
354+ for ( txid, mut outputs) in txn_outputs. drain ( ..) {
355+ for ( idx, output) in outputs. drain ( ..) {
356+ // Register any new outputs with the chain source for filtering
357+ let output = WatchedOutput {
358+ block_hash : Some ( block_hash) ,
359+ outpoint : OutPoint { txid, index : idx as u16 } ,
360+ script_pubkey : output. script_pubkey ,
361+ } ;
362+ chain_source. register_output ( output)
347363 }
348364 }
349365 }
@@ -976,7 +992,7 @@ mod tests {
976992 assert!( err. contains( "ChannelMonitor storage failure" ) ) ) ;
977993 check_added_monitors ! ( nodes[ 0 ] , 2 ) ; // After the failure we generate a close-channel monitor update
978994 check_closed_broadcast ! ( nodes[ 0 ] , true ) ;
979- check_closed_event ! ( nodes[ 0 ] , 1 , ClosureReason :: ProcessingError { err: "ChannelMonitor storage failure" . to_string( ) } ,
995+ check_closed_event ! ( nodes[ 0 ] , 1 , ClosureReason :: ProcessingError { err: "ChannelMonitor storage failure" . to_string( ) } ,
980996 [ nodes[ 1 ] . node. get_our_node_id( ) ] , 100000 ) ;
981997
982998 // However, as the ChainMonitor is still waiting for the original persistence to complete,
0 commit comments