@@ -42,6 +42,7 @@ use crate::ln::channelmanager::ChannelDetails;
4242
4343use crate :: prelude:: * ;
4444use crate :: sync:: { RwLock , RwLockReadGuard , Mutex , MutexGuard } ;
45+ use core:: iter:: FromIterator ;
4546use core:: ops:: Deref ;
4647use core:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
4748use bitcoin:: secp256k1:: PublicKey ;
@@ -286,7 +287,7 @@ where C::Target: chain::Filter,
286287 FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs >
287288 {
288289 { // create nested block to release write lock upon completion
289- let monitor_states = self . monitors . write ( ) . unwrap ( ) ;
290+ let _monitor_states = self . monitors . write ( ) . unwrap ( ) ;
290291 if let Some ( height) = best_height {
291292 // If the best block height is being updated, update highest_chain_height under the
292293 // monitors write lock.
@@ -298,59 +299,71 @@ where C::Target: chain::Filter,
298299 }
299300 }
300301
301- let funding_outpoints: Vec < OutPoint > = self . monitors . read ( ) . unwrap ( ) . keys ( ) . cloned ( ) . collect ( ) ;
302+ let funding_outpoints: HashSet < OutPoint > = HashSet :: from_iter ( self . monitors . read ( ) . unwrap ( ) . keys ( ) . cloned ( ) ) ;
302303 for funding_outpoint in funding_outpoints. iter ( ) {
303304 let monitor_lock = self . monitors . write ( ) . unwrap ( ) ;
304305 let monitor_state = monitor_lock. get ( funding_outpoint) ;
305306 if let Some ( monitor_state) = monitor_state {
306- let monitor = & monitor_state. monitor ;
307- let mut txn_outputs;
308- {
309- txn_outputs = process ( monitor, txdata) ;
310- let update_id = MonitorUpdateId {
311- contents : UpdateOrigin :: ChainSync ( self . sync_persistence_id . get_increment ( ) ) ,
312- } ;
313- let mut pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ;
314- if let Some ( height) = best_height {
315- if !monitor_state. has_pending_chainsync_updates ( & pending_monitor_updates) {
316- // If there are not ChainSync persists awaiting completion, go ahead and
317- // set last_chain_persist_height here - we wouldn't want the first
318- // InProgress to always immediately be considered "overly delayed".
319- monitor_state. last_chain_persist_height . store ( height as usize , Ordering :: Release ) ;
320- }
321- }
322-
323- log_trace ! ( self . logger, "Syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ;
324- match self . persister . update_persisted_channel ( * funding_outpoint, None , monitor, update_id) {
325- ChannelMonitorUpdateStatus :: Completed =>
326- log_trace ! ( self . logger, "Finished syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ,
327- ChannelMonitorUpdateStatus :: PermanentFailure => {
328- monitor_state. channel_perm_failed . store ( true , Ordering :: Release ) ;
329- self . pending_monitor_events . lock ( ) . unwrap ( ) . push ( ( * funding_outpoint, vec ! [ MonitorEvent :: UpdateFailed ( * funding_outpoint) ] , monitor. get_counterparty_node_id ( ) ) ) ;
330- self . event_notifier . notify ( ) ;
331- } ,
332- ChannelMonitorUpdateStatus :: InProgress => {
333- log_debug ! ( self . logger, "Channel Monitor sync for channel {} in progress, holding events until completion!" , log_funding_info!( monitor) ) ;
334- pending_monitor_updates. push ( update_id) ;
335- } ,
336- }
307+ self . update_monitor_with_chain_data ( header, best_height, txdata, & process, funding_outpoint, & monitor_state) ;
308+ }
309+ }
310+
311+ // do some followup cleanup if any funding outpoints were added in between iterations
312+ let monitor_states = self . monitors . write ( ) . unwrap ( ) ;
313+ for ( funding_outpoint, monitor_state) in monitor_states. iter ( ) {
314+ if !funding_outpoints. contains ( funding_outpoint) {
315+ self . update_monitor_with_chain_data ( header, best_height, txdata, & process, funding_outpoint, & monitor_state) ;
316+ }
317+ }
318+ }
319+
320+ fn update_monitor_with_chain_data < FN > ( & self , header : & BlockHeader , best_height : Option < u32 > , txdata : & TransactionData , process : FN , funding_outpoint : & OutPoint , monitor_state : & & MonitorHolder < ChannelSigner > ) where FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs > {
321+ let monitor = & monitor_state. monitor ;
322+ let mut txn_outputs;
323+ {
324+ txn_outputs = process ( monitor, txdata) ;
325+ let update_id = MonitorUpdateId {
326+ contents : UpdateOrigin :: ChainSync ( self . sync_persistence_id . get_increment ( ) ) ,
327+ } ;
328+ let mut pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ;
329+ if let Some ( height) = best_height {
330+ if !monitor_state. has_pending_chainsync_updates ( & pending_monitor_updates) {
331+ // If there are not ChainSync persists awaiting completion, go ahead and
332+ // set last_chain_persist_height here - we wouldn't want the first
333+ // InProgress to always immediately be considered "overly delayed".
334+ monitor_state. last_chain_persist_height . store ( height as usize , Ordering :: Release ) ;
337335 }
336+ }
338337
339- // Register any new outputs with the chain source for filtering, storing any dependent
340- // transactions from within the block that previously had not been included in txdata.
341- if let Some ( ref chain_source) = self . chain_source {
342- let block_hash = header. block_hash ( ) ;
343- for ( txid, mut outputs) in txn_outputs. drain ( ..) {
344- for ( idx, output) in outputs. drain ( ..) {
345- // Register any new outputs with the chain source for filtering
346- let output = WatchedOutput {
347- block_hash : Some ( block_hash) ,
348- outpoint : OutPoint { txid, index : idx as u16 } ,
349- script_pubkey : output. script_pubkey ,
350- } ;
351- chain_source. register_output ( output)
352- }
353- }
338+ log_trace ! ( self . logger, "Syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ;
339+ match self . persister . update_persisted_channel ( * funding_outpoint, None , monitor, update_id) {
340+ ChannelMonitorUpdateStatus :: Completed =>
341+ log_trace ! ( self . logger, "Finished syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ,
342+ ChannelMonitorUpdateStatus :: PermanentFailure => {
343+ monitor_state. channel_perm_failed . store ( true , Ordering :: Release ) ;
344+ self . pending_monitor_events . lock ( ) . unwrap ( ) . push ( ( * funding_outpoint, vec ! [ MonitorEvent :: UpdateFailed ( * funding_outpoint) ] , monitor. get_counterparty_node_id ( ) ) ) ;
345+ self . event_notifier . notify ( ) ;
346+ }
347+ ChannelMonitorUpdateStatus :: InProgress => {
348+ log_debug ! ( self . logger, "Channel Monitor sync for channel {} in progress, holding events until completion!" , log_funding_info!( monitor) ) ;
349+ pending_monitor_updates. push ( update_id) ;
350+ }
351+ }
352+ }
353+
354+ // Register any new outputs with the chain source for filtering, storing any dependent
355+ // transactions from within the block that previously had not been included in txdata.
356+ if let Some ( ref chain_source) = self . chain_source {
357+ let block_hash = header. block_hash ( ) ;
358+ for ( txid, mut outputs) in txn_outputs. drain ( ..) {
359+ for ( idx, output) in outputs. drain ( ..) {
360+ // Register any new outputs with the chain source for filtering
361+ let output = WatchedOutput {
362+ block_hash : Some ( block_hash) ,
363+ outpoint : OutPoint { txid, index : idx as u16 } ,
364+ script_pubkey : output. script_pubkey ,
365+ } ;
366+ chain_source. register_output ( output)
354367 }
355368 }
356369 }
0 commit comments