Split out receive_htlcs from the forwarding pipeline #3973

Open · wants to merge 6 commits into base: main
Changes from 4 commits

106 changes: 85 additions & 21 deletions lightning/src/ln/channelmanager.rs
@@ -2462,6 +2462,8 @@ where
// | |
// | |__`pending_intercepted_htlcs`
// |
// |__`receive_htlcs`
// |
// |__`decode_update_add_htlcs`
// |
// |__`per_peer_state`
@@ -2537,7 +2539,7 @@ pub struct ChannelManager<
/// See `ChannelManager` struct-level documentation for lock order requirements.
pending_outbound_payments: OutboundPayments,

/// SCID/SCID Alias -> forward infos. Key of 0 means payments received.
Contributor:
Could document that 0 actually means trampoline now.
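A possible wording for that doc tweak, sketched against a simplified stand-in for the field (the struct and types below are illustrative, not the real declarations):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Illustrative stand-in for the real forward-info payload.
struct HTLCForwardInfo;

struct ForwardHtlcsDocSketch {
    /// SCID/SCID Alias -> forward infos. A key of 0 now only holds Trampoline
    /// forwards; HTLCs we are the final recipient of are queued in
    /// `receive_htlcs` instead.
    forward_htlcs: Mutex<HashMap<u64, Vec<HTLCForwardInfo>>>,
}
```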

/// SCID/SCID Alias -> forward infos.
///
/// Note that because we may have an SCID Alias as the key we can have two entries per channel,
/// though in practice we probably won't be receiving HTLCs for a channel both via the alias
@@ -2546,6 +2548,9 @@ pub struct ChannelManager<
/// Note that no consistency guarantees are made about the existence of a channel with the
/// `short_channel_id` here, nor the `short_channel_id` in the `PendingHTLCInfo`!
///
/// This will also hold any [`FailHTLC`]s arising from handling [`Self::pending_intercepted_htlcs`] or
/// [`Self::receive_htlcs`].
///
/// See `ChannelManager` struct-level documentation for lock order requirements.
#[cfg(test)]
pub(super) forward_htlcs: Mutex<HashMap<u64, Vec<HTLCForwardInfo>>>,
@@ -2554,9 +2559,21 @@ pub struct ChannelManager<
/// Storage for HTLCs that have been intercepted and bubbled up to the user. We hold them here
/// until the user tells us what we should do with them.
///
/// Note that any failures that may arise from handling these will be pushed to
/// [`Self::forward_htlcs`] with the previous hop's SCID.
///
/// See `ChannelManager` struct-level documentation for lock order requirements.
pending_intercepted_htlcs: Mutex<HashMap<InterceptId, PendingAddHTLCInfo>>,

/// Storage for HTLCs that are meant for us.
///
/// Note that any failures that may arise from handling these will be pushed to
/// [`Self::forward_htlcs`] with the previous hop's SCID.
///
/// See `ChannelManager` struct-level documentation for lock order requirements.
#[cfg(test)]
pub(super) receive_htlcs: Mutex<Vec<HTLCForwardInfo>>,
#[cfg(not(test))]
receive_htlcs: Mutex<Vec<HTLCForwardInfo>>,
Collaborator:
ISTM we should change the type here, because we still have the issues from the combined map (panics in process_forward_htlcs for receives and panics in process_receive_htlcs for forwards) but no longer have the one-map design that justified them. Also, the panic messages in those methods are wrong now.
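One possible shape of that narrower type, using simplified stand-ins rather than LDK's real definitions: if the receive queue holds PendingAddHTLCInfo directly, the fail variants become unrepresentable there and the catch-all panic arms go away.

```rust
use std::sync::Mutex;

// Illustrative stub; the real PendingAddHTLCInfo carries many more fields.
struct PendingAddHTLCInfo {
    prev_htlc_id: u64,
}

// Hypothetical receive-only storage: no HTLCForwardInfo::FailHTLC /
// FailMalformedHTLC can end up here by construction.
struct ReceiveQueue {
    receive_htlcs: Mutex<Vec<PendingAddHTLCInfo>>,
}

impl ReceiveQueue {
    fn new() -> Self {
        Self { receive_htlcs: Mutex::new(Vec::new()) }
    }

    fn push(&self, htlc: PendingAddHTLCInfo) {
        self.receive_htlcs.lock().unwrap().push(htlc);
    }

    // Take everything queued for processing, mirroring the mem::swap done in
    // process_pending_htlc_forwards.
    fn take_pending(&self) -> Vec<PendingAddHTLCInfo> {
        std::mem::take(&mut *self.receive_htlcs.lock().unwrap())
    }
}

fn main() {
    let queue = ReceiveQueue::new();
    queue.push(PendingAddHTLCInfo { prev_htlc_id: 7 });
    assert_eq!(queue.take_pending()[0].prev_htlc_id, 7);
}
```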

/// SCID/SCID Alias -> pending `update_add_htlc`s to decode.
///
/// Note that because we may have an SCID Alias as the key we can have two entries per channel,
@@ -3738,6 +3755,7 @@ where
outbound_scid_aliases: Mutex::new(new_hash_set()),
pending_outbound_payments: OutboundPayments::new(new_hash_map()),
forward_htlcs: Mutex::new(new_hash_map()),
receive_htlcs: Mutex::new(Vec::new()),
decode_update_add_htlcs: Mutex::new(new_hash_map()),
claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: new_hash_map(), pending_claiming_payments: new_hash_map() }),
pending_intercepted_htlcs: Mutex::new(new_hash_map()),
@@ -6355,6 +6373,9 @@ where
if !self.forward_htlcs.lock().unwrap().is_empty() {
return true;
}
if !self.receive_htlcs.lock().unwrap().is_empty() {
return true;
}
if !self.decode_update_add_htlcs.lock().unwrap().is_empty() {
return true;
}
@@ -6402,20 +6423,19 @@ where

for (short_chan_id, mut pending_forwards) in forward_htlcs {
should_persist = NotifyOption::DoPersist;
if short_chan_id != 0 {
self.process_forward_htlcs(
short_chan_id,
&mut pending_forwards,
&mut failed_forwards,
&mut phantom_receives,
);
} else {
self.process_receive_htlcs(
&mut pending_forwards,
&mut new_events,
&mut failed_forwards,
);
}
self.process_forward_htlcs(
short_chan_id,
&mut pending_forwards,
&mut failed_forwards,
&mut phantom_receives,
);
}

let mut receive_htlcs = Vec::new();
mem::swap(&mut receive_htlcs, &mut self.receive_htlcs.lock().unwrap());
if !receive_htlcs.is_empty() {
self.process_receive_htlcs(receive_htlcs, &mut new_events, &mut failed_forwards);
should_persist = NotifyOption::DoPersist;
}

let best_block_height = self.best_block.read().unwrap().height;
@@ -6929,11 +6949,11 @@ where
}

fn process_receive_htlcs(
&self, pending_forwards: &mut Vec<HTLCForwardInfo>,
&self, receive_htlcs: Vec<HTLCForwardInfo>,
new_events: &mut VecDeque<(Event, Option<EventCompletionAction>)>,
failed_forwards: &mut Vec<FailedHTLCForward>,
) {
'next_forwardable_htlc: for forward_info in pending_forwards.drain(..) {
'next_forwardable_htlc: for forward_info in receive_htlcs.into_iter() {
match forward_info {
HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
prev_short_channel_id,
Expand Down Expand Up @@ -10346,8 +10366,21 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
let scid = match forward_info.routing {
PendingHTLCRouting::Forward { short_channel_id, .. } => short_channel_id,
PendingHTLCRouting::TrampolineForward { .. } => 0,
PendingHTLCRouting::Receive { .. } => 0,
PendingHTLCRouting::ReceiveKeysend { .. } => 0,
PendingHTLCRouting::Receive { .. }
| PendingHTLCRouting::ReceiveKeysend { .. } => {
self.receive_htlcs.lock().unwrap().push(HTLCForwardInfo::AddHTLC(
PendingAddHTLCInfo {
prev_short_channel_id,
prev_counterparty_node_id,
prev_funding_outpoint,
prev_channel_id,
prev_htlc_id,
prev_user_channel_id,
forward_info,
},
));
continue;
},
};
// Pull this now to avoid introducing a lock order with `forward_htlcs`.
let is_our_scid = self.short_to_chan_info.read().unwrap().contains_key(&scid);
@@ -15091,6 +15124,8 @@ where
}
}

let receive_htlcs = self.receive_htlcs.lock().unwrap();
Contributor:
Doesn't it break downgrades to only write the new vec, and not also write the receives in the legacy forward_htlcs?
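A rough illustration of the downgrade-compatible write being asked about here: before serializing, fold the new vec back into the legacy SCID-keyed map under key 0, so readers that only know forward_htlcs still see the receives. Types and the helper name are simplified stand-ins, not the actual ChannelManager writer.

```rust
use std::collections::HashMap;

// Simplified stand-in for HTLCForwardInfo.
#[derive(Clone, Debug)]
struct HtlcEntry(u64);

// Hypothetical helper: merge the dedicated receive queue back into the legacy
// map (key 0) so a serialized ChannelManager stays readable by versions that
// predate the receive_htlcs TLV.
fn legacy_forward_htlcs_for_write(
    forward_htlcs: &HashMap<u64, Vec<HtlcEntry>>,
    receive_htlcs: &[HtlcEntry],
) -> HashMap<u64, Vec<HtlcEntry>> {
    let mut merged = forward_htlcs.clone();
    if !receive_htlcs.is_empty() {
        merged.entry(0).or_default().extend_from_slice(receive_htlcs);
    }
    merged
}

fn main() {
    let forwards = HashMap::from([(42u64, vec![HtlcEntry(1)])]);
    let receives = vec![HtlcEntry(2), HtlcEntry(3)];
    let merged = legacy_forward_htlcs_for_write(&forwards, &receives);
    // Receives now live under the legacy key 0 as well as in the new TLV.
    assert_eq!(merged[&0].len(), 2);
    assert_eq!(merged[&42].len(), 1);
}
```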


let mut decode_update_add_htlcs_opt = None;
let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
if !decode_update_add_htlcs.is_empty() {
@@ -15258,6 +15293,7 @@ where
(17, in_flight_monitor_updates, option),
(19, peer_storage_dir, optional_vec),
(21, self.flow.writeable_async_receive_offer_cache(), required),
(23, *receive_htlcs, required_vec),
});

Ok(())
@@ -15818,6 +15854,7 @@ where
const MAX_ALLOC_SIZE: usize = 1024 * 64;
let forward_htlcs_count: u64 = Readable::read(reader)?;
let mut forward_htlcs = hash_map_with_capacity(cmp::min(forward_htlcs_count as usize, 128));
let mut legacy_receive_htlcs: Vec<HTLCForwardInfo> = Vec::new();
for _ in 0..forward_htlcs_count {
let short_channel_id = Readable::read(reader)?;
let pending_forwards_count: u64 = Readable::read(reader)?;
let mut pending_forwards = Vec::with_capacity(cmp::min(
pending_forwards_count as usize,
MAX_ALLOC_SIZE / mem::size_of::<HTLCForwardInfo>(),
));
for _ in 0..pending_forwards_count {
pending_forwards.push(Readable::read(reader)?);
let pending_htlc = Readable::read(reader)?;
// Prior to LDK 0.2, receive HTLCs were stored in `forward_htlcs` under SCID == 0. Here we migrate
// that legacy data if necessary.
if short_channel_id == 0 {
match pending_htlc {
HTLCForwardInfo::AddHTLC(ref htlc_info) => {
if matches!(
htlc_info.forward_info.routing,
PendingHTLCRouting::Receive { .. }
| PendingHTLCRouting::ReceiveKeysend { .. }
) {
legacy_receive_htlcs.push(pending_htlc);
continue;
}
},
_ => {},
Contributor:
nit: could debug_assert!(false) here if only to indicate we should never hit this

Contributor Author:
Hmm, I intentionally let all other cases fall through to the previous behavior. In particular, it seems that trampoline forwards would also be added under SCID 0, but we wouldn't want to push them to receive_htlcs.

Collaborator:
Wouldn't Trampoline forwards also be an AddHTLC? So the bottom _ match arm would still be unreachable. It also seems like we should fail here, since any failures against SCID 0 should fail anyway (we won't be able to fail them back because no such channel exists?)
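Roughly what this thread seems to converge on for the legacy (SCID == 0) read path: receive-type AddHTLCs move to the new queue, other AddHTLCs (e.g. Trampoline forwards) keep the old path, and the fail variants trigger a debug assertion since no channel with SCID 0 exists to fail back over. Sketch only, with simplified stand-in types:

```rust
// Simplified stand-ins for the routing and forward-info enums.
enum Routing {
    Receive,
    ReceiveKeysend,
    TrampolineForward,
}

enum ForwardInfo {
    AddHtlc { routing: Routing },
    FailHtlc,
}

// Hypothetical shape of the legacy-key-0 migration match: receives go to the
// new queue, other adds fall through to the old forwarding path, and fail
// entries are flagged as a bug in debug builds.
fn migrate_legacy_entry(
    entry: ForwardInfo,
    receive_htlcs: &mut Vec<ForwardInfo>,
    pending_forwards: &mut Vec<ForwardInfo>,
) {
    match entry {
        ForwardInfo::AddHtlc { routing: Routing::Receive }
        | ForwardInfo::AddHtlc { routing: Routing::ReceiveKeysend } => {
            receive_htlcs.push(entry);
        },
        ForwardInfo::AddHtlc { .. } => pending_forwards.push(entry),
        ForwardInfo::FailHtlc => {
            debug_assert!(false, "unexpected fail HTLC under legacy key 0");
            pending_forwards.push(entry);
        },
    }
}

fn main() {
    let (mut receives, mut forwards) = (Vec::new(), Vec::new());
    migrate_legacy_entry(
        ForwardInfo::AddHtlc { routing: Routing::Receive },
        &mut receives,
        &mut forwards,
    );
    migrate_legacy_entry(
        ForwardInfo::AddHtlc { routing: Routing::TrampolineForward },
        &mut receives,
        &mut forwards,
    );
    assert_eq!((receives.len(), forwards.len()), (1, 1));
}
```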

}
}

pending_forwards.push(pending_htlc);
}
forward_htlcs.insert(short_channel_id, pending_forwards);
}
@@ -15943,6 +15999,7 @@ where
let mut inbound_payment_id_secret = None;
let mut peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>> = None;
let mut async_receive_offer_cache: AsyncReceiveOfferCache = AsyncReceiveOfferCache::new();
let mut receive_htlcs = None;
read_tlv_fields!(reader, {
(1, pending_outbound_payments_no_retry, option),
(2, pending_intercepted_htlcs, option),
(17, in_flight_monitor_updates, option),
(19, peer_storage_dir, optional_vec),
(21, async_receive_offer_cache, (default_value, async_receive_offer_cache)),
(23, receive_htlcs, optional_vec),
});
let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
debug_assert!(
receive_htlcs.as_ref().map_or(true, |r| r.is_empty())
|| legacy_receive_htlcs.is_empty()
);
let receive_htlcs = receive_htlcs.unwrap_or_else(|| legacy_receive_htlcs);
let peer_storage_dir: Vec<(PublicKey, Vec<u8>)> = peer_storage_dir.unwrap_or_else(Vec::new);
if fake_scid_rand_bytes.is_none() {
fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
@@ -16791,6 +16854,7 @@ where
pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()),

forward_htlcs: Mutex::new(forward_htlcs),
receive_htlcs: Mutex::new(receive_htlcs),
Contributor:
Earlier in this method, we remove pending payments that are not present in the monitors, and ISTM we should be doing this for receive_htlcs now as well
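A sketch of the pruning being suggested, mirroring what deserialization already does for other pending HTLCs: drop any queued receive whose previous-hop channel no longer has a monitor. The types and the monitored_channels set are simplified stand-ins for the real structures used during ChannelManager read.

```rust
use std::collections::HashSet;

// Simplified stand-in for PendingAddHTLCInfo: just the previous-hop channel id.
struct QueuedReceive {
    prev_channel_id: [u8; 32],
}

// Hypothetical deserialization step: keep only receives whose previous-hop
// channel still appears in the set of loaded channel monitors.
fn prune_receives(
    receive_htlcs: &mut Vec<QueuedReceive>,
    monitored_channels: &HashSet<[u8; 32]>,
) {
    receive_htlcs.retain(|r| monitored_channels.contains(&r.prev_channel_id));
}

fn main() {
    let mut receives = vec![
        QueuedReceive { prev_channel_id: [1; 32] },
        QueuedReceive { prev_channel_id: [2; 32] },
    ];
    let monitors: HashSet<[u8; 32]> = HashSet::from([[1; 32]]);
    prune_receives(&mut receives, &monitors);
    // The receive whose channel has no monitor was dropped.
    assert_eq!(receives.len(), 1);
    assert_eq!(receives[0].prev_channel_id, [1; 32]);
}
```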

decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs),
claimable_payments: Mutex::new(ClaimablePayments {
claimable_payments,
87 changes: 36 additions & 51 deletions lightning/src/ln/payment_tests.rs
@@ -636,30 +636,23 @@ fn test_reject_mpp_keysend_htlc_mismatching_secret() {
nodes[3].node.process_pending_update_add_htlcs();

assert!(nodes[3].node.get_and_clear_pending_msg_events().is_empty());
assert_eq!(nodes[3].node.forward_htlcs.lock().unwrap().len(), 1);
if let Some((_, pending_forwards)) =
nodes[3].node.forward_htlcs.lock().unwrap().iter_mut().next()
{
assert_eq!(pending_forwards.len(), 1);
match pending_forwards.get_mut(0).unwrap() {
&mut HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { ref mut forward_info, .. }) => {
match forward_info.routing {
PendingHTLCRouting::ReceiveKeysend { ref mut payment_data, .. } => {
*payment_data = Some(msgs::FinalOnionHopData {
payment_secret: PaymentSecret([42; 32]),
total_msat: amount * 2,
});
},
_ => panic!("Expected PendingHTLCRouting::ReceiveKeysend"),
}
},
_ => {
panic!("Unexpected HTLCForwardInfo");
},
}
} else {
panic!("Expected pending receive");
};
assert_eq!(nodes[3].node.receive_htlcs.lock().unwrap().len(), 1);
match nodes[3].node.receive_htlcs.lock().unwrap().get_mut(0).unwrap() {
Contributor:
nit: could've avoided a bit of rewriting here and below by doing match .. get_mut(0).unwrap() in the first commit

Contributor Author:
Hm yeah, I don't fully recall why I did it this way. Are you fine with leaving it as-is for now?

&mut HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { ref mut forward_info, .. }) => {
match forward_info.routing {
PendingHTLCRouting::ReceiveKeysend { ref mut payment_data, .. } => {
*payment_data = Some(msgs::FinalOnionHopData {
payment_secret: PaymentSecret([42; 32]),
total_msat: amount * 2,
});
},
_ => panic!("Expected PendingHTLCRouting::ReceiveKeysend"),
}
},
_ => {
panic!("Unexpected HTLCForwardInfo");
},
}
nodes[3].node.process_pending_htlc_forwards();

// Pay along nodes[2]
@@ -687,34 +680,26 @@ fn test_reject_mpp_keysend_htlc_mismatching_secret() {
let update_add_3 = update_3.update_add_htlcs[0].clone();
nodes[3].node.handle_update_add_htlc(node_c_id, &update_add_3);
commitment_signed_dance!(nodes[3], nodes[2], update_3.commitment_signed, false, true);
expect_htlc_failure_conditions(nodes[3].node.get_and_clear_pending_events(), &[]);
Contributor:
I'm not clear on the reason for removing this, mind explaining? I thought there shouldn't be a behavior change here.

Contributor Author:
Sure! The event queue is actually empty here, so this is a no-op, which we simply made a bit more explicit with the assert below.

Happy to drop the diff, but FWIW it might be worth cleaning up all instances of expect_htlc_failure_conditions(.., &[])?

Contributor:
Probably fine. In the future I think it would be a bit more reviewable to either separate this out or note it in the commit message.

nodes[3].node.process_pending_update_add_htlcs();

assert!(nodes[3].node.get_and_clear_pending_events().is_empty());
assert!(nodes[3].node.get_and_clear_pending_msg_events().is_empty());
assert_eq!(nodes[3].node.forward_htlcs.lock().unwrap().len(), 1);
if let Some((_, pending_forwards)) =
nodes[3].node.forward_htlcs.lock().unwrap().iter_mut().next()
{
assert_eq!(pending_forwards.len(), 1);
match pending_forwards.get_mut(0).unwrap() {
&mut HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { ref mut forward_info, .. }) => {
match forward_info.routing {
PendingHTLCRouting::ReceiveKeysend { ref mut payment_data, .. } => {
*payment_data = Some(msgs::FinalOnionHopData {
payment_secret: PaymentSecret([43; 32]), // Doesn't match the secret used above
total_msat: amount * 2,
});
},
_ => panic!("Expected PendingHTLCRouting::ReceiveKeysend"),
}
},
_ => {
panic!("Unexpected HTLCForwardInfo");
},
}
} else {
panic!("Expected pending receive");
};
nodes[3].node.process_pending_update_add_htlcs();
assert_eq!(nodes[3].node.receive_htlcs.lock().unwrap().len(), 1);
match nodes[3].node.receive_htlcs.lock().unwrap().get_mut(0).unwrap() {
&mut HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { ref mut forward_info, .. }) => {
match forward_info.routing {
PendingHTLCRouting::ReceiveKeysend { ref mut payment_data, .. } => {
*payment_data = Some(msgs::FinalOnionHopData {
payment_secret: PaymentSecret([43; 32]), // Doesn't match the secret used above
total_msat: amount * 2,
});
},
_ => panic!("Expected PendingHTLCRouting::ReceiveKeysend"),
}
},
_ => {
panic!("Unexpected HTLCForwardInfo");
},
}
nodes[3].node.process_pending_htlc_forwards();
let fail_type = HTLCHandlingFailureType::Receive { payment_hash };
expect_and_process_pending_htlcs_and_htlc_handling_failed(&nodes[3], &[fail_type]);