Skip to content
72 changes: 71 additions & 1 deletion lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18267,6 +18267,76 @@ where
}
}

// Remove HTLCs from `forward_htlcs` if they are also present in `decode_update_add_htlcs`.
//
// In the future, the full set of pending HTLCs will be pulled from `Channel{Monitor}` data and
// placed in `ChannelManager::decode_update_add_htlcs` on read, to be handled on the next call
// to `process_pending_htlc_forwards`. This is part of a larger effort to remove the requirement
// of regularly persisting the `ChannelManager`. The new pipeline is supported for HTLC forwards
// received on LDK 0.3+ but not <= 0.2, so prune non-legacy HTLCs from `forward_htlcs`.
// Remove HTLCs from `forward_htlcs` if they are also present in `decode_update_add_htlcs`.
//
// In the future, the full set of pending HTLCs will be pulled from `Channel{Monitor}` data and
// placed in `ChannelManager::decode_update_add_htlcs` on read, to be handled on the next call
// to `process_pending_htlc_forwards`. This is part of a larger effort to remove the requirement
// of regularly persisting the `ChannelManager`. The new pipeline is supported for HTLC forwards
// received on LDK 0.3+ but not <= 0.2, so prune non-legacy HTLCs from `forward_htlcs`.
forward_htlcs_legacy.retain(|scid, pending_fwds| {
	for fwd in pending_fwds {
		// Key the HTLC by its inbound edge (previous-hop SCID + HTLC id), matching how
		// entries in `decode_update_add_htlcs` are identified.
		let (prev_scid, prev_htlc_id) = match fwd {
			HTLCForwardInfo::AddHTLC(htlc) => {
				(htlc.prev_outbound_scid_alias, htlc.prev_htlc_id)
			},
			HTLCForwardInfo::FailHTLC { htlc_id, .. }
			| HTLCForwardInfo::FailMalformedHTLC { htlc_id, .. } => (*scid, *htlc_id),
		};
		// Read-only lookup: `get` suffices, no mutable access is needed here.
		if let Some(pending_update_adds) = decode_update_add_htlcs.get(&prev_scid) {
			if pending_update_adds
				.iter()
				.any(|update_add| update_add.htlc_id == prev_htlc_id)
			{
				return false;
			}
		}
	}
	true
});
// Remove intercepted HTLC forwards if they are also present in `decode_update_add_htlcs`. See
// the above comment.
pending_intercepted_htlcs_legacy.retain(|id, fwd| {
	let prev_scid = fwd.prev_outbound_scid_alias;
	// Read-only lookup: `get` suffices, no mutable access is needed here.
	if let Some(pending_update_adds) = decode_update_add_htlcs.get(&prev_scid) {
		if pending_update_adds
			.iter()
			.any(|update_add| update_add.htlc_id == fwd.prev_htlc_id)
		{
			// The HTLC will be re-surfaced via the new decode pipeline, so drop the now-stale
			// `HTLCIntercepted` event referencing this legacy entry along with the entry itself.
			pending_events_read.retain(
				|(ev, _)| !matches!(ev, Event::HTLCIntercepted { intercept_id, .. } if intercept_id == id),
			);
			return false;
		}
	}
	// Keeping the legacy intercept: make sure an `HTLCIntercepted` event exists for it so the
	// user is still prompted to forward or fail the HTLC.
	if !pending_events_read.iter().any(
		|(ev, _)| matches!(ev, Event::HTLCIntercepted { intercept_id, .. } if intercept_id == id),
	) {
		match create_htlc_intercepted_event(*id, &fwd) {
			Ok(ev) => pending_events_read.push_back((ev, None)),
			// Event creation failing here indicates an internal invariant violation; in debug
			// builds surface it loudly, in release keep the intercept without an event.
			Err(()) => debug_assert!(false),
		}
	}
	true
});
// Add legacy update_adds that were received on LDK <= 0.2 that are not present in the
// `decode_update_add_htlcs` map that was rebuilt from `Channel{Monitor}` data, see above
// comment.
for (scid, legacy_update_adds) in decode_update_add_htlcs_legacy.drain() {
	match decode_update_add_htlcs.entry(scid) {
		hash_map::Entry::Occupied(mut entry) => {
			// Borrow the Vec mutably once, rather than re-borrowing via `get()`/`get_mut()`
			// on every iteration of the inner loop.
			let update_adds = entry.get_mut();
			for legacy_update_add in legacy_update_adds {
				if !update_adds.contains(&legacy_update_add) {
					update_adds.push(legacy_update_add);
				}
			}
		},
		hash_map::Entry::Vacant(entry) => {
			entry.insert(legacy_update_adds);
		},
	}
}

let best_block = BestBlock::new(best_block_hash, best_block_height);
let flow = OffersMessageFlow::new(
chain_hash,
Expand Down Expand Up @@ -18296,7 +18366,7 @@ where
pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs_legacy),

forward_htlcs: Mutex::new(forward_htlcs_legacy),
decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs_legacy),
decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs),
claimable_payments: Mutex::new(ClaimablePayments {
claimable_payments,
pending_claiming_payments: pending_claiming_payments.unwrap(),
Expand Down
3 changes: 2 additions & 1 deletion lightning/src/ln/functional_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1382,9 +1382,10 @@ macro_rules! reload_node {
$node.onion_messenger.set_async_payments_handler(&$new_channelmanager);
};
($node: expr, $chanman_encoded: expr, $monitors_encoded: expr, $persister: ident, $new_chain_monitor: ident, $new_channelmanager: ident) => {
let config = $node.node.get_current_config();
reload_node!(
$node,
test_default_channel_config(),
config,
$chanman_encoded,
$monitors_encoded,
$persister,
Expand Down
87 changes: 86 additions & 1 deletion lightning/src/ln/reload_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() {

#[cfg(feature = "std")]
fn do_test_data_loss_protect(reconnect_panicing: bool, substantially_old: bool, not_stale: bool) {
use crate::ln::channelmanager::Retry;
use crate::ln::outbound_payment::Retry;
use crate::types::string::UntrustedString;
// When we get a data_loss_protect proving we're behind, we immediately panic as the
// chain::Watch API requirements have been violated (e.g. the user restored from a backup). The
Expand Down Expand Up @@ -1173,6 +1173,91 @@ fn removed_payment_no_manager_persistence() {
expect_payment_failed!(nodes[0], payment_hash, false);
}

#[test]
fn manager_persisted_pre_outbound_edge_forward() {
	// Non-intercept variant: the HTLC sits in `forward_htlcs` across the reload.
	do_manager_persisted_pre_outbound_edge_forward(false);
}

#[test]
fn manager_persisted_pre_outbound_edge_intercept_forward() {
	// Intercept variant: the HTLC sits in `pending_intercepted_htlcs` across the reload.
	do_manager_persisted_pre_outbound_edge_forward(true);
}

// Tests that an HTLC whose onion has been decoded but which has not yet been forwarded to the
// next hop (i.e. it sits in `ChannelManager::forward_htlcs` or, if `intercept_htlc` is set,
// `ChannelManager::pending_intercepted_htlcs`) survives a `ChannelManager` reload on the
// forwarding node and can still be forwarded and claimed afterwards.
fn do_manager_persisted_pre_outbound_edge_forward(intercept_htlc: bool) {
	let chanmon_cfgs = create_chanmon_cfgs(3);
	let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
	let persister;
	let new_chain_monitor;
	// node_b (the forwarder) must opt in to HTLC interception for the intercept variant.
	let mut intercept_forwards_config = test_default_channel_config();
	intercept_forwards_config.accept_intercept_htlcs = true;
	let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, Some(intercept_forwards_config), None]);
	let nodes_1_deserialized;
	let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);

	let chan_id_1 = create_announced_chan_between_nodes(&nodes, 0, 1).2;
	let chan_id_2 = create_announced_chan_between_nodes(&nodes, 1, 2).2;

	// Lock in the HTLC from node_a <> node_b.
	let amt_msat = 5000;
	let (mut route, payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[2], amt_msat);
	if intercept_htlc {
		// Route over node_b's intercept SCID so the forward lands in
		// `pending_intercepted_htlcs` rather than `forward_htlcs`.
		route.paths[0].hops[1].short_channel_id = nodes[1].node.get_intercept_scid();
	}
	nodes[0].node.send_payment_with_route(route, payment_hash, RecipientOnionFields::secret_only(payment_secret), PaymentId(payment_hash.0)).unwrap();
	check_added_monitors(&nodes[0], 1);
	let updates = get_htlc_update_msgs(&nodes[0], &nodes[1].node.get_our_node_id());
	nodes[1].node.handle_update_add_htlc(nodes[0].node.get_our_node_id(), &updates.update_add_htlcs[0]);
	do_commitment_signed_dance(&nodes[1], &nodes[0], &updates.commitment_signed, false, false);

	// Decode the HTLC onion but don't forward it to the next hop, such that the HTLC ends up in
	// `ChannelManager::forward_htlcs` or `ChannelManager::pending_intercepted_htlcs`.
	nodes[1].node.test_process_pending_update_add_htlcs();

	// Disconnect peers and reload the forwarding node_b.
	nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id());
	nodes[2].node.peer_disconnected(nodes[1].node.get_our_node_id());

	let node_b_encoded = nodes[1].node.encode();

	// Both channel monitors must be provided so the reloaded manager can rebuild its state.
	let chan_0_monitor_serialized = get_monitor!(nodes[1], chan_id_1).encode();
	let chan_1_monitor_serialized = get_monitor!(nodes[1], chan_id_2).encode();
	reload_node!(nodes[1], node_b_encoded, &[&chan_0_monitor_serialized, &chan_1_monitor_serialized], persister, new_chain_monitor, nodes_1_deserialized);

	reconnect_nodes(ReconnectArgs::new(&nodes[1], &nodes[0]));
	// The b<->c channel needs channel_ready/announcement_signatures re-exchanged post-reload.
	let mut args_b_c = ReconnectArgs::new(&nodes[1], &nodes[2]);
	args_b_c.send_channel_ready = (true, true);
	args_b_c.send_announcement_sigs = (true, true);
	reconnect_nodes(args_b_c);

	// Forward the HTLC and ensure we can claim it post-reload.
	nodes[1].node.process_pending_htlc_forwards();

	if intercept_htlc {
		// The `HTLCIntercepted` event must survive the reload; act on it to release the HTLC.
		let events = nodes[1].node.get_and_clear_pending_events();
		assert_eq!(events.len(), 1);
		let (intercept_id, expected_outbound_amt_msat) = match events[0] {
			Event::HTLCIntercepted { intercept_id, expected_outbound_amount_msat, .. } => {
				(intercept_id, expected_outbound_amount_msat)
			},
			_ => panic!()
		};
		nodes[1].node.forward_intercepted_htlc(intercept_id, &chan_id_2,
			nodes[2].node.get_our_node_id(), expected_outbound_amt_msat).unwrap();
		nodes[1].node.process_pending_htlc_forwards();
	}
	check_added_monitors(&nodes[1], 1);

	// Complete the b -> c hop and claim the payment end-to-end.
	let updates = get_htlc_update_msgs(&nodes[1], &nodes[2].node.get_our_node_id());
	nodes[2].node.handle_update_add_htlc(nodes[1].node.get_our_node_id(), &updates.update_add_htlcs[0]);
	do_commitment_signed_dance(&nodes[2], &nodes[1], &updates.commitment_signed, false, false);
	expect_and_process_pending_htlcs(&nodes[2], false);

	expect_payment_claimable!(nodes[2], payment_hash, payment_secret, amt_msat, None, nodes[2].node.get_our_node_id());
	let path: &[&[_]] = &[&[&nodes[1], &nodes[2]]];
	do_claim_payment_along_route(ClaimAlongRouteArgs::new(&nodes[0], path, payment_preimage));
	expect_payment_sent(&nodes[0], payment_preimage, None, true, true);
}

#[test]
fn test_reload_partial_funding_batch() {
let chanmon_cfgs = create_chanmon_cfgs(3);
Expand Down