@@ -1125,6 +1125,24 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
 	}
 );
 
+struct HTLCClaimSource {
+	counterparty_node_id: Option<PublicKey>,
+	funding_txo: OutPoint,
+	channel_id: ChannelId,
+	htlc_id: u64,
+}
+
+impl From<&MPPClaimHTLCSource> for HTLCClaimSource {
+	fn from(o: &MPPClaimHTLCSource) -> HTLCClaimSource {
+		HTLCClaimSource {
+			counterparty_node_id: Some(o.counterparty_node_id),
+			funding_txo: o.funding_txo,
+			channel_id: o.channel_id,
+			htlc_id: o.htlc_id,
+		}
+	}
+}
+
 #[derive(Clone, Debug, PartialEq, Eq)]
 struct MPPClaimHTLCSource {
 	counterparty_node_id: PublicKey,
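
The new `HTLCClaimSource` mirrors `MPPClaimHTLCSource` but keeps the counterparty optional, so one claim path can serve both live claims (where the peer is looked up at claim time) and claims replayed from a `ChannelMonitor` on startup (where the peer was stored with the claim). The `From` impl lets call sites write `part.into()`, as the startup-replay hunk further down does. A minimal sketch of the conversion; the helper function here is hypothetical, only the two struct types and the impl above are from the diff:

    // Sketch: turn a stored MPP part into the claim source consumed by
    // claim_mpp_part(); the counterparty comes out wrapped in Some(..).
    fn to_claim_source(part: &MPPClaimHTLCSource) -> HTLCClaimSource {
        let source: HTLCClaimSource = part.into();
        debug_assert_eq!(source.counterparty_node_id, Some(part.counterparty_node_id));
        source
    }
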
@@ -6898,6 +6916,27 @@ where
 	>(
 		&self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
 		payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
+	) {
+		let counterparty_node_id =
+			match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
+				Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
+				None => None
+			};
+
+		let htlc_source = HTLCClaimSource {
+			counterparty_node_id,
+			funding_txo: prev_hop.outpoint,
+			channel_id: prev_hop.channel_id,
+			htlc_id: prev_hop.htlc_id,
+		};
+		self.claim_mpp_part(htlc_source, payment_preimage, payment_info, completion_action)
+	}
+
+	fn claim_mpp_part<
+		ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
+	>(
+		&self, prev_hop: HTLCClaimSource, payment_preimage: PaymentPreimage,
+		payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
 	) {
 		//TODO: Delay the claimed_funds relaying just like we do outbound relay!
 
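
`claim_funds_from_hop`, whose signature forms the context at the top of this hunk, keeps its `HTLCPreviousHopData` interface: it resolves the counterparty from `short_to_chan_info`, bundles the hop data into an `HTLCClaimSource`, and delegates to the new `claim_mpp_part`, which startup replay can call directly with a counterparty it already knows. The explicit `Some`/`None` match is an `Option` map written out longhand; a sketch of the equivalent spelling, assuming `short_to_chan_info` maps a short channel id to a `(PublicKey, ChannelId)` pair as the surrounding code suggests:

    // Sketch only: the match above, written as a map over the lookup result.
    let counterparty_node_id = self.short_to_chan_info.read().unwrap()
        .get(&prev_hop.short_channel_id)
        .map(|(cp_id, _dup_chan_id)| *cp_id);
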
@@ -6914,12 +6953,8 @@ where
 		{
 			let per_peer_state = self.per_peer_state.read().unwrap();
 			let chan_id = prev_hop.channel_id;
-			let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
-				Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
-				None => None
-			};
 
-			let peer_state_opt = counterparty_node_id_opt.as_ref().map(
+			let peer_state_opt = prev_hop.counterparty_node_id.as_ref().map(
				|counterparty_node_id| per_peer_state.get(counterparty_node_id)
					.map(|peer_mutex| peer_mutex.lock().unwrap())
				).unwrap_or(None);
@@ -6946,7 +6981,7 @@ where
					peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
				}
				if !during_init {
-					handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
+					handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_lock,
						peer_state, per_peer_state, chan);
				} else {
					// If we're running during init we cannot update a monitor directly -
@@ -6955,7 +6990,7 @@ where
					self.pending_background_events.lock().unwrap().push(
						BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
							counterparty_node_id,
-							funding_txo: prev_hop.outpoint,
+							funding_txo: prev_hop.funding_txo,
							channel_id: prev_hop.channel_id,
							update: monitor_update.clone(),
						});
@@ -7029,7 +7064,7 @@ where
 		}
 		let preimage_update = ChannelMonitorUpdate {
 			update_id: CLOSED_CHANNEL_UPDATE_ID,
-			counterparty_node_id: None,
+			counterparty_node_id: prev_hop.counterparty_node_id,
 			updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
 				payment_preimage,
 				payment_info,
@@ -7040,7 +7075,7 @@ where
 		if !during_init {
 			// We update the ChannelMonitor on the backward link, after
 			// receiving an `update_fulfill_htlc` from the forward link.
-			let update_res = self.chain_monitor.update_channel(prev_hop.outpoint, &preimage_update);
+			let update_res = self.chain_monitor.update_channel(prev_hop.funding_txo, &preimage_update);
			if update_res != ChannelMonitorUpdateStatus::Completed {
				// TODO: This needs to be handled somehow - if we receive a monitor update
				// with a preimage we *must* somehow manage to propagate it to the upstream
@@ -7063,7 +7098,7 @@ where
			// complete the monitor update completion action from `completion_action`.
			self.pending_background_events.lock().unwrap().push(
				BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
-					prev_hop.outpoint, prev_hop.channel_id, preimage_update,
+					prev_hop.funding_txo, prev_hop.channel_id, preimage_update,
				)));
 		}
 		// Note that we do process the completion action here. This totally could be a
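
During init the `ChainMonitor` cannot be driven directly, so instead of applying the preimage update the code queues a `BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup` and replays it once startup completes. A rough sketch of that defer-then-drain pattern with simplified stand-in types (none of these names are LDK's):

    // Updates produced before the monitor infrastructure is ready get queued,
    // then applied after startup by whoever drains the queue.
    struct PendingUpdate { /* funding_txo, channel_id, update ... */ }

    struct Manager {
        during_init: bool,
        pending_background_events: Vec<PendingUpdate>,
    }

    impl Manager {
        fn apply_or_queue(&mut self, update: PendingUpdate, apply_now: impl FnOnce(PendingUpdate)) {
            if self.during_init {
                // Too early to touch the chain monitor; replay this later.
                self.pending_background_events.push(update);
            } else {
                apply_now(update);
            }
        }
    }
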
@@ -7314,7 +7349,7 @@ where
					onion_fields,
					payment_id,
				}) = payment {
-					self.pending_events.lock().unwrap().push_back((events::Event::PaymentClaimed {
+					let event = events::Event::PaymentClaimed {
						payment_hash,
						purpose,
						amount_msat,
@@ -7323,7 +7358,16 @@ where
						sender_intended_total_msat,
						onion_fields,
						payment_id,
-					}, None));
+					};
+					let event_action = (event, None);
+					let mut pending_events = self.pending_events.lock().unwrap();
+					// If we're replaying a claim on startup we may end up duplicating an event
+					// that's already in our queue, so check before we push another one. The
+					// `payment_id` should suffice to ensure we never spuriously drop a second
+					// event for a duplicate payment.
+					if !pending_events.contains(&event_action) {
+						pending_events.push_back(event_action);
+					}
				}
			},
			MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
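
Because a claim replayed on startup can duplicate a `PaymentClaimed` event that was already queued (but not yet handled) before the restart, the push is now guarded by a `contains` check on the pending-event queue. A self-contained sketch of the guard, using a simplified stand-in for the real `(Event, Option<EventCompletionAction>)` entries:

    use std::collections::VecDeque;

    #[derive(PartialEq, Eq, Debug)]
    struct PaymentClaimed { payment_id: u64, amount_msat: u64 }

    fn push_deduped(queue: &mut VecDeque<PaymentClaimed>, ev: PaymentClaimed) {
        // A linear scan is fine: the queue is short-lived and duplicates only
        // arise when a claim is replayed at startup.
        if !queue.contains(&ev) {
            queue.push_back(ev);
        }
    }

    fn main() {
        let mut q = VecDeque::new();
        push_deduped(&mut q, PaymentClaimed { payment_id: 1, amount_msat: 1_000 });
        push_deduped(&mut q, PaymentClaimed { payment_id: 1, amount_msat: 1_000 });
        assert_eq!(q.len(), 1);
    }
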
@@ -13132,67 +13176,126 @@ where
		};

		for (_, monitor) in args.channel_monitors.iter() {
-			for (payment_hash, (payment_preimage, _)) in monitor.get_stored_preimages() {
-				let per_peer_state = channel_manager.per_peer_state.read().unwrap();
-				let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
-				let payment = claimable_payments.claimable_payments.remove(&payment_hash);
-				mem::drop(claimable_payments);
-				if let Some(payment) = payment {
-					log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
-					let mut claimable_amt_msat = 0;
-					let mut receiver_node_id = Some(our_network_pubkey);
-					let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
-					if phantom_shared_secret.is_some() {
-						let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
-							.expect("Failed to get node_id for phantom node recipient");
-						receiver_node_id = Some(phantom_pubkey)
-					}
-					for claimable_htlc in &payment.htlcs {
-						claimable_amt_msat += claimable_htlc.value;
-
-						// Add a holding-cell claim of the payment to the Channel, which should be
-						// applied ~immediately on peer reconnection. Because it won't generate a
-						// new commitment transaction we can just provide the payment preimage to
-						// the corresponding ChannelMonitor and nothing else.
-						//
-						// We do so directly instead of via the normal ChannelMonitor update
-						// procedure as the ChainMonitor hasn't yet been initialized, implying
-						// we're not allowed to call it directly yet. Further, we do the update
-						// without incrementing the ChannelMonitor update ID as there isn't any
-						// reason to.
-						// If we were to generate a new ChannelMonitor update ID here and then
-						// crash before the user finishes block connect we'd end up force-closing
-						// this channel as well. On the flip side, there's no harm in restarting
-						// without the new monitor persisted - we'll end up right back here on
-						// restart.
-						let previous_channel_id = claimable_htlc.prev_hop.channel_id;
-						let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
-							.get(&claimable_htlc.prev_hop.outpoint).cloned();
-						if let Some(peer_node_id) = peer_node_id_opt {
-							let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
-							let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-							let peer_state = &mut *peer_state_lock;
-							if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
-								let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
-								channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
+			for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages() {
+				if !payment_claims.is_empty() {
+					for payment_claim in payment_claims {
+						if payment_claim.mpp_parts.is_empty() {
+							return Err(DecodeError::InvalidValue);
+						}
+						let pending_claims = PendingMPPClaim {
+							channels_without_preimage: payment_claim.mpp_parts.clone(),
+							channels_with_preimage: Vec::new(),
+						};
+						let pending_claim_ptr_opt = Some(Arc::new(Mutex::new(pending_claims)));
+
+						// While it may be duplicative to generate a PaymentClaimed here, trying to
+						// figure out if the user definitely saw it before shutdown would require some
+						// nontrivial logic and may break as we move away from regularly persisting
+						// ChannelManager. Instead, we rely on the user's event handler being
+						// idempotent and just blindly generate one no matter what, letting the
+						// preimages eventually time out of the ChannelMonitors to prevent us from
+						// doing so forever.
+
+						let claim_found =
+							channel_manager.claimable_payments.lock().unwrap().begin_claiming_payment(
+								payment_hash, &channel_manager.node_signer, &channel_manager.logger,
+								&channel_manager.inbound_payment_id_secret, |_| Ok(()),
+							);
+						if claim_found.is_err() {
+							let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
+							match claimable_payments.pending_claiming_payments.entry(payment_hash) {
+								hash_map::Entry::Occupied(_) => {
+									debug_assert!(false, "Entry was added in begin_claiming_payment");
+									return Err(DecodeError::InvalidValue);
+								},
+								hash_map::Entry::Vacant(entry) => {
+									entry.insert(payment_claim.claiming_payment);
+								},
							}
						}
-						if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
-							previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
+
+						for part in payment_claim.mpp_parts.iter() {
+							let pending_mpp_claim = pending_claim_ptr_opt.as_ref().map(|ptr| (
+								part.counterparty_node_id, part.channel_id, part.htlc_id,
+								PendingMPPClaimPointer(Arc::clone(&ptr))
+							));
+							let pending_claim_ptr = pending_claim_ptr_opt.as_ref().map(|ptr|
+								RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+									pending_claim: PendingMPPClaimPointer(Arc::clone(&ptr)),
+								}
+							);
+							// Note that we don't need to pass the `payment_info` here - it's
+							// already (clearly) durably on disk in the `ChannelMonitor` so there's
+							// no need to worry about getting it into others.
+							channel_manager.claim_mpp_part(
+								part.into(), payment_preimage, None,
+								|_, _|
+									(Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim }), pending_claim_ptr)
+							);
						}
					}
-					let mut pending_events = channel_manager.pending_events.lock().unwrap();
-					let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
-					pending_events.push_back((events::Event::PaymentClaimed {
-						receiver_node_id,
-						payment_hash,
-						purpose: payment.purpose,
-						amount_msat: claimable_amt_msat,
-						htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
-						sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
-						onion_fields: payment.onion_fields,
-						payment_id: Some(payment_id),
-					}, None));
+				} else {
+					let per_peer_state = channel_manager.per_peer_state.read().unwrap();
+					let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
+					let payment = claimable_payments.claimable_payments.remove(&payment_hash);
+					mem::drop(claimable_payments);
+					if let Some(payment) = payment {
+						log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
+						let mut claimable_amt_msat = 0;
+						let mut receiver_node_id = Some(our_network_pubkey);
+						let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
+						if phantom_shared_secret.is_some() {
+							let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
+								.expect("Failed to get node_id for phantom node recipient");
+							receiver_node_id = Some(phantom_pubkey)
+						}
+						for claimable_htlc in &payment.htlcs {
+							claimable_amt_msat += claimable_htlc.value;
+
+							// Add a holding-cell claim of the payment to the Channel, which should be
+							// applied ~immediately on peer reconnection. Because it won't generate a
+							// new commitment transaction we can just provide the payment preimage to
+							// the corresponding ChannelMonitor and nothing else.
+							//
+							// We do so directly instead of via the normal ChannelMonitor update
+							// procedure as the ChainMonitor hasn't yet been initialized, implying
+							// we're not allowed to call it directly yet. Further, we do the update
+							// without incrementing the ChannelMonitor update ID as there isn't any
+							// reason to.
+							// If we were to generate a new ChannelMonitor update ID here and then
+							// crash before the user finishes block connect we'd end up force-closing
+							// this channel as well. On the flip side, there's no harm in restarting
+							// without the new monitor persisted - we'll end up right back here on
+							// restart.
+							let previous_channel_id = claimable_htlc.prev_hop.channel_id;
+							let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
+								.get(&claimable_htlc.prev_hop.outpoint).cloned();
+							if let Some(peer_node_id) = peer_node_id_opt {
+								let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
+								let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+								let peer_state = &mut *peer_state_lock;
+								if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
+									let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
+									channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
+								}
+							}
+							if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
+								previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
+							}
+						}
+						let mut pending_events = channel_manager.pending_events.lock().unwrap();
+						let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
+						pending_events.push_back((events::Event::PaymentClaimed {
+							receiver_node_id,
+							payment_hash,
+							purpose: payment.purpose,
+							amount_msat: claimable_amt_msat,
+							htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
+							sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
+							onion_fields: payment.onion_fields,
+							payment_id: Some(payment_id),
+						}, None));
+					}
				}
			}
		}
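
On the new path, each MPP part stored with the preimage is replayed through `claim_mpp_part` with a shared `PendingMPPClaim` tracking which channels still lack the preimage; the legacy per-HTLC holding-cell path is kept verbatim in the `else` branch for monitors without stored claim data. A rough sketch of the shared-tracker idea with simplified stand-in types (not LDK's):

    use std::sync::{Arc, Mutex};

    #[derive(Clone)]
    struct Part { channel_id: u64 }

    struct PendingClaim {
        channels_without_preimage: Vec<Part>,
        channels_with_preimage: Vec<Part>,
    }

    fn replay_claim(parts: Vec<Part>, mut claim_part: impl FnMut(&Part, Arc<Mutex<PendingClaim>>)) {
        // One tracker per payment; each per-part claim holds a clone so the last
        // part to complete can tell that the whole payment has been claimed.
        let tracker = Arc::new(Mutex::new(PendingClaim {
            channels_without_preimage: parts.clone(),
            channels_with_preimage: Vec::new(),
        }));
        for part in &parts {
            claim_part(part, Arc::clone(&tracker));
        }
    }

This pairs with the deduplicated `PaymentClaimed` push earlier in the commit, which avoids queuing the same event twice within one run.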