Commit 79f8c1f

Don't maintain long-term write lock while processing chain data.

1 parent 3dffe54

1 file changed: lightning/src/chain/chainmonitor.rs (59 additions, 52 deletions)
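In short: process_chain_data previously took the monitors write lock once and held it for the entire call, including across every persister invocation. As a minimal sketch of that anti-pattern (the u64/String map and the persist function below are hypothetical stand-ins, not LDK types):

use std::collections::HashMap;
use std::sync::RwLock;

// Models a potentially slow persistence call (disk or network I/O).
fn persist(_key: &u64, _val: &str) { /* potentially slow I/O */ }

fn process_all(monitors: &RwLock<HashMap<u64, String>>) {
    // The write guard lives until the end of the function, so every other
    // user of `monitors` is blocked for the duration of all persist calls,
    // not just the brief map accesses.
    let guard = monitors.write().unwrap();
    for (key, val) in guard.iter() {
        persist(key, val); // slow I/O happens while the lock is still held
    }
}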
@@ -285,64 +285,71 @@ where C::Target: chain::Filter,
 	where
 		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
 	{
-		let monitor_states = self.monitors.write().unwrap();
-		if let Some(height) = best_height {
-			// If the best block height is being updated, update highest_chain_height under the
-			// monitors write lock.
-			let old_height = self.highest_chain_height.load(Ordering::Acquire);
-			let new_height = height as usize;
-			if new_height > old_height {
-				self.highest_chain_height.store(new_height, Ordering::Release);
+		{ // create nested block to release write lock upon completion
+			let monitor_states = self.monitors.write().unwrap();
+			if let Some(height) = best_height {
+				// If the best block height is being updated, update highest_chain_height under the
+				// monitors write lock.
+				let old_height = self.highest_chain_height.load(Ordering::Acquire);
+				let new_height = height as usize;
+				if new_height > old_height {
+					self.highest_chain_height.store(new_height, Ordering::Release);
+				}
 			}
 		}
 
-		for (funding_outpoint, monitor_state) in monitor_states.iter() {
-			let monitor = &monitor_state.monitor;
-			let mut txn_outputs;
-			{
-				txn_outputs = process(monitor, txdata);
-				let update_id = MonitorUpdateId {
-					contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
-				};
-				let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
-				if let Some(height) = best_height {
-					if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
-						// If there are not ChainSync persists awaiting completion, go ahead and
-						// set last_chain_persist_height here - we wouldn't want the first
-						// InProgress to always immediately be considered "overly delayed".
-						monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
+		let funding_outpoints: Vec<OutPoint> = self.monitors.read().unwrap().keys().cloned().collect();
+		for funding_outpoint in funding_outpoints.iter() {
+			let monitor_lock = self.monitors.write().unwrap();
+			let monitor_state = monitor_lock.get(funding_outpoint);
+			if let Some(monitor_state) = monitor_state {
+				let monitor = &monitor_state.monitor;
+				let mut txn_outputs;
+				{
+					txn_outputs = process(monitor, txdata);
+					let update_id = MonitorUpdateId {
+						contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
+					};
+					let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+					if let Some(height) = best_height {
+						if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
+							// If there are not ChainSync persists awaiting completion, go ahead and
+							// set last_chain_persist_height here - we wouldn't want the first
+							// InProgress to always immediately be considered "overly delayed".
+							monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
+						}
 					}
-				}
 
-				log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
-				match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
-					ChannelMonitorUpdateStatus::Completed =>
-						log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
-					ChannelMonitorUpdateStatus::PermanentFailure => {
-						monitor_state.channel_perm_failed.store(true, Ordering::Release);
-						self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
-						self.event_notifier.notify();
-					},
-					ChannelMonitorUpdateStatus::InProgress => {
-						log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
-						pending_monitor_updates.push(update_id);
-					},
+					log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+					match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
+						ChannelMonitorUpdateStatus::Completed =>
+							log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
+						ChannelMonitorUpdateStatus::PermanentFailure => {
+							monitor_state.channel_perm_failed.store(true, Ordering::Release);
+							self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
+							self.event_notifier.notify();
+						},
+						ChannelMonitorUpdateStatus::InProgress => {
+							log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+							pending_monitor_updates.push(update_id);
+						},
+					}
 				}
-			}
 
-			// Register any new outputs with the chain source for filtering, storing any dependent
-			// transactions from within the block that previously had not been included in txdata.
-			if let Some(ref chain_source) = self.chain_source {
-				let block_hash = header.block_hash();
-				for (txid, mut outputs) in txn_outputs.drain(..) {
-					for (idx, output) in outputs.drain(..) {
-						// Register any new outputs with the chain source for filtering
-						let output = WatchedOutput {
-							block_hash: Some(block_hash),
-							outpoint: OutPoint { txid, index: idx as u16 },
-							script_pubkey: output.script_pubkey,
-						};
-						chain_source.register_output(output)
+				// Register any new outputs with the chain source for filtering, storing any dependent
+				// transactions from within the block that previously had not been included in txdata.
+				if let Some(ref chain_source) = self.chain_source {
+					let block_hash = header.block_hash();
+					for (txid, mut outputs) in txn_outputs.drain(..) {
+						for (idx, output) in outputs.drain(..) {
+							// Register any new outputs with the chain source for filtering
+							let output = WatchedOutput {
+								block_hash: Some(block_hash),
+								outpoint: OutPoint { txid, index: idx as u16 },
+								script_pubkey: output.script_pubkey,
+							};
+							chain_source.register_output(output)
+						}
 					}
 				}
 			}
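The reworked loop above avoids the long-held lock in two ways: the highest_chain_height update now happens inside a nested block, so its write guard is dropped at the closing brace, and the per-monitor loop first snapshots the funding outpoints under a short-lived read lock, then re-acquires the write lock once per iteration. The same pattern in a standalone, compilable sketch (again with hypothetical stand-in types rather than LDK's OutPoint and MonitorHolder):

use std::collections::HashMap;
use std::sync::RwLock;

// Models a potentially slow persistence call (disk or network I/O).
fn persist(key: &u64, val: &str) { println!("persisting {}: {}", key, val); }

fn process_all(monitors: &RwLock<HashMap<u64, String>>) {
    // Snapshot the keys under a briefly-held read lock; the temporary guard
    // is dropped at the end of this statement.
    let keys: Vec<u64> = monitors.read().unwrap().keys().cloned().collect();
    for key in keys.iter() {
        // Re-acquire the lock once per entry; it is released at the end of
        // each iteration, so other lock users can interleave.
        let guard = monitors.write().unwrap();
        // An entry may have been removed since the snapshot was taken.
        if let Some(val) = guard.get(key) {
            persist(key, val);
        }
    }
}

fn main() {
    let monitors = RwLock::new(HashMap::from([
        (1u64, String::from("a")),
        (2u64, String::from("b")),
    ]));
    process_all(&monitors);
}

The trade-off is visible in the if let Some(monitor_state) check the diff adds: a monitor removed between the snapshot and its turn through the loop is simply skipped, and one added after the snapshot is not processed until the next chain event.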
@@ -976,7 +983,7 @@ mod tests {
 			assert!(err.contains("ChannelMonitor storage failure")));
 		check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update
 		check_closed_broadcast!(nodes[0], true);
-		check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() }, 
+		check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() },
 			[nodes[1].node.get_our_node_id()], 100000);
 
 		// However, as the ChainMonitor is still waiting for the original persistence to complete,
