@@ -42,6 +42,7 @@ use crate::ln::channelmanager::ChannelDetails;
4242
4343use  crate :: prelude:: * ; 
4444use  crate :: sync:: { RwLock ,  RwLockReadGuard ,  Mutex ,  MutexGuard } ; 
45+ use  core:: iter:: FromIterator ; 
4546use  core:: ops:: Deref ; 
4647use  core:: sync:: atomic:: { AtomicBool ,  AtomicUsize ,  Ordering } ; 
4748use  bitcoin:: secp256k1:: PublicKey ; 
@@ -285,7 +286,22 @@ where C::Target: chain::Filter,
285286	where 
286287		FN :  Fn ( & ChannelMonitor < ChannelSigner > ,  & TransactionData )  -> Vec < TransactionOutputs > 
287288	{ 
289+ 		let  funding_outpoints:  HashSet < OutPoint >  = HashSet :: from_iter ( self . monitors . read ( ) . unwrap ( ) . keys ( ) . cloned ( ) ) ; 
290+ 		for  funding_outpoint in  funding_outpoints. iter ( )  { 
291+ 			let  monitor_lock = self . monitors . read ( ) . unwrap ( ) ; 
292+ 			if  let  Some ( monitor_state)  = monitor_lock. get ( funding_outpoint)  { 
293+ 				self . update_monitor_with_chain_data ( header,  best_height,  txdata,  & process,  funding_outpoint,  & monitor_state) ; 
294+ 			} 
295+ 		} 
296+ 
297+ 		// do some followup cleanup if any funding outpoints were added in between iterations 
288298		let  monitor_states = self . monitors . write ( ) . unwrap ( ) ; 
299+ 		for  ( funding_outpoint,  monitor_state)  in  monitor_states. iter ( )  { 
300+ 			if  !funding_outpoints. contains ( funding_outpoint)  { 
301+ 				self . update_monitor_with_chain_data ( header,  best_height,  txdata,  & process,  funding_outpoint,  & monitor_state) ; 
302+ 			} 
303+ 		} 
304+ 
289305		if  let  Some ( height)  = best_height { 
290306			// If the best block height is being updated, update highest_chain_height under the 
291307			// monitors write lock. 
@@ -295,55 +311,55 @@ where C::Target: chain::Filter,
295311				self . highest_chain_height . store ( new_height,  Ordering :: Release ) ; 
296312			} 
297313		} 
314+ 	} 
298315
299- 		for  ( funding_outpoint,  monitor_state)  in  monitor_states. iter ( )  { 
300- 			let  monitor = & monitor_state. monitor ; 
301- 			let  mut  txn_outputs; 
302- 			{ 
303- 				txn_outputs = process ( monitor,  txdata) ; 
304- 				let  update_id = MonitorUpdateId  { 
305- 					contents :  UpdateOrigin :: ChainSync ( self . sync_persistence_id . get_increment ( ) ) , 
306- 				} ; 
307- 				let  mut  pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ; 
308- 				if  let  Some ( height)  = best_height { 
309- 					if  !monitor_state. has_pending_chainsync_updates ( & pending_monitor_updates)  { 
310- 						// If there are not ChainSync persists awaiting completion, go ahead and 
311- 						// set last_chain_persist_height here - we wouldn't want the first 
312- 						// InProgress to always immediately be considered "overly delayed". 
313- 						monitor_state. last_chain_persist_height . store ( height as  usize ,  Ordering :: Release ) ; 
314- 					} 
316+ 	fn  update_monitor_with_chain_data < FN > ( & self ,  header :  & BlockHeader ,  best_height :  Option < u32 > ,  txdata :  & TransactionData ,  process :  FN ,  funding_outpoint :  & OutPoint ,  monitor_state :  & & MonitorHolder < ChannelSigner > )  where  FN :  Fn ( & ChannelMonitor < ChannelSigner > ,  & TransactionData )  -> Vec < TransactionOutputs >  { 
317+ 		let  monitor = & monitor_state. monitor ; 
318+ 		let  mut  txn_outputs; 
319+ 		{ 
320+ 			txn_outputs = process ( monitor,  txdata) ; 
321+ 			let  update_id = MonitorUpdateId  { 
322+ 				contents :  UpdateOrigin :: ChainSync ( self . sync_persistence_id . get_increment ( ) ) , 
323+ 			} ; 
324+ 			let  mut  pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ; 
325+ 			if  let  Some ( height)  = best_height { 
326+ 				if  !monitor_state. has_pending_chainsync_updates ( & pending_monitor_updates)  { 
327+ 					// If there are not ChainSync persists awaiting completion, go ahead and 
328+ 					// set last_chain_persist_height here - we wouldn't want the first 
329+ 					// InProgress to always immediately be considered "overly delayed". 
330+ 					monitor_state. last_chain_persist_height . store ( height as  usize ,  Ordering :: Release ) ; 
315331				} 
332+ 			} 
316333
317- 				log_trace ! ( self . logger,  "Syncing Channel Monitor for channel {}" ,  log_funding_info!( monitor) ) ; 
318- 				match  self . persister . update_persisted_channel ( * funding_outpoint,  None ,  monitor,  update_id)  { 
319- 					ChannelMonitorUpdateStatus :: Completed  =>
320- 						log_trace ! ( self . logger,  "Finished syncing Channel Monitor for channel {}" ,  log_funding_info!( monitor) ) , 
321- 					ChannelMonitorUpdateStatus :: PermanentFailure  => { 
322- 						monitor_state. channel_perm_failed . store ( true ,  Ordering :: Release ) ; 
323- 						self . pending_monitor_events . lock ( ) . unwrap ( ) . push ( ( * funding_outpoint,  vec ! [ MonitorEvent :: UpdateFailed ( * funding_outpoint) ] ,  monitor. get_counterparty_node_id ( ) ) ) ; 
324- 						self . event_notifier . notify ( ) ; 
325- 					} , 
326- 					ChannelMonitorUpdateStatus :: InProgress  => { 
327- 						log_debug ! ( self . logger,  "Channel Monitor sync for channel {} in progress, holding events until completion!" ,  log_funding_info!( monitor) ) ; 
328- 						pending_monitor_updates. push ( update_id) ; 
329- 					} , 
334+ 			log_trace ! ( self . logger,  "Syncing Channel Monitor for channel {}" ,  log_funding_info!( monitor) ) ; 
335+ 			match  self . persister . update_persisted_channel ( * funding_outpoint,  None ,  monitor,  update_id)  { 
336+ 				ChannelMonitorUpdateStatus :: Completed  =>
337+ 					log_trace ! ( self . logger,  "Finished syncing Channel Monitor for channel {}" ,  log_funding_info!( monitor) ) , 
338+ 				ChannelMonitorUpdateStatus :: PermanentFailure  => { 
339+ 					monitor_state. channel_perm_failed . store ( true ,  Ordering :: Release ) ; 
340+ 					self . pending_monitor_events . lock ( ) . unwrap ( ) . push ( ( * funding_outpoint,  vec ! [ MonitorEvent :: UpdateFailed ( * funding_outpoint) ] ,  monitor. get_counterparty_node_id ( ) ) ) ; 
341+ 					self . event_notifier . notify ( ) ; 
342+ 				} 
343+ 				ChannelMonitorUpdateStatus :: InProgress  => { 
344+ 					log_debug ! ( self . logger,  "Channel Monitor sync for channel {} in progress, holding events until completion!" ,  log_funding_info!( monitor) ) ; 
345+ 					pending_monitor_updates. push ( update_id) ; 
330346				} 
331347			} 
348+ 		} 
332349
333- 			// Register any new outputs with the chain source for filtering, storing any dependent 
334- 			// transactions from within the block that previously had not been included in txdata. 
335- 			if  let  Some ( ref  chain_source)  = self . chain_source  { 
336- 				let  block_hash = header. block_hash ( ) ; 
337- 				for  ( txid,  mut  outputs)  in  txn_outputs. drain ( ..)  { 
338- 					for  ( idx,  output)  in  outputs. drain ( ..)  { 
339- 						// Register any new outputs with the chain source for filtering 
340- 						let  output = WatchedOutput  { 
341- 							block_hash :  Some ( block_hash) , 
342- 							outpoint :  OutPoint  {  txid,  index :  idx as  u16  } , 
343- 							script_pubkey :  output. script_pubkey , 
344- 						} ; 
345- 						chain_source. register_output ( output) 
346- 					} 
350+ 		// Register any new outputs with the chain source for filtering, storing any dependent 
351+ 		// transactions from within the block that previously had not been included in txdata. 
352+ 		if  let  Some ( ref  chain_source)  = self . chain_source  { 
353+ 			let  block_hash = header. block_hash ( ) ; 
354+ 			for  ( txid,  mut  outputs)  in  txn_outputs. drain ( ..)  { 
355+ 				for  ( idx,  output)  in  outputs. drain ( ..)  { 
356+ 					// Register any new outputs with the chain source for filtering 
357+ 					let  output = WatchedOutput  { 
358+ 						block_hash :  Some ( block_hash) , 
359+ 						outpoint :  OutPoint  {  txid,  index :  idx as  u16  } , 
360+ 						script_pubkey :  output. script_pubkey , 
361+ 					} ; 
362+ 					chain_source. register_output ( output) 
347363				} 
348364			} 
349365		} 
@@ -976,7 +992,7 @@ mod tests {
976992			assert!( err. contains( "ChannelMonitor storage failure" ) ) ) ; 
977993		check_added_monitors ! ( nodes[ 0 ] ,  2 ) ;  // After the failure we generate a close-channel monitor update 
978994		check_closed_broadcast ! ( nodes[ 0 ] ,  true ) ; 
979- 		check_closed_event ! ( nodes[ 0 ] ,  1 ,  ClosureReason :: ProcessingError  {  err:  "ChannelMonitor storage failure" . to_string( )  } ,   
995+ 		check_closed_event ! ( nodes[ 0 ] ,  1 ,  ClosureReason :: ProcessingError  {  err:  "ChannelMonitor storage failure" . to_string( )  } , 
980996			[ nodes[ 1 ] . node. get_our_node_id( ) ] ,  100000 ) ; 
981997
982998		// However, as the ChainMonitor is still waiting for the original persistence to complete, 
0 commit comments