@@ -799,7 +799,13 @@ pub(crate) enum MonitorUpdateCompletionAction {
799799 /// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
800800 /// this payment. Note that this is only best-effort. On restart it's possible such a duplicate
801801 /// event can be generated.
802- PaymentClaimed { payment_hash: PaymentHash },
802+ PaymentClaimed {
803+ payment_hash: PaymentHash,
804+ /// A pending MPP claim which hasn't yet completed.
805+ ///
806+ /// Not written to disk.
807+ pending_mpp_claim: Option<(PublicKey, ChannelId, u64, PendingMPPClaimPointer)>,
808+ },
803809 /// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
804810 /// operation of another channel.
805811 ///
@@ -833,7 +839,10 @@ pub(crate) enum MonitorUpdateCompletionAction {
833839}
834840
835841impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
836- (0, PaymentClaimed) => { (0, payment_hash, required) },
842+ (0, PaymentClaimed) => {
843+ (0, payment_hash, required),
844+ (9999999999, pending_mpp_claim, (static_value, None)),
845+ },
837846 // Note that FreeOtherChannelImmediately should never be written - we were supposed to free
838847 // *immediately*. However, for simplicity we implement read/write here.
839848 (1, FreeOtherChannelImmediately) => {
@@ -6200,7 +6209,7 @@ where
62006209
62016210 let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
62026211
6203- let mut sources = {
6212+ let sources = {
62046213 let mut claimable_payments = self.claimable_payments.lock().unwrap();
62056214 if let Some(payment) = claimable_payments.claimable_payments.remove(&payment_hash) {
62066215 let mut receiver_node_id = self.our_network_pubkey;
@@ -6295,18 +6304,46 @@ where
62956304 return;
62966305 }
62976306 if valid_mpp {
6298- for htlc in sources.drain(..) {
6307+ let pending_mpp_claim_ptr_opt = if sources.len() > 1 {
6308+ let channels_without_preimage = sources.iter().filter_map(|htlc| {
6309+ if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
6310+ let prev_hop = &htlc.prev_hop;
6311+ Some((cp_id, prev_hop.outpoint, prev_hop.channel_id, prev_hop.htlc_id))
6312+ } else {
6313+ None
6314+ }
6315+ }).collect();
6316+ Some(Arc::new(Mutex::new(PendingMPPClaim {
6317+ channels_without_preimage,
6318+ channels_with_preimage: Vec::new(),
6319+ })))
6320+ } else {
6321+ None
6322+ };
6323+ for htlc in sources {
6324+ let this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().and_then(|pending_mpp_claim|
6325+ if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
6326+ let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim));
6327+ Some((cp_id, htlc.prev_hop.channel_id, htlc.prev_hop.htlc_id, claim_ptr))
6328+ } else {
6329+ None
6330+ }
6331+ );
6332+ let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| {
6333+ RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
6334+ pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)),
6335+ }
6336+ });
62996337 self.claim_funds_from_hop(
63006338 htlc.prev_hop, payment_preimage,
63016339 |_, definitely_duplicate| {
63026340 debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
6303- Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } )
6341+ ( Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim: this_mpp_claim }), raa_blocker )
63046342 }
63056343 );
63066344 }
6307- }
6308- if !valid_mpp {
6309- for htlc in sources.drain(..) {
6345+ } else {
6346+ for htlc in sources {
63106347 let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec();
63116348 htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height.to_be_bytes());
63126349 let source = HTLCSource::PreviousHopData(htlc.prev_hop);
@@ -6324,7 +6361,9 @@ where
63246361 }
63256362 }
63266363
6327- fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(
6364+ fn claim_funds_from_hop<
6365+ ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
6366+ >(
63286367 &self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
63296368 completion_action: ComplFunc,
63306369 ) {
@@ -6364,11 +6403,15 @@ where
63646403
63656404 match fulfill_res {
63666405 UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
6367- if let Some(action) = completion_action(Some(htlc_value_msat), false) {
6406+ let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false);
6407+ if let Some(action) = action_opt {
63686408 log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}",
63696409 chan_id, action);
63706410 peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
63716411 }
6412+ if let Some(raa_blocker) = raa_blocker_opt {
6413+ peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
6414+ }
63726415 if !during_init {
63736416 handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
63746417 peer_state, per_peer_state, chan);
@@ -6386,11 +6429,16 @@ where
63866429 }
63876430 }
63886431 UpdateFulfillCommitFetch::DuplicateClaim {} => {
6389- let action = if let Some(action) = completion_action(None, true) {
6432+ let (action_opt, raa_blocker_opt) = completion_action(None, true);
6433+ if let Some(raa_blocker) = raa_blocker_opt {
6434+ debug_assert!(peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker));
6435+ }
6436+ let action = if let Some(action) = action_opt {
63906437 action
63916438 } else {
63926439 return;
63936440 };
6441+
63946442 mem::drop(peer_state_lock);
63956443
63966444 log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
@@ -6477,7 +6525,46 @@ where
64776525 // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
64786526 // generally always allowed to be duplicative (and it's specifically noted in
64796527 // `PaymentForwarded`).
6480- self.handle_monitor_update_completion_actions(completion_action(None, false));
6528+ let (action_opt, raa_blocker_opt) = completion_action(None, false);
6529+
6530+ if let Some(raa_blocker) = raa_blocker_opt {
6531+ let counterparty_node_id = prev_hop.counterparty_node_id.or_else(||
6532+ // prev_hop.counterparty_node_id is always available for payments received after
6533+ // LDK 0.0.123, but for those received on 0.0.123 and claimed later, we need to
6534+ // look up the counterparty in the `action_opt`, if possible.
6535+ action_opt.as_ref().and_then(|action|
6536+ if let MonitorUpdateCompletionAction::PaymentClaimed { pending_mpp_claim, .. } = action {
6537+ pending_mpp_claim.as_ref().map(|(node_id, _, _, _)| *node_id)
6538+ } else { None }
6539+ )
6540+ );
6541+ if let Some(counterparty_node_id) = counterparty_node_id {
6542+ // TODO: Avoid always blocking the world for the write lock here.
6543+ let mut per_peer_state = self.per_peer_state.write().unwrap();
6544+ let peer_state_mutex = per_peer_state.entry(counterparty_node_id).or_insert_with(||
6545+ Mutex::new(PeerState {
6546+ channel_by_id: new_hash_map(),
6547+ inbound_channel_request_by_id: new_hash_map(),
6548+ latest_features: InitFeatures::empty(),
6549+ pending_msg_events: Vec::new(),
6550+ in_flight_monitor_updates: BTreeMap::new(),
6551+ monitor_update_blocked_actions: BTreeMap::new(),
6552+ actions_blocking_raa_monitor_updates: BTreeMap::new(),
6553+ is_connected: false,
6554+ }));
6555+ let mut peer_state = peer_state_mutex.lock().unwrap();
6556+
6557+ peer_state.actions_blocking_raa_monitor_updates
6558+ .entry(prev_hop.channel_id)
6559+ .or_insert_with(Vec::new)
6560+ .push(raa_blocker);
6561+ } else {
6562+ debug_assert!(false,
6563+ "RAA ChannelMonitorUpdate blockers are only set with PaymentClaimed completion actions, so we should always have a counterparty node id");
6564+ }
6565+ }
6566+
6567+ self.handle_monitor_update_completion_actions(action_opt);
64816568 }
64826569
64836570 fn finalize_claims(&self, sources: Vec<HTLCSource>) {
@@ -6576,16 +6663,16 @@ where
65766663 }
65776664 }), "{:?}", *background_events);
65786665 }
6579- None
6666+ ( None, None)
65806667 } else if definitely_duplicate {
65816668 if let Some(other_chan) = chan_to_release {
6582- Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
6669+ ( Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
65836670 downstream_counterparty_node_id: other_chan.counterparty_node_id,
65846671 downstream_funding_outpoint: other_chan.funding_txo,
65856672 downstream_channel_id: other_chan.channel_id,
65866673 blocking_action: other_chan.blocking_action,
6587- })
6588- } else { None }
6674+ }), None)
6675+ } else { ( None, None) }
65896676 } else {
65906677 let total_fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
65916678 if let Some(claimed_htlc_value) = htlc_claim_value_msat {
@@ -6594,7 +6681,7 @@ where
65946681 } else { None };
65956682 debug_assert!(skimmed_fee_msat <= total_fee_earned_msat,
65966683 "skimmed_fee_msat must always be included in total_fee_earned_msat");
6597- Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
6684+ ( Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
65986685 event: events::Event::PaymentForwarded {
65996686 prev_channel_id: Some(prev_channel_id),
66006687 next_channel_id: Some(next_channel_id),
@@ -6606,7 +6693,7 @@ where
66066693 outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
66076694 },
66086695 downstream_counterparty_and_funding_outpoint: chan_to_release,
6609- })
6696+ }), None)
66106697 }
66116698 });
66126699 },
@@ -6623,9 +6710,44 @@ where
66236710 debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
66246711 debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
66256712
6713+ let mut freed_channels = Vec::new();
6714+
66266715 for action in actions.into_iter() {
66276716 match action {
6628- MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
6717+ MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim } => {
6718+ if let Some((counterparty_node_id, chan_id, htlc_id, claim_ptr)) = pending_mpp_claim {
6719+ let per_peer_state = self.per_peer_state.read().unwrap();
6720+ per_peer_state.get(&counterparty_node_id).map(|peer_state_mutex| {
6721+ let mut peer_state = peer_state_mutex.lock().unwrap();
6722+ let blockers_entry = peer_state.actions_blocking_raa_monitor_updates.entry(chan_id);
6723+ if let btree_map::Entry::Occupied(mut blockers) = blockers_entry {
6724+ blockers.get_mut().retain(|blocker|
6725+ if let &RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { pending_claim } = &blocker {
6726+ if *pending_claim == claim_ptr {
6727+ let mut pending_claim_state_lock = pending_claim.0.lock().unwrap();
6728+ let pending_claim_state = &mut *pending_claim_state_lock;
6729+ pending_claim_state.channels_without_preimage.retain(|(cp, outp, cid, hid)| {
6730+ if *cp == counterparty_node_id && *cid == chan_id && *hid == htlc_id {
6731+ pending_claim_state.channels_with_preimage.push((*cp, *outp, *cid));
6732+ false
6733+ } else { true }
6734+ });
6735+ if pending_claim_state.channels_without_preimage.is_empty() {
6736+ for (cp, outp, cid) in pending_claim_state.channels_with_preimage.iter() {
6737+ freed_channels.push((*cp, *outp, *cid, blocker.clone()));
6738+ }
6739+ }
6740+ !pending_claim_state.channels_without_preimage.is_empty()
6741+ } else { true }
6742+ } else { true }
6743+ );
6744+ if blockers.get().is_empty() {
6745+ blockers.remove();
6746+ }
6747+ }
6748+ });
6749+ }
6750+
66296751 let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
66306752 if let Some(ClaimingPayment {
66316753 amount_msat,
@@ -6669,6 +6791,10 @@ where
66696791 },
66706792 }
66716793 }
6794+
6795+ for (node_id, funding_outpoint, channel_id, blocker) in freed_channels {
6796+ self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
6797+ }
66726798 }
66736799
66746800 /// Handles a channel reentering a functional state, either due to reconnect or a monitor
0 commit comments