diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 7d698a22067..6051f00b90a 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -42,6 +42,7 @@ use crate::ln::channelmanager::ChannelDetails; use crate::prelude::*; use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; +use core::iter::FromIterator; use core::ops::Deref; use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use bitcoin::secp256k1::PublicKey; @@ -285,7 +286,22 @@ where C::Target: chain::Filter, where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { + let funding_outpoints: HashSet = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned()); + for funding_outpoint in funding_outpoints.iter() { + let monitor_lock = self.monitors.read().unwrap(); + if let Some(monitor_state) = monitor_lock.get(funding_outpoint) { + self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state); + } + } + + // do some followup cleanup if any funding outpoints were added in between iterations let monitor_states = self.monitors.write().unwrap(); + for (funding_outpoint, monitor_state) in monitor_states.iter() { + if !funding_outpoints.contains(funding_outpoint) { + self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state); + } + } + if let Some(height) = best_height { // If the best block height is being updated, update highest_chain_height under the // monitors write lock. @@ -295,55 +311,55 @@ where C::Target: chain::Filter, self.highest_chain_height.store(new_height, Ordering::Release); } } + } - for (funding_outpoint, monitor_state) in monitor_states.iter() { - let monitor = &monitor_state.monitor; - let mut txn_outputs; - { - txn_outputs = process(monitor, txdata); - let update_id = MonitorUpdateId { - contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), - }; - let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); - if let Some(height) = best_height { - if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) { - // If there are not ChainSync persists awaiting completion, go ahead and - // set last_chain_persist_height here - we wouldn't want the first - // InProgress to always immediately be considered "overly delayed". - monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release); - } + fn update_monitor_with_chain_data(&self, header: &BlockHeader, best_height: Option, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder) where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { + let monitor = &monitor_state.monitor; + let mut txn_outputs; + { + txn_outputs = process(monitor, txdata); + let update_id = MonitorUpdateId { + contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), + }; + let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); + if let Some(height) = best_height { + if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) { + // If there are not ChainSync persists awaiting completion, go ahead and + // set last_chain_persist_height here - we wouldn't want the first + // InProgress to always immediately be considered "overly delayed". + monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release); } + } - log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); - match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) { - ChannelMonitorUpdateStatus::Completed => - log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), - ChannelMonitorUpdateStatus::PermanentFailure => { - monitor_state.channel_perm_failed.store(true, Ordering::Release); - self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id())); - self.event_notifier.notify(); - }, - ChannelMonitorUpdateStatus::InProgress => { - log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); - pending_monitor_updates.push(update_id); - }, + log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); + match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) { + ChannelMonitorUpdateStatus::Completed => + log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), + ChannelMonitorUpdateStatus::PermanentFailure => { + monitor_state.channel_perm_failed.store(true, Ordering::Release); + self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id())); + self.event_notifier.notify(); + } + ChannelMonitorUpdateStatus::InProgress => { + log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); + pending_monitor_updates.push(update_id); } } + } - // Register any new outputs with the chain source for filtering, storing any dependent - // transactions from within the block that previously had not been included in txdata. - if let Some(ref chain_source) = self.chain_source { - let block_hash = header.block_hash(); - for (txid, mut outputs) in txn_outputs.drain(..) { - for (idx, output) in outputs.drain(..) { - // Register any new outputs with the chain source for filtering - let output = WatchedOutput { - block_hash: Some(block_hash), - outpoint: OutPoint { txid, index: idx as u16 }, - script_pubkey: output.script_pubkey, - }; - chain_source.register_output(output) - } + // Register any new outputs with the chain source for filtering, storing any dependent + // transactions from within the block that previously had not been included in txdata. + if let Some(ref chain_source) = self.chain_source { + let block_hash = header.block_hash(); + for (txid, mut outputs) in txn_outputs.drain(..) { + for (idx, output) in outputs.drain(..) { + // Register any new outputs with the chain source for filtering + let output = WatchedOutput { + block_hash: Some(block_hash), + outpoint: OutPoint { txid, index: idx as u16 }, + script_pubkey: output.script_pubkey, + }; + chain_source.register_output(output) } } } @@ -976,7 +992,7 @@ mod tests { assert!(err.contains("ChannelMonitor storage failure"))); check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update check_closed_broadcast!(nodes[0], true); - check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() }, + check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() }, [nodes[1].node.get_our_node_id()], 100000); // However, as the ChainMonitor is still waiting for the original persistence to complete, diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index 728752f97c3..58878a9998c 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -2191,7 +2191,7 @@ fn test_anchors_aggregated_revoked_htlc_tx() { // Alice should see that Bob is trying to claim to HTLCs, so she should now try to claim them at // the second level instead. - let revoked_claims = { + let revoked_claim_transactions = { let txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); assert_eq!(txn.len(), 2); @@ -2205,10 +2205,14 @@ fn test_anchors_aggregated_revoked_htlc_tx() { check_spends!(revoked_htlc_claim, htlc_tx); } - txn + let mut revoked_claim_transaction_map = HashMap::new(); + for current_tx in txn.into_iter() { + revoked_claim_transaction_map.insert(current_tx.txid(), current_tx); + } + revoked_claim_transaction_map }; for node in &nodes { - mine_transactions(node, &revoked_claims.iter().collect::>()); + mine_transactions(node, &revoked_claim_transactions.values().collect::>()); } @@ -2234,7 +2238,8 @@ fn test_anchors_aggregated_revoked_htlc_tx() { let spend_tx = nodes[0].keys_manager.backing.spend_spendable_outputs( &[&outputs[0]], Vec::new(), Script::new_op_return(&[]), 253, None, &Secp256k1::new(), ).unwrap(); - check_spends!(spend_tx, revoked_claims[idx]); + + check_spends!(spend_tx, revoked_claim_transactions.get(&spend_tx.input[0].previous_output.txid).unwrap()); } else { panic!("unexpected event"); }