Skip to content

Split out receive_htlcs from the forwarding pipeline #3973

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 85 additions & 21 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2462,6 +2462,8 @@ where
// | |
// | |__`pending_intercepted_htlcs`
// |
// |__`receive_htlcs`
// |
// |__`decode_update_add_htlcs`
// |
// |__`per_peer_state`
Expand Down Expand Up @@ -2537,7 +2539,7 @@ pub struct ChannelManager<
/// See `ChannelManager` struct-level documentation for lock order requirements.
pending_outbound_payments: OutboundPayments,

/// SCID/SCID Alias -> forward infos. Key of 0 means payments received.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could document that 0 actually means trampoline now.

/// SCID/SCID Alias -> forward infos.
///
/// Note that because we may have an SCID Alias as the key we can have two entries per channel,
/// though in practice we probably won't be receiving HTLCs for a channel both via the alias
Expand All @@ -2546,6 +2548,9 @@ pub struct ChannelManager<
/// Note that no consistency guarantees are made about the existence of a channel with the
/// `short_channel_id` here, nor the `short_channel_id` in the `PendingHTLCInfo`!
///
/// This will also hold any [`FailHTLC`]s arising from handling [`Self::pending_intercepted_htlcs`] or
/// [`Self::receive_htlcs`].
///
/// See `ChannelManager` struct-level documentation for lock order requirements.
#[cfg(test)]
pub(super) forward_htlcs: Mutex<HashMap<u64, Vec<HTLCForwardInfo>>>,
Expand All @@ -2554,9 +2559,21 @@ pub struct ChannelManager<
/// Storage for HTLCs that have been intercepted and bubbled up to the user. We hold them here
/// until the user tells us what we should do with them.
///
/// Note that any failures that may arise from handling these will be pushed to
/// [`Self::forward_htlcs`] with the previous hop's SCID.
///
/// See `ChannelManager` struct-level documentation for lock order requirements.
pending_intercepted_htlcs: Mutex<HashMap<InterceptId, PendingAddHTLCInfo>>,

/// Storage for HTLCs that are meant for us.
///
/// Note that any failures that may arise from handling these will be pushed to
/// [`Self::forward_htlcs`] with the previous hop's SCID.
///
/// See `ChannelManager` struct-level documentation for lock order requirements.
#[cfg(test)]
pub(super) receive_htlcs: Mutex<Vec<HTLCForwardInfo>>,
#[cfg(not(test))]
receive_htlcs: Mutex<Vec<HTLCForwardInfo>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ISTM we should change the type here because we still have the issues from the combined map (panics in process_forward_htlcs for receives and panics in process_receive_htlcs for forwards) but now dont have the reason for the (one map). Also the panic messages in those methods are wrong now.

/// SCID/SCID Alias -> pending `update_add_htlc`s to decode.
///
/// Note that because we may have an SCID Alias as the key we can have two entries per channel,
Expand Down Expand Up @@ -3738,6 +3755,7 @@ where
outbound_scid_aliases: Mutex::new(new_hash_set()),
pending_outbound_payments: OutboundPayments::new(new_hash_map()),
forward_htlcs: Mutex::new(new_hash_map()),
receive_htlcs: Mutex::new(Vec::new()),
decode_update_add_htlcs: Mutex::new(new_hash_map()),
claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: new_hash_map(), pending_claiming_payments: new_hash_map() }),
pending_intercepted_htlcs: Mutex::new(new_hash_map()),
Expand Down Expand Up @@ -6355,6 +6373,9 @@ where
if !self.forward_htlcs.lock().unwrap().is_empty() {
return true;
}
if !self.receive_htlcs.lock().unwrap().is_empty() {
return true;
}
if !self.decode_update_add_htlcs.lock().unwrap().is_empty() {
return true;
}
Expand Down Expand Up @@ -6402,20 +6423,19 @@ where

for (short_chan_id, mut pending_forwards) in forward_htlcs {
should_persist = NotifyOption::DoPersist;
if short_chan_id != 0 {
self.process_forward_htlcs(
short_chan_id,
&mut pending_forwards,
&mut failed_forwards,
&mut phantom_receives,
);
} else {
self.process_receive_htlcs(
&mut pending_forwards,
&mut new_events,
&mut failed_forwards,
);
}
self.process_forward_htlcs(
short_chan_id,
&mut pending_forwards,
&mut failed_forwards,
&mut phantom_receives,
);
}

let mut receive_htlcs = Vec::new();
mem::swap(&mut receive_htlcs, &mut self.receive_htlcs.lock().unwrap());
if !receive_htlcs.is_empty() {
self.process_receive_htlcs(receive_htlcs, &mut new_events, &mut failed_forwards);
should_persist = NotifyOption::DoPersist;
}

let best_block_height = self.best_block.read().unwrap().height;
Expand Down Expand Up @@ -6929,11 +6949,11 @@ where
}

fn process_receive_htlcs(
&self, pending_forwards: &mut Vec<HTLCForwardInfo>,
&self, receive_htlcs: Vec<HTLCForwardInfo>,
new_events: &mut VecDeque<(Event, Option<EventCompletionAction>)>,
failed_forwards: &mut Vec<FailedHTLCForward>,
) {
'next_forwardable_htlc: for forward_info in pending_forwards.drain(..) {
'next_forwardable_htlc: for forward_info in receive_htlcs.into_iter() {
match forward_info {
HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
prev_short_channel_id,
Expand Down Expand Up @@ -10346,8 +10366,21 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
let scid = match forward_info.routing {
PendingHTLCRouting::Forward { short_channel_id, .. } => short_channel_id,
PendingHTLCRouting::TrampolineForward { .. } => 0,
PendingHTLCRouting::Receive { .. } => 0,
PendingHTLCRouting::ReceiveKeysend { .. } => 0,
PendingHTLCRouting::Receive { .. }
| PendingHTLCRouting::ReceiveKeysend { .. } => {
self.receive_htlcs.lock().unwrap().push(HTLCForwardInfo::AddHTLC(
PendingAddHTLCInfo {
prev_short_channel_id,
prev_counterparty_node_id,
prev_funding_outpoint,
prev_channel_id,
prev_htlc_id,
prev_user_channel_id,
forward_info,
},
));
continue;
},
};
// Pull this now to avoid introducing a lock order with `forward_htlcs`.
let is_our_scid = self.short_to_chan_info.read().unwrap().contains_key(&scid);
Expand Down Expand Up @@ -15091,6 +15124,8 @@ where
}
}

let receive_htlcs = self.receive_htlcs.lock().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't it break downgrades to only write the new vec, and not also write the receives in the legacy forward_htlcs?


let mut decode_update_add_htlcs_opt = None;
let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
if !decode_update_add_htlcs.is_empty() {
Expand Down Expand Up @@ -15258,6 +15293,7 @@ where
(17, in_flight_monitor_updates, option),
(19, peer_storage_dir, optional_vec),
(21, self.flow.writeable_async_receive_offer_cache(), required),
(23, *receive_htlcs, required_vec),
});

Ok(())
Expand Down Expand Up @@ -15818,6 +15854,7 @@ where
const MAX_ALLOC_SIZE: usize = 1024 * 64;
let forward_htlcs_count: u64 = Readable::read(reader)?;
let mut forward_htlcs = hash_map_with_capacity(cmp::min(forward_htlcs_count as usize, 128));
let mut legacy_receive_htlcs: Vec<HTLCForwardInfo> = Vec::new();
for _ in 0..forward_htlcs_count {
let short_channel_id = Readable::read(reader)?;
let pending_forwards_count: u64 = Readable::read(reader)?;
Expand All @@ -15826,7 +15863,26 @@ where
MAX_ALLOC_SIZE / mem::size_of::<HTLCForwardInfo>(),
));
for _ in 0..pending_forwards_count {
pending_forwards.push(Readable::read(reader)?);
let pending_htlc = Readable::read(reader)?;
// Prior to LDK 0.2, Receive HTLCs used to be stored in `forward_htlcs` under SCID == 0. Here we migrate
// the old data if necessary.
if short_channel_id == 0 {
match pending_htlc {
HTLCForwardInfo::AddHTLC(ref htlc_info) => {
if matches!(
htlc_info.forward_info.routing,
PendingHTLCRouting::Receive { .. }
| PendingHTLCRouting::ReceiveKeysend { .. }
) {
legacy_receive_htlcs.push(pending_htlc);
continue;
}
},
_ => {},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could debug_assert!(false) here if only to indicate we should never hit this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I intentionally let all other cases fall through to the the previous behavior. In particular, it seems that trampoline forwards would also be added under SCID 0 but we wouldn't want to push them to receive_htlcs.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't Trampoline forwards also be an AddHTLC? So the bottom _ match arm would still be unreachable. It also seems like we should fail here since any failures against SCID 0 should fail anyway (we won't be able to fail cause no such channel exists?)

}
}

pending_forwards.push(pending_htlc);
}
forward_htlcs.insert(short_channel_id, pending_forwards);
}
Expand Down Expand Up @@ -15943,6 +15999,7 @@ where
let mut inbound_payment_id_secret = None;
let mut peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>> = None;
let mut async_receive_offer_cache: AsyncReceiveOfferCache = AsyncReceiveOfferCache::new();
let mut receive_htlcs = None;
read_tlv_fields!(reader, {
(1, pending_outbound_payments_no_retry, option),
(2, pending_intercepted_htlcs, option),
Expand All @@ -15961,8 +16018,14 @@ where
(17, in_flight_monitor_updates, option),
(19, peer_storage_dir, optional_vec),
(21, async_receive_offer_cache, (default_value, async_receive_offer_cache)),
(23, receive_htlcs, optional_vec),
});
let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
debug_assert!(
receive_htlcs.as_ref().map_or(true, |r| r.is_empty())
|| legacy_receive_htlcs.is_empty()
);
let receive_htlcs = receive_htlcs.unwrap_or_else(|| legacy_receive_htlcs);
let peer_storage_dir: Vec<(PublicKey, Vec<u8>)> = peer_storage_dir.unwrap_or_else(Vec::new);
if fake_scid_rand_bytes.is_none() {
fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
Expand Down Expand Up @@ -16791,6 +16854,7 @@ where
pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()),

forward_htlcs: Mutex::new(forward_htlcs),
receive_htlcs: Mutex::new(receive_htlcs),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Earlier in this method, we remove pending payments that are not present in the monitors, and ISTM we should be doing this for receive_htlcs now as well

decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs),
claimable_payments: Mutex::new(ClaimablePayments {
claimable_payments,
Expand Down
99 changes: 58 additions & 41 deletions lightning/src/ln/onion_route_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1173,17 +1173,21 @@ fn test_onion_failure() {
|_| {},
|| {
nodes[1].node.process_pending_update_add_htlcs();
for (_, pending_forwards) in nodes[1].node.forward_htlcs.lock().unwrap().iter_mut() {
for f in pending_forwards.iter_mut() {
match f {
&mut HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
ref mut forward_info,
..
}) => forward_info.outgoing_cltv_value -= 1,
_ => {},
}
assert_eq!(nodes[1].node.forward_htlcs.lock().unwrap().len(), 1);
if let Some((_, pending_forwards)) =
nodes[1].node.forward_htlcs.lock().unwrap().iter_mut().next()
{
assert_eq!(pending_forwards.len(), 1);
match pending_forwards.get_mut(0).unwrap() {
Comment on lines +1176 to +1181
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This repeated pattern really looks like it could be in a test util function or macro.

&mut HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
ref mut forward_info,
..
}) => forward_info.outgoing_cltv_value -= 1,
_ => panic!("Unexpected HTLCForwardInfo"),
}
}
} else {
panic!("Expected pending forwards!");
};
},
true,
Some(LocalHTLCFailureReason::FinalIncorrectCLTVExpiry),
Expand All @@ -1203,17 +1207,21 @@ fn test_onion_failure() {
|| {
nodes[1].node.process_pending_update_add_htlcs();
// violate amt_to_forward > msg.amount_msat
for (_, pending_forwards) in nodes[1].node.forward_htlcs.lock().unwrap().iter_mut() {
for f in pending_forwards.iter_mut() {
match f {
&mut HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
ref mut forward_info,
..
}) => forward_info.outgoing_amt_msat -= 1,
_ => {},
}
assert_eq!(nodes[1].node.forward_htlcs.lock().unwrap().len(), 1);
if let Some((_, pending_forwards)) =
nodes[1].node.forward_htlcs.lock().unwrap().iter_mut().next()
{
assert_eq!(pending_forwards.len(), 1);
match pending_forwards.get_mut(0).unwrap() {
&mut HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
ref mut forward_info,
..
}) => forward_info.outgoing_amt_msat -= 1,
_ => panic!("Unexpected HTLCForwardInfo"),
}
}
} else {
panic!("Expected pending forwards!");
};
},
true,
Some(LocalHTLCFailureReason::FinalIncorrectHTLCAmount),
Expand Down Expand Up @@ -1553,16 +1561,21 @@ fn test_overshoot_final_cltv() {
commitment_signed_dance!(nodes[1], nodes[0], &update_0.commitment_signed, false, true);

assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
for (_, pending_forwards) in nodes[1].node.forward_htlcs.lock().unwrap().iter_mut() {
for f in pending_forwards.iter_mut() {
match f {
&mut HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
ref mut forward_info, ..
}) => forward_info.outgoing_cltv_value += 1,
_ => {},
}
nodes[1].node.process_pending_update_add_htlcs();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this line gets removed in a later commit? It doesn't seem to belong in this commit.

assert_eq!(nodes[1].node.forward_htlcs.lock().unwrap().len(), 1);
if let Some((_, pending_forwards)) =
nodes[1].node.forward_htlcs.lock().unwrap().iter_mut().next()
{
assert_eq!(pending_forwards.len(), 1);
match pending_forwards.get_mut(0).unwrap() {
&mut HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { ref mut forward_info, .. }) => {
forward_info.outgoing_cltv_value += 1
},
_ => panic!("Unexpected HTLCForwardInfo"),
}
}
} else {
panic!("Expected pending forwards!");
};
expect_and_process_pending_htlcs(&nodes[1], false);

check_added_monitors!(&nodes[1], 1);
Expand Down Expand Up @@ -2614,19 +2627,23 @@ fn test_phantom_final_incorrect_cltv_expiry() {
nodes[1].node.process_pending_update_add_htlcs();

// Modify the payload so the phantom hop's HMAC is bogus.
for (_, pending_forwards) in nodes[1].node.forward_htlcs.lock().unwrap().iter_mut() {
for f in pending_forwards.iter_mut() {
match f {
&mut HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
forward_info: PendingHTLCInfo { ref mut outgoing_cltv_value, .. },
..
}) => {
*outgoing_cltv_value -= 1;
},
_ => panic!("Unexpected forward"),
}
assert_eq!(nodes[1].node.forward_htlcs.lock().unwrap().len(), 1);
if let Some((_, pending_forwards)) =
nodes[1].node.forward_htlcs.lock().unwrap().iter_mut().next()
{
assert_eq!(pending_forwards.len(), 1);
match pending_forwards.get_mut(0).unwrap() {
&mut HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
forward_info: PendingHTLCInfo { ref mut outgoing_cltv_value, .. },
..
}) => {
*outgoing_cltv_value -= 1;
},
_ => panic!("Unexpected HTLCForwardInfo"),
}
}
} else {
panic!("Expected pending forwards!");
};
nodes[1].node.process_pending_htlc_forwards();
expect_htlc_failure_conditions(
nodes[1].node.get_and_clear_pending_events(),
Expand Down
Loading
Loading