Skip to content

Commit 1237998

Browse files
committed
Track pending update_add_htlcs in ChannelManager for later processing
We plan to decode the onions of these `update_add_htlc`s as part of the HTLC forwarding flow (i.e., `process_pending_htlc_forwards`), so we'll need to track them per-channel at the `ChannelManager` level.
1 parent 7639dab commit 1237998

File tree

2 files changed

+68
-16
lines changed

2 files changed

+68
-16
lines changed

lightning/src/ln/channel.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1069,6 +1069,7 @@ pub(super) struct MonitorRestoreUpdates {
10691069
pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
10701070
pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
10711071
pub finalized_claimed_htlcs: Vec<HTLCSource>,
1072+
pub pending_update_adds: Vec<msgs::UpdateAddHTLC>,
10721073
pub funding_broadcastable: Option<Transaction>,
10731074
pub channel_ready: Option<msgs::ChannelReady>,
10741075
pub announcement_sigs: Option<msgs::AnnouncementSignatures>,
@@ -5248,13 +5249,16 @@ impl<SP: Deref> Channel<SP> where
52485249
mem::swap(&mut failed_htlcs, &mut self.context.monitor_pending_failures);
52495250
let mut finalized_claimed_htlcs = Vec::new();
52505251
mem::swap(&mut finalized_claimed_htlcs, &mut self.context.monitor_pending_finalized_fulfills);
5252+
let mut pending_update_adds = Vec::new();
5253+
mem::swap(&mut pending_update_adds, &mut self.context.monitor_pending_update_adds);
52515254

52525255
if self.context.channel_state.is_peer_disconnected() {
52535256
self.context.monitor_pending_revoke_and_ack = false;
52545257
self.context.monitor_pending_commitment_signed = false;
52555258
return MonitorRestoreUpdates {
52565259
raa: None, commitment_update: None, order: RAACommitmentOrder::RevokeAndACKFirst,
5257-
accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, funding_broadcastable, channel_ready, announcement_sigs
5260+
accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, pending_update_adds,
5261+
funding_broadcastable, channel_ready, announcement_sigs
52585262
};
52595263
}
52605264

@@ -5276,7 +5280,8 @@ impl<SP: Deref> Channel<SP> where
52765280
if commitment_update.is_some() { "a" } else { "no" }, if raa.is_some() { "an" } else { "no" },
52775281
match order { RAACommitmentOrder::CommitmentFirst => "commitment", RAACommitmentOrder::RevokeAndACKFirst => "RAA"});
52785282
MonitorRestoreUpdates {
5279-
raa, commitment_update, order, accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, funding_broadcastable, channel_ready, announcement_sigs
5283+
raa, commitment_update, order, accepted_htlcs, failed_htlcs, finalized_claimed_htlcs,
5284+
pending_update_adds, funding_broadcastable, channel_ready, announcement_sigs
52805285
}
52815286
}
52825287

lightning/src/ln/channelmanager.rs

Lines changed: 61 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,6 +1181,8 @@ where
11811181
// | |
11821182
// | |__`pending_intercepted_htlcs`
11831183
// |
1184+
// |__`decode_update_add_htlcs`
1185+
// |
11841186
// |__`per_peer_state`
11851187
// |
11861188
// |__`pending_inbound_payments`
@@ -1271,6 +1273,18 @@ where
12711273
/// See `ChannelManager` struct-level documentation for lock order requirements.
12721274
pending_intercepted_htlcs: Mutex<HashMap<InterceptId, PendingAddHTLCInfo>>,
12731275

1276+
/// SCID/SCID Alias -> pending `update_add_htlc`s to decode.
1277+
///
1278+
/// Note that because we may have an SCID Alias as the key we can have two entries per channel,
1279+
/// though in practice we probably won't be receiving HTLCs for a channel both via the alias
1280+
/// and via the classic SCID.
1281+
///
1282+
/// Note that no consistency guarantees are made about the existence of a channel with the
1283+
/// `short_channel_id` here, nor the `channel_id` in `UpdateAddHTLC`!
1284+
///
1285+
/// See `ChannelManager` struct-level documentation for lock order requirements.
1286+
decode_update_add_htlcs: Mutex<HashMap<u64, Vec<msgs::UpdateAddHTLC>>>,
1287+
12741288
/// The sets of payments which are claimable or currently being claimed. See
12751289
/// [`ClaimablePayments`]' individual field docs for more info.
12761290
///
@@ -2238,9 +2252,9 @@ macro_rules! handle_monitor_update_completion {
22382252
let update_actions = $peer_state.monitor_update_blocked_actions
22392253
.remove(&$chan.context.channel_id()).unwrap_or(Vec::new());
22402254

2241-
let htlc_forwards = $self.handle_channel_resumption(
2255+
let (htlc_forwards, decode_update_add_htlcs) = $self.handle_channel_resumption(
22422256
&mut $peer_state.pending_msg_events, $chan, updates.raa,
2243-
updates.commitment_update, updates.order, updates.accepted_htlcs,
2257+
updates.commitment_update, updates.order, updates.accepted_htlcs, updates.pending_update_adds,
22442258
updates.funding_broadcastable, updates.channel_ready,
22452259
updates.announcement_sigs);
22462260
if let Some(upd) = channel_update {
@@ -2301,6 +2315,9 @@ macro_rules! handle_monitor_update_completion {
23012315
if let Some(forwards) = htlc_forwards {
23022316
$self.forward_htlcs(&mut [forwards][..]);
23032317
}
2318+
if let Some(decode) = decode_update_add_htlcs {
2319+
$self.decode_update_add_htlcs(decode);
2320+
}
23042321
$self.finalize_claims(updates.finalized_claimed_htlcs);
23052322
for failure in updates.failed_htlcs.drain(..) {
23062323
let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
@@ -2477,6 +2494,7 @@ where
24772494
pending_inbound_payments: Mutex::new(new_hash_map()),
24782495
pending_outbound_payments: OutboundPayments::new(),
24792496
forward_htlcs: Mutex::new(new_hash_map()),
2497+
decode_update_add_htlcs: Mutex::new(new_hash_map()),
24802498
claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: new_hash_map(), pending_claiming_payments: new_hash_map() }),
24812499
pending_intercepted_htlcs: Mutex::new(new_hash_map()),
24822500
outpoint_to_peer: Mutex::new(new_hash_map()),
@@ -5929,24 +5947,31 @@ where
59295947
fn handle_channel_resumption(&self, pending_msg_events: &mut Vec<MessageSendEvent>,
59305948
channel: &mut Channel<SP>, raa: Option<msgs::RevokeAndACK>,
59315949
commitment_update: Option<msgs::CommitmentUpdate>, order: RAACommitmentOrder,
5932-
pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option<Transaction>,
5950+
pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec<msgs::UpdateAddHTLC>,
5951+
funding_broadcastable: Option<Transaction>,
59335952
channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
5934-
-> Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> {
5953+
-> (Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<msgs::UpdateAddHTLC>)>) {
59355954
let logger = WithChannelContext::from(&self.logger, &channel.context);
5936-
log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement",
5955+
log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {} pending update_add_htlcs, {}broadcasting funding, {} channel ready, {} announcement",
59375956
&channel.context.channel_id(),
59385957
if raa.is_some() { "an" } else { "no" },
5939-
if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(),
5958+
if commitment_update.is_some() { "a" } else { "no" },
5959+
pending_forwards.len(), pending_update_adds.len(),
59405960
if funding_broadcastable.is_some() { "" } else { "not " },
59415961
if channel_ready.is_some() { "sending" } else { "without" },
59425962
if announcement_sigs.is_some() { "sending" } else { "without" });
59435963

5944-
let mut htlc_forwards = None;
5945-
59465964
let counterparty_node_id = channel.context.get_counterparty_node_id();
5965+
let short_channel_id = channel.context.get_short_channel_id().unwrap_or(channel.context.outbound_scid_alias());
5966+
5967+
let mut htlc_forwards = None;
59475968
if !pending_forwards.is_empty() {
5948-
htlc_forwards = Some((channel.context.get_short_channel_id().unwrap_or(channel.context.outbound_scid_alias()),
5949-
channel.context.get_funding_txo().unwrap(), channel.context.channel_id(), channel.context.get_user_id(), pending_forwards));
5969+
htlc_forwards = Some((short_channel_id, channel.context.get_funding_txo().unwrap(),
5970+
channel.context.channel_id(), channel.context.get_user_id(), pending_forwards));
5971+
}
5972+
let mut decode_update_add_htlcs = None;
5973+
if !pending_update_adds.is_empty() {
5974+
decode_update_add_htlcs = Some((short_channel_id, pending_update_adds));
59505975
}
59515976

59525977
if let Some(msg) = channel_ready {
@@ -5997,7 +6022,7 @@ where
59976022
emit_channel_ready_event!(pending_events, channel);
59986023
}
59996024

6000-
htlc_forwards
6025+
(htlc_forwards, decode_update_add_htlcs)
60016026
}
60026027

60036028
fn channel_monitor_updated(&self, funding_txo: &OutPoint, channel_id: &ChannelId, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) {
@@ -6971,6 +6996,17 @@ where
69716996
}
69726997
}
69736998

6999+
fn decode_update_add_htlcs(&self, update_add_htlcs: (u64, Vec<msgs::UpdateAddHTLC>)) {
7000+
let mut decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
7001+
let scid = update_add_htlcs.0;
7002+
for update_add_htlc in update_add_htlcs.1 {
7003+
match decode_update_add_htlcs.entry(scid) {
7004+
hash_map::Entry::Occupied(mut e) => { e.get_mut().push(update_add_htlc); },
7005+
hash_map::Entry::Vacant(e) => { e.insert(vec![update_add_htlc]); },
7006+
}
7007+
}
7008+
}
7009+
69747010
#[inline]
69757011
fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) {
69767012
for &mut (prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
@@ -7307,10 +7343,10 @@ where
73077343
}
73087344
}
73097345
let need_lnd_workaround = chan.context.workaround_lnd_bug_4006.take();
7310-
let htlc_forwards = self.handle_channel_resumption(
7346+
let (htlc_forwards, decode_update_add_htlcs) = self.handle_channel_resumption(
73117347
&mut peer_state.pending_msg_events, chan, responses.raa, responses.commitment_update, responses.order,
7312-
Vec::new(), None, responses.channel_ready, responses.announcement_sigs);
7313-
debug_assert!(htlc_forwards.is_none());
7348+
Vec::new(), Vec::new(), None, responses.channel_ready, responses.announcement_sigs);
7349+
debug_assert!(htlc_forwards.is_none() && decode_update_add_htlcs.is_none());
73147350
if let Some(upd) = channel_update {
73157351
peer_state.pending_msg_events.push(upd);
73167352
}
@@ -10192,6 +10228,12 @@ where
1019210228
}
1019310229
}
1019410230

10231+
let mut decode_update_add_htlcs_opt = None;
10232+
let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
10233+
if !decode_update_add_htlcs.is_empty() {
10234+
decode_update_add_htlcs_opt = Some(decode_update_add_htlcs);
10235+
}
10236+
1019510237
let per_peer_state = self.per_peer_state.write().unwrap();
1019610238

1019710239
let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap();
@@ -10343,6 +10385,7 @@ where
1034310385
(10, in_flight_monitor_updates, option),
1034410386
(11, self.probing_cookie_secret, required),
1034510387
(13, htlc_onion_fields, optional_vec),
10388+
(14, decode_update_add_htlcs_opt, option),
1034610389
});
1034710390

1034810391
Ok(())
@@ -10808,6 +10851,7 @@ where
1080810851
let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
1080910852
let mut events_override = None;
1081010853
let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>> = None;
10854+
let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
1081110855
read_tlv_fields!(reader, {
1081210856
(1, pending_outbound_payments_no_retry, option),
1081310857
(2, pending_intercepted_htlcs, option),
@@ -10821,7 +10865,9 @@ where
1082110865
(10, in_flight_monitor_updates, option),
1082210866
(11, probing_cookie_secret, option),
1082310867
(13, claimable_htlc_onion_fields, optional_vec),
10868+
(14, decode_update_add_htlcs, option),
1082410869
});
10870+
let decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
1082510871
if fake_scid_rand_bytes.is_none() {
1082610872
fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
1082710873
}
@@ -11356,6 +11402,7 @@ where
1135611402
pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()),
1135711403

1135811404
forward_htlcs: Mutex::new(forward_htlcs),
11405+
decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs),
1135911406
claimable_payments: Mutex::new(ClaimablePayments { claimable_payments, pending_claiming_payments: pending_claiming_payments.unwrap() }),
1136011407
outbound_scid_aliases: Mutex::new(outbound_scid_aliases),
1136111408
outpoint_to_peer: Mutex::new(outpoint_to_peer),

0 commit comments

Comments
 (0)