@@ -563,6 +563,7 @@ struct ClaimablePayments {
 /// usually because we're running pre-full-init. They are handled immediately once we detect we are
 /// running normally, and specifically must be processed before any other non-background
 /// [`ChannelMonitorUpdate`]s are applied.
+#[derive(Debug)]
 enum BackgroundEvent {
 	/// Handle a ChannelMonitorUpdate which closes the channel or for an already-closed channel.
 	/// This is only separated from [`Self::MonitorUpdateRegeneratedOnStartup`] as the
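
Reviewer note: this derive is required by a debug assertion added later in this diff, which formats the pending background events with `{:?}` (see `assert!(..., "{:?}", *background_events)` below); without `Debug` on `BackgroundEvent` that assertion would not compile. A minimal sketch of the dependency, using a hypothetical `Example` enum:

```rust
// Hypothetical stand-in for `BackgroundEvent`. Without the derive, the
// `{:?}` in the assertion below fails to compile.
#[derive(Debug)]
enum Example {
	MonitorUpdate { channel_id: u64 },
}

fn assert_has_update(events: &[Example]) {
	// `assert!` with a format string requires `Debug` on the formatted value.
	assert!(
		events.iter().any(|ev| matches!(ev, Example::MonitorUpdate { .. })),
		"{:?}", events
	);
}
```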
@@ -5381,8 +5382,11 @@ where
 			for htlc in sources.drain(..) {
 				if let Err((pk, err)) = self.claim_funds_from_hop(
 					htlc.prev_hop, payment_preimage,
-					|_| Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash }))
-				{
+					|_, definitely_duplicate| {
+						debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
+						Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
+					}
+				) {
 					if let msgs::ErrorAction::IgnoreError = err.err.action {
 						// We got a temporary failure updating monitor, but will claim the
 						// HTLC when the monitor updating is restored (or on chain).
@@ -5410,7 +5414,7 @@ where
 		}
 	}

-	fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>) -> Option<MonitorUpdateCompletionAction>>(&self,
+	fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(&self,
 		prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
 	-> Result<(), (PublicKey, MsgHandleErrInternal)> {
 		//TODO: Delay the claimed_funds relaying just like we do outbound relay!
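
Reviewer note: the completion callback now takes a second argument reporting whether the claim was definitely a duplicate, letting callers choose between emitting an event and merely unblocking a downstream channel. A rough sketch of the widened contract, with a hypothetical `Action` enum standing in for `MonitorUpdateCompletionAction`:

```rust
// Sketch only: `Action` is a hypothetical stand-in. The first argument is
// the claimed HTLC value when the claim is new; the second is true when the
// channel reports the preimage was already applied (a duplicate claim).
#[derive(Debug)]
enum Action { PaymentClaimed, FreeOtherChannel }

fn example_completion(htlc_value_msat: Option<u64>, definitely_duplicate: bool) -> Option<Action> {
	if definitely_duplicate {
		// No new monitor update was generated; only unblocking work remains.
		Some(Action::FreeOtherChannel)
	} else {
		// A fresh claim; the value is available when fetched from the channel.
		let _claimed_value = htlc_value_msat;
		Some(Action::PaymentClaimed)
	}
}
```

In the diff itself, the call sites pass `(Some(htlc_value_msat), false)` for new claims, `(None, true)` for duplicates, and `(None, false)` on the fallback path at the end of `claim_funds_from_hop`.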
@@ -5420,6 +5424,11 @@ where
 		// `BackgroundEvent`s.
 		let during_init = !self.background_events_processed_since_startup.load(Ordering::Acquire);

+		// As we may call handle_monitor_update_completion_actions in rather rare cases, check that
+		// the required mutexes are not held before we start.
+		debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
+		debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
+
 		{
 			let per_peer_state = self.per_peer_state.read().unwrap();
 			let chan_id = prev_hop.outpoint.to_channel_id();
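
Reviewer note: these assertions guard against deadlock rather than data races: paths below may call `handle_monitor_update_completion_actions`, which locks `pending_events` and `claimable_payments`, so neither mutex may already be held by this thread. A sketch of the pattern, assuming a debug lock wrapper along the lines of LDK's `debug_sync` (the `TrackedMutex` type and its stubbed tracking here are hypothetical):

```rust
use std::sync::Mutex;

// In a real wrapper, release builds cannot track ownership, hence a third
// `Unsupported` state and the use of `debug_assert_ne!` against
// `HeldByThread` rather than an equality check.
#[derive(Debug, PartialEq)]
enum LockHeldState { HeldByThread, NotHeldByThread, Unsupported }

struct TrackedMutex<T> {
	inner: Mutex<T>, // plus per-thread ownership bookkeeping in debug builds
}

impl<T> TrackedMutex<T> {
	fn held_by_thread(&self) -> LockHeldState {
		// Real tracking needs thread-local state; stubbed for the sketch.
		LockHeldState::Unsupported
	}
}

fn take_lock_safely<T>(m: &TrackedMutex<T>) {
	// Re-locking a non-reentrant mutex this thread already holds would
	// deadlock, so assert it is not held before any path below locks it.
	debug_assert_ne!(m.held_by_thread(), LockHeldState::HeldByThread);
	let _guard = m.inner.lock().unwrap();
}
```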
@@ -5441,25 +5450,70 @@ where
 						let counterparty_node_id = chan.context.get_counterparty_node_id();
 						let fulfill_res = chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, &self.logger);

-						if let UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } = fulfill_res {
-							if let Some(action) = completion_action(Some(htlc_value_msat)) {
-								log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
-									chan_id, action);
-								peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
+						match fulfill_res {
+							UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
+								if let Some(action) = completion_action(Some(htlc_value_msat), false) {
+									log_trace!(self.logger, "Tracking monitor update completion action for channel {}: {:?}",
+										chan_id, action);
+									peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
+								}
+								if !during_init {
+									handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+										peer_state, per_peer_state, chan);
+								} else {
+									// If we're running during init we cannot update a monitor directly -
+									// they probably haven't actually been loaded yet. Instead, push the
+									// monitor update as a background event.
+									self.pending_background_events.lock().unwrap().push(
+										BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+											counterparty_node_id,
+											funding_txo: prev_hop.outpoint,
+											update: monitor_update.clone(),
+										});
+								}
 							}
-							if !during_init {
-								handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
-									peer_state, per_peer_state, chan);
-							} else {
-								// If we're running during init we cannot update a monitor directly -
-								// they probably haven't actually been loaded yet. Instead, push the
-								// monitor update as a background event.
-								self.pending_background_events.lock().unwrap().push(
-									BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
-										counterparty_node_id,
-										funding_txo: prev_hop.outpoint,
-										update: monitor_update.clone(),
-									});
+							UpdateFulfillCommitFetch::DuplicateClaim {} => {
+								let action = if let Some(action) = completion_action(None, true) {
+									action
+								} else {
+									return Ok(());
+								};
+								mem::drop(peer_state_lock);
+
+								log_trace!(self.logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
+									chan_id, action);
+								let (node_id, funding_outpoint, blocker) =
+								if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+									downstream_counterparty_node_id: node_id,
+									downstream_funding_outpoint: funding_outpoint,
+									blocking_action: blocker,
+								} = action {
+									(node_id, funding_outpoint, blocker)
+								} else {
+									debug_assert!(false,
+										"Duplicate claims should always free another channel immediately");
+									return Ok(());
+								};
+								if let Some(peer_state_mtx) = per_peer_state.get(&node_id) {
+									let mut peer_state = peer_state_mtx.lock().unwrap();
+									if let Some(blockers) = peer_state
+										.actions_blocking_raa_monitor_updates
+										.get_mut(&funding_outpoint.to_channel_id())
+									{
+										let mut found_blocker = false;
+										blockers.retain(|iter| {
+											// Note that we could actually be blocked, in
+											// which case we need to only remove the one
+											// blocker which was added duplicatively.
+											let first_blocker = !found_blocker;
+											if *iter == blocker { found_blocker = true; }
+											*iter != blocker || !first_blocker
+										});
+										debug_assert!(found_blocker);
+									}
+								} else {
+									debug_assert!(false);
+								}
 							}
 						}
 					}
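
Reviewer note: the `retain` in the `DuplicateClaim` arm removes exactly one occurrence of the blocker. The claim may genuinely still be blocked, so the same action can legitimately appear more than once, and only the duplicatively-added copy should be dropped. A standalone sketch of the idiom with plain integers:

```rust
// Remove only the first element equal to `blocker`, keeping any later
// duplicates (they represent real, still-pending blockers).
fn remove_first_match(blockers: &mut Vec<u32>, blocker: u32) {
	let mut found_blocker = false;
	blockers.retain(|iter| {
		let first_blocker = !found_blocker;
		if *iter == blocker { found_blocker = true; }
		// Keep the element unless it matches and is the first match seen.
		*iter != blocker || !first_blocker
	});
	debug_assert!(found_blocker, "expected the blocker to be present");
}

#[test]
fn removes_only_one_copy() {
	let mut v = vec![1, 2, 2, 3];
	remove_first_match(&mut v, 2);
	assert_eq!(v, vec![1, 2, 3]);
}
```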
@@ -5507,7 +5561,7 @@ where
 		// `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
 		// generally always allowed to be duplicative (and it's specifically noted in
 		// `PaymentForwarded`).
-		self.handle_monitor_update_completion_actions(completion_action(None));
+		self.handle_monitor_update_completion_actions(completion_action(None, false));
 		Ok(())
 	}

@@ -5537,13 +5591,84 @@ where
 			HTLCSource::PreviousHopData(hop_data) => {
 				let prev_outpoint = hop_data.outpoint;
 				let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
+				#[cfg(debug_assertions)]
+				let claiming_chan_funding_outpoint = hop_data.outpoint;
 				let res = self.claim_funds_from_hop(hop_data, payment_preimage,
-					|htlc_claim_value_msat| {
-						if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
-							let fee_earned_msat = if let Some(claimed_htlc_value) = htlc_claim_value_msat {
-								Some(claimed_htlc_value - forwarded_htlc_value)
-							} else { None };
+					|htlc_claim_value_msat, definitely_duplicate| {
+						let chan_to_release =
+							if let Some(node_id) = next_channel_counterparty_node_id {
+								Some((node_id, next_channel_outpoint, completed_blocker))
+							} else {
+								// We can only get `None` here if we are processing a
+								// `ChannelMonitor`-originated event, in which case we
+								// don't care about ensuring we wake the downstream
+								// channel's monitor updating - the channel is already
+								// closed.
+								None
+							};

+						if definitely_duplicate && startup_replay {
+							// On startup we may get redundant claims which are related to
+							// monitor updates still in flight. In that case, we shouldn't
+							// immediately free, but instead let that monitor update complete
+							// in the background.
+							#[cfg(debug_assertions)] {
+								let background_events = self.pending_background_events.lock().unwrap();
+								// There should be a `BackgroundEvent` pending...
+								assert!(background_events.iter().any(|ev| {
+									match ev {
+										// to apply a monitor update that blocked the claiming channel,
+										BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+											funding_txo, update, ..
+										} => {
+											if *funding_txo == claiming_chan_funding_outpoint {
+												assert!(update.updates.iter().any(|upd|
+													if let ChannelMonitorUpdateStep::PaymentPreimage {
+														payment_preimage: update_preimage
+													} = upd {
+														payment_preimage == *update_preimage
+													} else { false }
+												), "{:?}", update);
+												true
+											} else { false }
+										},
+										// or the channel we'd unblock is already closed,
+										BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup(
+											(funding_txo, monitor_update)
+										) => {
+											if *funding_txo == next_channel_outpoint {
+												assert_eq!(monitor_update.updates.len(), 1);
+												assert!(matches!(
+													monitor_update.updates[0],
+													ChannelMonitorUpdateStep::ChannelForceClosed { .. }
+												));
+												true
+											} else { false }
+										},
+										// or the monitor update has completed and will unblock
+										// immediately once we get going.
+										BackgroundEvent::MonitorUpdatesComplete {
+											channel_id, ..
+										} =>
+											*channel_id == claiming_chan_funding_outpoint.to_channel_id(),
+									}
+								}), "{:?}", *background_events);
+							}
+							None
+						} else if definitely_duplicate {
+							if let Some(other_chan) = chan_to_release {
+								Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+									downstream_counterparty_node_id: other_chan.0,
+									downstream_funding_outpoint: other_chan.1,
+									blocking_action: other_chan.2,
+								})
+							} else { None }
+						} else {
+							let fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
+								if let Some(claimed_htlc_value) = htlc_claim_value_msat {
+									Some(claimed_htlc_value - forwarded_htlc_value)
+								} else { None }
+							} else { None };
 							Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
 								event: events::Event::PaymentForwarded {
 									fee_earned_msat,
@@ -5552,19 +5677,9 @@ where
 									next_channel_id: Some(next_channel_outpoint.to_channel_id()),
 									outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
 								},
-								downstream_counterparty_and_funding_outpoint:
-									if let Some(node_id) = next_channel_counterparty_node_id {
-										Some((node_id, next_channel_outpoint, completed_blocker))
-									} else {
-										// We can only get `None` here if we are processing a
-										// `ChannelMonitor`-originated event, in which case we
-										// don't care about ensuring we wake the downstream
-										// channel's monitor updating - the channel is already
-										// closed.
-										None
-									},
+								downstream_counterparty_and_funding_outpoint: chan_to_release,
 							})
-						} else { None }
+						}
 					});
 				if let Err((pk, err)) = res {
 					let result: Result<(), _> = Err(err);
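
Reviewer note: taken together, the rewritten closure resolves to one of three outcomes. A simplified sketch of the decision tree, with a hypothetical `Outcome` enum in place of the real `MonitorUpdateCompletionAction` variants:

```rust
enum Outcome {
	// Duplicate claim during startup replay: a pending (or already
	// completed) background monitor update will do the unblocking.
	DeferToBackground,
	// Duplicate claim at runtime: nothing to emit, just unblock downstream.
	FreeOtherChannelImmediately,
	// Fresh claim: emit PaymentForwarded, then unblock downstream.
	EmitEventAndFree { fee_earned_msat: Option<u64> },
}

fn choose(
	definitely_duplicate: bool, startup_replay: bool,
	forwarded_htlc_value_msat: Option<u64>, htlc_claim_value_msat: Option<u64>,
) -> Outcome {
	if definitely_duplicate && startup_replay {
		Outcome::DeferToBackground
	} else if definitely_duplicate {
		Outcome::FreeOtherChannelImmediately
	} else {
		// The fee is only computable when both the inbound claim value and
		// the outbound forwarded value are known.
		let fee_earned_msat = match (htlc_claim_value_msat, forwarded_htlc_value_msat) {
			(Some(claimed), Some(forwarded)) => Some(claimed - forwarded),
			_ => None,
		};
		Outcome::EmitEventAndFree { fee_earned_msat }
	}
}
```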
@@ -5580,6 +5695,10 @@ where
 	}

 	fn handle_monitor_update_completion_actions<I: IntoIterator<Item=MonitorUpdateCompletionAction>>(&self, actions: I) {
+		debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread);
+		debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
+		debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
+
 		for action in actions.into_iter() {
 			match action {
 				MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {