@@ -1117,6 +1117,24 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
11171117 }
11181118);
11191119
1120+ struct HTLCClaimSource {
1121+ counterparty_node_id: Option<PublicKey>,
1122+ funding_txo: OutPoint,
1123+ channel_id: ChannelId,
1124+ htlc_id: u64,
1125+ }
1126+
1127+ impl From<&MPPClaimHTLCSource> for HTLCClaimSource {
1128+ fn from(o: &MPPClaimHTLCSource) -> HTLCClaimSource {
1129+ HTLCClaimSource {
1130+ counterparty_node_id: Some(o.counterparty_node_id),
1131+ funding_txo: o.funding_txo,
1132+ channel_id: o.channel_id,
1133+ htlc_id: o.htlc_id,
1134+ }
1135+ }
1136+ }
1137+
11201138#[derive(Clone, Debug, PartialEq, Eq)]
11211139struct MPPClaimHTLCSource {
11221140 counterparty_node_id: PublicKey,
@@ -6889,6 +6907,27 @@ where
68896907 >(
68906908 &self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
68916909 payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
6910+ ) {
6911+ let counterparty_node_id =
6912+ match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
6913+ Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
6914+ None => None
6915+ };
6916+
6917+ let htlc_source = HTLCClaimSource {
6918+ counterparty_node_id,
6919+ funding_txo: prev_hop.outpoint,
6920+ channel_id: prev_hop.channel_id,
6921+ htlc_id: prev_hop.htlc_id,
6922+ };
6923+ self.claim_mpp_part(htlc_source, payment_preimage, payment_info, completion_action)
6924+ }
6925+
6926+ fn claim_mpp_part<
6927+ ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
6928+ >(
6929+ &self, prev_hop: HTLCClaimSource, payment_preimage: PaymentPreimage,
6930+ payment_info: Option<PaymentClaimDetails>, completion_action: ComplFunc,
68926931 ) {
68936932 //TODO: Delay the claimed_funds relaying just like we do outbound relay!
68946933
@@ -6905,12 +6944,8 @@ where
69056944 {
69066945 let per_peer_state = self.per_peer_state.read().unwrap();
69076946 let chan_id = prev_hop.channel_id;
6908- let counterparty_node_id_opt = match self.short_to_chan_info.read().unwrap().get(&prev_hop.short_channel_id) {
6909- Some((cp_id, _dup_chan_id)) => Some(cp_id.clone()),
6910- None => None
6911- };
69126947
6913- let peer_state_opt = counterparty_node_id_opt .as_ref().map(
6948+ let peer_state_opt = prev_hop.counterparty_node_id .as_ref().map(
69146949 |counterparty_node_id| per_peer_state.get(counterparty_node_id)
69156950 .map(|peer_mutex| peer_mutex.lock().unwrap())
69166951 ).unwrap_or(None);
@@ -6937,7 +6972,7 @@ where
69376972 peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
69386973 }
69396974 if !during_init {
6940- handle_new_monitor_update!(self, prev_hop.outpoint , monitor_update, peer_state_lock,
6975+ handle_new_monitor_update!(self, prev_hop.funding_txo , monitor_update, peer_state_lock,
69416976 peer_state, per_peer_state, chan);
69426977 } else {
69436978 // If we're running during init we cannot update a monitor directly -
@@ -6946,7 +6981,7 @@ where
69466981 self.pending_background_events.lock().unwrap().push(
69476982 BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
69486983 counterparty_node_id,
6949- funding_txo: prev_hop.outpoint ,
6984+ funding_txo: prev_hop.funding_txo ,
69506985 channel_id: prev_hop.channel_id,
69516986 update: monitor_update.clone(),
69526987 });
@@ -7020,7 +7055,7 @@ where
70207055 }
70217056 let preimage_update = ChannelMonitorUpdate {
70227057 update_id: CLOSED_CHANNEL_UPDATE_ID,
7023- counterparty_node_id: None ,
7058+ counterparty_node_id: prev_hop.counterparty_node_id ,
70247059 updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
70257060 payment_preimage,
70267061 payment_info,
@@ -7031,7 +7066,7 @@ where
70317066 if !during_init {
70327067 // We update the ChannelMonitor on the backward link, after
70337068 // receiving an `update_fulfill_htlc` from the forward link.
7034- let update_res = self.chain_monitor.update_channel(prev_hop.outpoint , &preimage_update);
7069+ let update_res = self.chain_monitor.update_channel(prev_hop.funding_txo , &preimage_update);
70357070 if update_res != ChannelMonitorUpdateStatus::Completed {
70367071 // TODO: This needs to be handled somehow - if we receive a monitor update
70377072 // with a preimage we *must* somehow manage to propagate it to the upstream
@@ -7054,7 +7089,7 @@ where
70547089 // complete the monitor update completion action from `completion_action`.
70557090 self.pending_background_events.lock().unwrap().push(
70567091 BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
7057- prev_hop.outpoint , prev_hop.channel_id, preimage_update,
7092+ prev_hop.funding_txo , prev_hop.channel_id, preimage_update,
70587093 )));
70597094 }
70607095 // Note that we do process the completion action here. This totally could be a
@@ -7305,7 +7340,7 @@ where
73057340 onion_fields,
73067341 payment_id,
73077342 }) = payment {
7308- self.pending_events.lock().unwrap().push_back(( events::Event::PaymentClaimed {
7343+ let event = events::Event::PaymentClaimed {
73097344 payment_hash,
73107345 purpose,
73117346 amount_msat,
@@ -7314,7 +7349,16 @@ where
73147349 sender_intended_total_msat,
73157350 onion_fields,
73167351 payment_id,
7317- }, None));
7352+ };
7353+ let event_action = (event, None);
7354+ let mut pending_events = self.pending_events.lock().unwrap();
7355+ // If we're replaying a claim on startup we may end up duplicating an event
7356+ // that's already in our queue, so check before we push another one. The
7357+ // `payment_id` should suffice to ensure we never spuriously drop a second
7358+ // event for a duplicate payment.
7359+ if !pending_events.contains(&event_action) {
7360+ pending_events.push_back(event_action);
7361+ }
73187362 }
73197363 },
73207364 MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
@@ -13121,67 +13165,126 @@ where
1312113165 };
1312213166
1312313167 for (_, monitor) in args.channel_monitors.iter() {
13124- for (payment_hash, (payment_preimage, _)) in monitor.get_stored_preimages() {
13125- let per_peer_state = channel_manager.per_peer_state.read().unwrap();
13126- let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
13127- let payment = claimable_payments.claimable_payments.remove(&payment_hash);
13128- mem::drop(claimable_payments);
13129- if let Some(payment) = payment {
13130- log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
13131- let mut claimable_amt_msat = 0;
13132- let mut receiver_node_id = Some(our_network_pubkey);
13133- let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
13134- if phantom_shared_secret.is_some() {
13135- let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
13136- .expect("Failed to get node_id for phantom node recipient");
13137- receiver_node_id = Some(phantom_pubkey)
13138- }
13139- for claimable_htlc in &payment.htlcs {
13140- claimable_amt_msat += claimable_htlc.value;
13141-
13142- // Add a holding-cell claim of the payment to the Channel, which should be
13143- // applied ~immediately on peer reconnection. Because it won't generate a
13144- // new commitment transaction we can just provide the payment preimage to
13145- // the corresponding ChannelMonitor and nothing else.
13146- //
13147- // We do so directly instead of via the normal ChannelMonitor update
13148- // procedure as the ChainMonitor hasn't yet been initialized, implying
13149- // we're not allowed to call it directly yet. Further, we do the update
13150- // without incrementing the ChannelMonitor update ID as there isn't any
13151- // reason to.
13152- // If we were to generate a new ChannelMonitor update ID here and then
13153- // crash before the user finishes block connect we'd end up force-closing
13154- // this channel as well. On the flip side, there's no harm in restarting
13155- // without the new monitor persisted - we'll end up right back here on
13156- // restart.
13157- let previous_channel_id = claimable_htlc.prev_hop.channel_id;
13158- let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
13159- .get(&claimable_htlc.prev_hop.outpoint).cloned();
13160- if let Some(peer_node_id) = peer_node_id_opt {
13161- let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
13162- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
13163- let peer_state = &mut *peer_state_lock;
13164- if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
13165- let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
13166- channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
13168+ for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages() {
13169+ if !payment_claims.is_empty() {
13170+ for payment_claim in payment_claims {
13171+ if payment_claim.mpp_parts.is_empty() {
13172+ return Err(DecodeError::InvalidValue);
13173+ }
13174+ let pending_claims = PendingMPPClaim {
13175+ channels_without_preimage: payment_claim.mpp_parts.clone(),
13176+ channels_with_preimage: Vec::new(),
13177+ };
13178+ let pending_claim_ptr_opt = Some(Arc::new(Mutex::new(pending_claims)));
13179+
13180+ // While it may be duplicative to generate a PaymentClaimed here, trying to
13181+ // figure out if the user definitely saw it before shutdown would require some
13182+ // nontrivial logic and may break as we move away from regularly persisting
13183+ // ChannelManager. Instead, we rely on the users' event handler being
13184+ // idempotent and just blindly generate one no matter what, letting the
13185+ // preimages eventually timing out from ChannelMonitors to prevent us from
13186+ // doing so forever.
13187+
13188+ let claim_found =
13189+ channel_manager.claimable_payments.lock().unwrap().begin_claiming_payment(
13190+ payment_hash, &channel_manager.node_signer, &channel_manager.logger,
13191+ &channel_manager.inbound_payment_id_secret, |_| Ok(()),
13192+ );
13193+ if claim_found.is_err() {
13194+ let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
13195+ match claimable_payments.pending_claiming_payments.entry(payment_hash) {
13196+ hash_map::Entry::Occupied(_) => {
13197+ debug_assert!(false, "unreachable");
13198+ return Err(DecodeError::InvalidValue);
13199+ },
13200+ hash_map::Entry::Vacant(entry) => {
13201+ entry.insert(payment_claim.claiming_payment);
13202+ },
1316713203 }
1316813204 }
13169- if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
13170- previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
13205+
13206+ for part in payment_claim.mpp_parts.iter() {
13207+ let pending_mpp_claim = pending_claim_ptr_opt.as_ref().map(|ptr| (
13208+ part.counterparty_node_id, part.channel_id, part.htlc_id,
13209+ PendingMPPClaimPointer(Arc::clone(&ptr))
13210+ ));
13211+ let pending_claim_ptr = pending_claim_ptr_opt.as_ref().map(|ptr|
13212+ RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
13213+ pending_claim: PendingMPPClaimPointer(Arc::clone(&ptr)),
13214+ }
13215+ );
13216+ // Note that we don't need to pass the `payment_info` here - its
13217+ // already (clearly) durably on disk in on `ChannelMonitor` so there's
13218+ // no need to worry about getting it into others.
13219+ channel_manager.claim_mpp_part(
13220+ part.into(), payment_preimage, None,
13221+ |_, _|
13222+ (Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim }), pending_claim_ptr)
13223+ );
1317113224 }
1317213225 }
13173- let mut pending_events = channel_manager.pending_events.lock().unwrap();
13174- let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
13175- pending_events.push_back((events::Event::PaymentClaimed {
13176- receiver_node_id,
13177- payment_hash,
13178- purpose: payment.purpose,
13179- amount_msat: claimable_amt_msat,
13180- htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
13181- sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
13182- onion_fields: payment.onion_fields,
13183- payment_id: Some(payment_id),
13184- }, None));
13226+ } else {
13227+ let per_peer_state = channel_manager.per_peer_state.read().unwrap();
13228+ let mut claimable_payments = channel_manager.claimable_payments.lock().unwrap();
13229+ let payment = claimable_payments.claimable_payments.remove(&payment_hash);
13230+ mem::drop(claimable_payments);
13231+ if let Some(payment) = payment {
13232+ log_info!(channel_manager.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
13233+ let mut claimable_amt_msat = 0;
13234+ let mut receiver_node_id = Some(our_network_pubkey);
13235+ let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
13236+ if phantom_shared_secret.is_some() {
13237+ let phantom_pubkey = channel_manager.node_signer.get_node_id(Recipient::PhantomNode)
13238+ .expect("Failed to get node_id for phantom node recipient");
13239+ receiver_node_id = Some(phantom_pubkey)
13240+ }
13241+ for claimable_htlc in &payment.htlcs {
13242+ claimable_amt_msat += claimable_htlc.value;
13243+
13244+ // Add a holding-cell claim of the payment to the Channel, which should be
13245+ // applied ~immediately on peer reconnection. Because it won't generate a
13246+ // new commitment transaction we can just provide the payment preimage to
13247+ // the corresponding ChannelMonitor and nothing else.
13248+ //
13249+ // We do so directly instead of via the normal ChannelMonitor update
13250+ // procedure as the ChainMonitor hasn't yet been initialized, implying
13251+ // we're not allowed to call it directly yet. Further, we do the update
13252+ // without incrementing the ChannelMonitor update ID as there isn't any
13253+ // reason to.
13254+ // If we were to generate a new ChannelMonitor update ID here and then
13255+ // crash before the user finishes block connect we'd end up force-closing
13256+ // this channel as well. On the flip side, there's no harm in restarting
13257+ // without the new monitor persisted - we'll end up right back here on
13258+ // restart.
13259+ let previous_channel_id = claimable_htlc.prev_hop.channel_id;
13260+ let peer_node_id_opt = channel_manager.outpoint_to_peer.lock().unwrap()
13261+ .get(&claimable_htlc.prev_hop.outpoint).cloned();
13262+ if let Some(peer_node_id) = peer_node_id_opt {
13263+ let peer_state_mutex = per_peer_state.get(&peer_node_id).unwrap();
13264+ let mut peer_state_lock = peer_state_mutex.lock().unwrap();
13265+ let peer_state = &mut *peer_state_lock;
13266+ if let Some(ChannelPhase::Funded(channel)) = peer_state.channel_by_id.get_mut(&previous_channel_id) {
13267+ let logger = WithChannelContext::from(&channel_manager.logger, &channel.context, Some(payment_hash));
13268+ channel.claim_htlc_while_disconnected_dropping_mon_update(claimable_htlc.prev_hop.htlc_id, payment_preimage, &&logger);
13269+ }
13270+ }
13271+ if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
13272+ previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &channel_manager.tx_broadcaster, &channel_manager.fee_estimator, &channel_manager.logger);
13273+ }
13274+ }
13275+ let mut pending_events = channel_manager.pending_events.lock().unwrap();
13276+ let payment_id = payment.inbound_payment_id(&inbound_payment_id_secret.unwrap());
13277+ pending_events.push_back((events::Event::PaymentClaimed {
13278+ receiver_node_id,
13279+ payment_hash,
13280+ purpose: payment.purpose,
13281+ amount_msat: claimable_amt_msat,
13282+ htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
13283+ sender_intended_total_msat: payment.htlcs.first().map(|htlc| htlc.total_msat),
13284+ onion_fields: payment.onion_fields,
13285+ payment_id: Some(payment_id),
13286+ }, None));
13287+ }
1318513288 }
1318613289 }
1318713290 }
0 commit comments