@@ -563,6 +563,7 @@ struct ClaimablePayments {
563563/// usually because we're running pre-full-init. They are handled immediately once we detect we are
564564/// running normally, and specifically must be processed before any other non-background
565565/// [`ChannelMonitorUpdate`]s are applied.
566+ #[derive(Debug)]
566567enum BackgroundEvent {
567568	/// Handle a ChannelMonitorUpdate which closes the channel or for an already-closed channel.
568569	/// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the
@@ -5377,8 +5378,11 @@ where
53775378			for htlc in sources.drain(..) {
53785379				if let Err((pk, err)) = self.claim_funds_from_hop(
53795380					htlc.prev_hop, payment_preimage,
5380- 					|_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash }))
5381- 				{
5381+ 					|_, definitely_duplicate| {
5382+ 						debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
5383+ 						Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
5384+ 					}
5385+ 				) {
53825386					if let msgs::ErrorAction::IgnoreError = err.err.action {
53835387						// We got a temporary failure updating monitor, but will claim the
53845388						// HTLC when the monitor updating is restored (or on chain).
@@ -5406,7 +5410,7 @@ where
54065410		}
54075411	}
54085412
5409- 	fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>) -> Option<MonitorUpdateCompletionAction>>(&self,
5413+ 	fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool ) -> Option<MonitorUpdateCompletionAction>>(&self,
54105414		prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
54115415	-> Result<(), (PublicKey, MsgHandleErrInternal)> {
54125416		//TODO: Delay the claimed_funds relaying just like we do outbound relay!
@@ -5416,6 +5420,11 @@ where
54165420		// `BackgroundEvent`s.
54175421		let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);
54185422
5423+ 		// As we may call handle_monitor_update_completion_actions in rather rare cases, check that
5424+ 		// the required mutexes are not held before we start.
5425+ 		debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
5426+ 		debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
5427+ 
54195428		{
54205429			let per_peer_state = self.per_peer_state.read().unwrap();
54215430			let chan_id = prev_hop.outpoint.to_channel_id();
@@ -5437,25 +5446,58 @@ where
54375446						let counterparty_node_id = chan.context.get_counterparty_node_id();
54385447						let fulfill_res = chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);
54395448
5440- 						if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res {
5441- 							if let Some(action) = completion_action(Some(htlc_value_msat)) {
5442- 								log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
5443- 									chan_id, action);
5444- 								peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
5449+ 						match fulfill_res {
5450+ 							UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
5451+ 								if let Some(action) = completion_action(Some(htlc_value_msat), false) {
5452+ 									log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
5453+ 										chan_id, action);
5454+ 									peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
5455+ 								}
5456+ 								if !during_init {
5457+ 									handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
5458+ 										peer_state, per_peer_state, chan);
5459+ 								} else {
5460+ 									// If we're running during init we cannot update a monitor directly -
5461+ 									// they probably haven't actually been loaded yet. Instead, push the
5462+ 									// monitor update as a background event.
5463+ 									self.pending_background_events.lock().unwrap().push(
5464+ 										BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
5465+ 											counterparty_node_id,
5466+ 											funding_txo: prev_hop.outpoint,
5467+ 											update: monitor_update.clone(),
5468+ 										});
5469+ 								}
54455470							}
5446- 							if !during_init {
5447- 								handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
5448- 									peer_state, per_peer_state, chan);
5449- 							} else {
5450- 								// If we're running during init we cannot update a monitor directly -
5451- 								// they probably haven't actually been loaded yet. Instead, push the
5452- 								// monitor update as a background event.
5453- 								self.pending_background_events.lock().unwrap().push(
5454- 									BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
5455- 										counterparty_node_id,
5456- 										funding_txo: prev_hop.outpoint,
5457- 										update: monitor_update.clone(),
5458- 									});
5471+ 							UpdateFulfillCommitFetch::DuplicateClaim {} => {
5472+ 								if let Some(action) = completion_action(None, true) {
5473+ 									log_trace!(self.logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
5474+ 										chan_id, action);
5475+ 									mem::drop(peer_state_lock);
5476+ 									if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
5477+ 										downstream_counterparty_and_funding_outpoint
5478+ 									} = action {
5479+ 										let (node_id, funding_outpoint, blocker) =
5480+ 											downstream_counterparty_and_funding_outpoint;
5481+ 										if let Some(peer_state_mtx) = per_peer_state.get(&node_id) {
5482+ 											let mut peer_state = peer_state_mtx.lock().unwrap();
5483+ 											if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates
5484+ 												.get_mut(&funding_outpoint.to_channel_id())
5485+ 											{
5486+ 												let mut found_blocker = false;
5487+ 												blockers.retain(|iter| {
5488+ 													if *iter == blocker { found_blocker = true; }
5489+ 													*iter != blocker
5490+ 												});
5491+ 												debug_assert!(found_blocker);
5492+ 											}
5493+ 										} else {
5494+ 											debug_assert!(false);
5495+ 										}
5496+ 									} else {
5497+ 										debug_assert!(false,
5498+ 											"Duplicate claims should always free another channel immediately");
5499+ 									}
5500+ 								}
54595501							}
54605502						}
54615503					}
@@ -5503,7 +5545,7 @@ where
55035545		// `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
55045546		// generally always allowed to be duplicative (and it's specifically noted in
55055547		// `PaymentForwarded`).
5506- 		self.handle_monitor_update_completion_actions(completion_action(None));
5548+ 		self.handle_monitor_update_completion_actions(completion_action(None, false ));
55075549		Ok(())
55085550	}
55095551
@@ -5533,33 +5575,72 @@ where
55335575			HTLCSource::PreviousHopData(hop_data) => {
55345576				let prev_outpoint = hop_data.outpoint;
55355577				let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
5578+ 				#[cfg(debug_assertions)]
5579+ 				let claiming_chan_funding_outpoint = hop_data.outpoint;
55365580				let res = self.claim_funds_from_hop(hop_data, payment_preimage,
5537- 					|htlc_claim_value_msat| {
5581+ 					|htlc_claim_value_msat, definitely_duplicate | {
55385582						if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
55395583							let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
55405584								Some(claimed_htlc_value - forwarded_htlc_value)
55415585							} else { None };
55425586
5543- 							Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
5544- 								event: events::Event::PaymentForwarded {
5545- 									fee_earned_msat,
5546- 									claim_from_onchain_tx: from_onchain,
5547- 									prev_channel_id: Some(prev_outpoint.to_channel_id()),
5548- 									next_channel_id: Some(next_channel_outpoint.to_channel_id()),
5549- 									outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
5550- 								},
5551- 								downstream_counterparty_and_funding_outpoint:
5552- 									if let Some(node_id) = next_channel_counterparty_node_id {
5553- 										Some((node_id, next_channel_outpoint, completed_blocker))
5554- 									} else {
5555- 										// We can only get `None` here if we are processing a
5556- 										// `ChannelMonitor`-originated event, in which case we
5557- 										// don't care about ensuring we wake the downstream
5558- 										// channel's monitor updating - the channel is already
5559- 										// closed.
5560- 										None
5587+ 							let chan_to_release =
5588+ 								if let Some(node_id) = next_channel_counterparty_node_id {
5589+ 									Some((node_id, next_channel_outpoint, completed_blocker))
5590+ 								} else {
5591+ 									// We can only get `None` here if we are processing a
5592+ 									// `ChannelMonitor`-originated event, in which case we
5593+ 									// don't care about ensuring we wake the downstream
5594+ 									// channel's monitor updating - the channel is already
5595+ 									// closed.
5596+ 									None
5597+ 								};
5598+ 
5599+ 							if definitely_duplicate && startup_replay {
5600+ 								// On startup we may get get redundant claims which are related to
5601+ 								// monitor updates still in flight. In that case, we shouldn't
5602+ 								// immediately free, but instead let that monitor update complete
5603+ 								// in the background.
5604+ 								#[cfg(debug_assertions)] {
5605+ 									let background_events = self.pending_background_events.lock().unwrap();
5606+ 									// There should be a `BackgroundEvent` pending...
5607+ 									assert!(background_events.iter().any(|ev| {
5608+ 										match ev {
5609+ 											// to apply a monitor update that blocked channel,
5610+ 											BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
5611+ 												funding_txo, ..
5612+ 											} => *funding_txo == claiming_chan_funding_outpoint,
5613+ 											// or the channel we'd unblock is already closed,
5614+ 											BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, ..))
5615+ 												=> *funding_txo == next_channel_outpoint,
5616+ 											// or the monitor update has completed and will unblock
5617+ 											// immediately once we get going.
5618+ 											BackgroundEvent::MonitorUpdatesComplete {
5619+ 												channel_id, ..
5620+ 											} =>
5621+ 												*channel_id == claiming_chan_funding_outpoint.to_channel_id(),
5622+ 										}
5623+ 									}), "{:?}", *background_events);
5624+ 								}
5625+ 								None
5626+ 							} else if definitely_duplicate {
5627+ 								if let Some(other_chan) = chan_to_release {
5628+ 									Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
5629+ 										downstream_counterparty_and_funding_outpoint: other_chan,
5630+ 									})
5631+ 								} else { None }
5632+ 							} else {
5633+ 								Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
5634+ 									event: events::Event::PaymentForwarded {
5635+ 										fee_earned_msat,
5636+ 										claim_from_onchain_tx: from_onchain,
5637+ 										prev_channel_id: Some(prev_outpoint.to_channel_id()),
5638+ 										next_channel_id: Some(next_channel_outpoint.to_channel_id()),
5639+ 										outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
55615640									},
5562- 							})
5641+ 									downstream_counterparty_and_funding_outpoint: chan_to_release,
5642+ 								})
5643+ 							}
55635644						} else { None }
55645645					});
55655646				if let Err((pk, err)) = res {
@@ -5576,6 +5657,10 @@ where
55765657	}
55775658
55785659	fn handle_monitor_update_completion_actions<I: IntoIterator<Item=MonitorUpdateCompletionAction>>(&self, actions: I) {
5660+ 		debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
5661+ 		debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
5662+ 		debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
5663+ 
55795664		for action in actions.into_iter() {
55805665			match action {
55815666				MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
0 commit comments