@@ -1130,9 +1130,34 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
11301130 }
11311131);
11321132
1133+ /// The source argument which is passed to [`ChannelManager::claim_mpp_part`].
1134+ ///
1135+ /// This is identical to [`MPPClaimHTLCSource`] except that [`Self::counterparty_node_id`] is an
1136+ /// `Option`, whereas it is required in [`MPPClaimHTLCSource`]. In the future, we should ideally
1137+ /// drop this and merge the two, however doing so may break upgrades for nodes which have pending
1138+ /// forwarded payments.
1139+ struct HTLCClaimSource {
1140+ counterparty_node_id: Option<PublicKey>,
1141+ funding_txo: OutPoint,
1142+ channel_id: ChannelId,
1143+ htlc_id: u64,
1144+ }
1145+
1146+ impl From<&MPPClaimHTLCSource> for HTLCClaimSource {
1147+ fn from(o: &MPPClaimHTLCSource) -> HTLCClaimSource {
1148+ HTLCClaimSource {
1149+ counterparty_node_id: Some(o.counterparty_node_id),
1150+ funding_txo: o.funding_txo,
1151+ channel_id: o.channel_id,
1152+ htlc_id: o.htlc_id,
1153+ }
1154+ }
1155+ }
1156+
11331157#[derive(Clone, Debug, PartialEq, Eq)]
11341158/// The source of an HTLC which is being claimed as a part of an incoming payment. Each part is
1135- /// tracked in [`PendingMPPClaim`].
1159+ /// tracked in [`PendingMPPClaim`] as well as in [`ChannelMonitor`]s, so that it can be converted
1160+ /// to an [`HTLCClaimSource`] for claim replays on startup.
11361161struct MPPClaimHTLCSource {
11371162 counterparty_node_id: PublicKey,
11381163 funding_txo: OutPoint,
@@ -6896,6 +6921,27 @@ where
68966921 >(
68976922 &self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
68986923 payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
6924+ ) {
6925+ let counterparty_node_id =
6926+ match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
6927+ Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
6928+ None => None
6929+ };
6930+
6931+ let htlc_source = HTLCClaimSource {
6932+ counterparty_node_id,
6933+ funding_txo: prev_hop.outpoint,
6934+ channel_id: prev_hop.channel_id,
6935+ htlc_id: prev_hop.htlc_id,
6936+ };
6937+ self.claim_mpp_part(htlc_source, payment_preimage, payment_info, completion_action)
6938+ }
6939+
6940+ fn claim_mpp_part<
6941+ ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
6942+ >(
6943+ &self, prev_hop: HTLCClaimSource, payment_preimage: PaymentPreimage,
6944+ payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
68996945 ) {
69006946 //TODO: Delay the claimed_funds relaying just like we do outbound relay!
69016947
@@ -6912,12 +6958,8 @@ where
69126958 {
69136959 let per_peer_state = self.per_peer_state.read().unwrap();
69146960 let chan_id = prev_hop.channel_id;
6915- let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
6916- Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
6917- None => None
6918- };
69196961
6920- let peer_state_opt = counterparty_node_id_opt .as_ref().map(
6962+ let peer_state_opt = prev_hop.counterparty_node_id .as_ref().map(
69216963 |counterparty_node_id| per_peer_state.get(counterparty_node_id)
69226964 .map(|peer_mutex| peer_mutex.lock().unwrap())
69236965 ).unwrap_or(None);
@@ -6944,7 +6986,7 @@ where
69446986 peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
69456987 }
69466988 if !during_init {
6947- handle_new_monitor_update!(self, prev_hop.outpoint , monitor_update, peer_state_lock,
6989+ handle_new_monitor_update!(self, prev_hop.funding_txo , monitor_update, peer_state_lock,
69486990 peer_state, per_peer_state, chan);
69496991 } else {
69506992 // If we're running during init we cannot update a monitor directly -
@@ -6953,7 +6995,7 @@ where
69536995 self.pending_background_events.lock().unwrap().push(
69546996 BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
69556997 counterparty_node_id,
6956- funding_txo: prev_hop.outpoint ,
6998+ funding_txo: prev_hop.funding_txo ,
69576999 channel_id: prev_hop.channel_id,
69587000 update: monitor_update.clone(),
69597001 });
@@ -7027,7 +7069,7 @@ where
70277069 }
70287070 let preimage_update = ChannelMonitorUpdate {
70297071 update_id: CLOSED_CHANNEL_UPDATE_ID,
7030- counterparty_node_id: None ,
7072+ counterparty_node_id: prev_hop.counterparty_node_id ,
70317073 updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
70327074 payment_preimage,
70337075 payment_info,
@@ -7038,7 +7080,7 @@ where
70387080 if !during_init {
70397081 // We update the ChannelMonitor on the backward link, after
70407082 // receiving an `update_fulfill_htlc` from the forward link.
7041- let update_res = self.chain_monitor.update_channel(prev_hop.outpoint , &preimage_update);
7083+ let update_res = self.chain_monitor.update_channel(prev_hop.funding_txo , &preimage_update);
70427084 if update_res != ChannelMonitorUpdateStatus::Completed {
70437085 // TODO: This needs to be handled somehow - if we receive a monitor update
70447086 // with a preimage we *must* somehow manage to propagate it to the upstream
@@ -7061,7 +7103,7 @@ where
70617103 // complete the monitor update completion action from `completion_action`.
70627104 self.pending_background_events.lock().unwrap().push(
70637105 BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
7064- prev_hop.outpoint , prev_hop.channel_id, preimage_update,
7106+ prev_hop.funding_txo , prev_hop.channel_id, preimage_update,
70657107 )));
70667108 }
70677109 // Note that we do process the completion action here. This totally could be a
@@ -7312,7 +7354,7 @@ where
73127354 onion_fields,
73137355 payment_id,
73147356 }) = payment {
7315- self.pending_events.lock().unwrap().push_back(( events::Event::PaymentClaimed {
7357+ let event = events::Event::PaymentClaimed {
73167358 payment_hash,
73177359 purpose,
73187360 amount_msat,
@@ -7321,7 +7363,16 @@ where
73217363 sender_intended_total_msat,
73227364 onion_fields,
73237365 payment_id,
7324- }, None));
7366+ };
7367+ let event_action = (event, None);
7368+ let mut pending_events = self.pending_events.lock().unwrap();
7369+ // If we're replaying a claim on startup we may end up duplicating an event
7370+ // that's already in our queue, so check before we push another one. The
7371+ // `payment_id` should suffice to ensure we never spuriously drop a second
7372+ // event for a duplicate payment.
7373+ if !pending_events.contains(&event_action) {
7374+ pending_events.push_back(event_action);
7375+ }
73257376 }
73267377 },
73277378 MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
@@ -13130,67 +13181,126 @@ where
1313013181 };
1313113182
1313213183 for (_, monitor) in args.channel_monitors.iter() {
13133- for (payment_hash, (payment_preimage, _)) in monitor.get_stored_preimages() {
13134- let per_peer_state = channel_manager.per_peer_state.read().unwrap();
13135- let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
13136- let payment = claimable_payments.claimable_payments.remove(&payment_hash);
13137- mem::drop(claimable_payments);
13138- if let Some(payment) = payment {
13139- log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
13140- let mut claimable_amt_msat = 0;
13141- let mut receiver_node_id = Some(our_network_pubkey);
13142- let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
13143- if phantom_shared_secret.is_some() {
13144- let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
13145- .expect("Failed to get node_id for phantom node recipient");
13146- receiver_node_id = Some(phantom_pubkey)
13147- }
13148- for claimable_htlc in &payment.htlcs {
13149- claimable_amt_msat += claimable_htlc.value;
13150-
13151- // Add a holding-cell claim of the payment to the Channel, which should be
13152- // applied ~immediately on peer reconnection. Because it won't generate a
13153- // new commitment transaction we can just provide the payment preimage to
13154- // the corresponding ChannelMonitor and nothing else.
13155- //
13156- // We do so directly instead of via the normal ChannelMonitor update
13157- // procedure as the ChainMonitor hasn't yet been initialized, implying
13158- // we're not allowed to call it directly yet. Further, we do the update
13159- // without incrementing the ChannelMonitor update ID as there isn't any
13160- // reason to.
13161- // If we were to generate a new ChannelMonitor update ID here and then
13162- // crash before the user finishes block connect we'd end up force-closing
13163- // this channel as well. On the flip side, there's no harm in restarting
13164- // without the new monitor persisted - we'll end up right back here on
13165- // restart.
13166- let previous_channel_id = claimable_htlc.prev_hop.channel_id;
13167- let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
13168- .get(&claimable_htlc.prev_hop.outpoint).cloned();
13169- if let Some(peer_node_id) = peer_node_id_opt {
13170- let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
13171- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
13172- let peer_state = &mut *peer_state_lock;
13173- if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
13174- let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
13175- channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
13184+ for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages() {
13185+ if !payment_claims.is_empty() {
13186+ for payment_claim in payment_claims {
13187+ if payment_claim.mpp_parts.is_empty() {
13188+ return Err(DecodeError::InvalidValue);
13189+ }
13190+ let pending_claims = PendingMPPClaim {
13191+ channels_without_preimage: payment_claim.mpp_parts.clone(),
13192+ channels_with_preimage: Vec::new(),
13193+ };
13194+ let pending_claim_ptr_opt = Some(Arc::new(Mutex::new(pending_claims)));
13195+
13196+ // While it may be duplicative to generate a PaymentClaimed here, trying to
13197+ // figure out if the user definitely saw it before shutdown would require some
13198+ // nontrivial logic and may break as we move away from regularly persisting
13199+ // ChannelManager. Instead, we rely on the users' event handler being
13200+ // idempotent and just blindly generate one no matter what, letting the
13201+ // preimages eventually timing out from ChannelMonitors to prevent us from
13202+ // doing so forever.
13203+
13204+ let claim_found =
13205+ channel_manager.claimable_payments.lock().unwrap().begin_claiming_payment(
13206+ payment_hash, &channel_manager.node_signer, &channel_manager.logger,
13207+ &channel_manager.inbound_payment_id_secret, true,
13208+ );
13209+ if claim_found.is_err() {
13210+ let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
13211+ match claimable_payments.pending_claiming_payments.entry(payment_hash) {
13212+ hash_map::Entry::Occupied(_) => {
13213+ debug_assert!(false, "Entry was added in begin_claiming_payment");
13214+ return Err(DecodeError::InvalidValue);
13215+ },
13216+ hash_map::Entry::Vacant(entry) => {
13217+ entry.insert(payment_claim.claiming_payment);
13218+ },
1317613219 }
1317713220 }
13178- if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
13179- previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
13221+
13222+ for part in payment_claim.mpp_parts.iter() {
13223+ let pending_mpp_claim = pending_claim_ptr_opt.as_ref().map(|ptr| (
13224+ part.counterparty_node_id, part.channel_id, part.htlc_id,
13225+ PendingMPPClaimPointer(Arc::clone(&ptr))
13226+ ));
13227+ let pending_claim_ptr = pending_claim_ptr_opt.as_ref().map(|ptr|
13228+ RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
13229+ pending_claim: PendingMPPClaimPointer(Arc::clone(&ptr)),
13230+ }
13231+ );
13232+ // Note that we don't need to pass the `payment_info` here - its
13233+ // already (clearly) durably on disk in the `ChannelMonitor` so there's
13234+ // no need to worry about getting it into others.
13235+ channel_manager.claim_mpp_part(
13236+ part.into(), payment_preimage, None,
13237+ |_, _|
13238+ (Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim }), pending_claim_ptr)
13239+ );
1318013240 }
1318113241 }
13182- let mut pending_events = channel_manager.pending_events.lock().unwrap();
13183- let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
13184- pending_events.push_back((events::Event::PaymentClaimed {
13185- receiver_node_id,
13186- payment_hash,
13187- purpose: payment.purpose,
13188- amount_msat: claimable_amt_msat,
13189- htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
13190- sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
13191- onion_fields: payment.onion_fields,
13192- payment_id: Some(payment_id),
13193- }, None));
13242+ } else {
13243+ let per_peer_state = channel_manager.per_peer_state.read().unwrap();
13244+ let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
13245+ let payment = claimable_payments.claimable_payments.remove(&payment_hash);
13246+ mem::drop(claimable_payments);
13247+ if let Some(payment) = payment {
13248+ log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
13249+ let mut claimable_amt_msat = 0;
13250+ let mut receiver_node_id = Some(our_network_pubkey);
13251+ let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
13252+ if phantom_shared_secret.is_some() {
13253+ let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
13254+ .expect("Failed to get node_id for phantom node recipient");
13255+ receiver_node_id = Some(phantom_pubkey)
13256+ }
13257+ for claimable_htlc in &payment.htlcs {
13258+ claimable_amt_msat += claimable_htlc.value;
13259+
13260+ // Add a holding-cell claim of the payment to the Channel, which should be
13261+ // applied ~immediately on peer reconnection. Because it won't generate a
13262+ // new commitment transaction we can just provide the payment preimage to
13263+ // the corresponding ChannelMonitor and nothing else.
13264+ //
13265+ // We do so directly instead of via the normal ChannelMonitor update
13266+ // procedure as the ChainMonitor hasn't yet been initialized, implying
13267+ // we're not allowed to call it directly yet. Further, we do the update
13268+ // without incrementing the ChannelMonitor update ID as there isn't any
13269+ // reason to.
13270+ // If we were to generate a new ChannelMonitor update ID here and then
13271+ // crash before the user finishes block connect we'd end up force-closing
13272+ // this channel as well. On the flip side, there's no harm in restarting
13273+ // without the new monitor persisted - we'll end up right back here on
13274+ // restart.
13275+ let previous_channel_id = claimable_htlc.prev_hop.channel_id;
13276+ let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
13277+ .get(&claimable_htlc.prev_hop.outpoint).cloned();
13278+ if let Some(peer_node_id) = peer_node_id_opt {
13279+ let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
13280+ let mut peer_state_lock = peer_state_mutex.lock().unwrap();
13281+ let peer_state = &mut *peer_state_lock;
13282+ if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
13283+ let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
13284+ channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
13285+ }
13286+ }
13287+ if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
13288+ previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
13289+ }
13290+ }
13291+ let mut pending_events = channel_manager.pending_events.lock().unwrap();
13292+ let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
13293+ pending_events.push_back((events::Event::PaymentClaimed {
13294+ receiver_node_id,
13295+ payment_hash,
13296+ purpose: payment.purpose,
13297+ amount_msat: claimable_amt_msat,
13298+ htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
13299+ sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
13300+ onion_fields: payment.onion_fields,
13301+ payment_id: Some(payment_id),
13302+ }, None));
13303+ }
1319413304 }
1319513305 }
1319613306 }
0 commit comments