Commit 89f5d07

Prefer to use legacy forward maps on manager read
We are working on removing the requirement to regularly persist the ChannelManager, and to that end a recent PR began reconstructing the manager's decode_update_add_htlcs map from Channel data on startup. At the time, ChannelManager::read was written to prefer the newly reconstructed map, partly to ensure the new map's usage gets test coverage. That approach required a lot of code to deduplicate HTLCs that were also present in the old maps, adding extra complexity. Instead, prefer the old maps when they are present (which will always be the case in production), but skip writing the legacy maps in test builds so tests always exercise the new paths.
1 parent a63e344 commit 89f5d07
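
The shape of the change is easiest to see in a stripped-down sketch. Everything below is illustrative only: the LegacyMaps type, its single map, and the byte-level encoding are invented stand-ins rather than LDK's real TLV-based serialization, while the cfg gate, the reconstruct_manager_from_monitors flag, and the write-empty-in-test-builds behavior mirror what the diff below actually does.

use std::collections::HashMap;

// Hypothetical stand-in for the manager's legacy forward state (not the real type).
struct LegacyMaps {
	// scid -> pending forwards, kept as opaque bytes for this sketch
	forward_htlcs: HashMap<u64, Vec<u8>>,
}

impl LegacyMaps {
	fn write(&self, out: &mut Vec<u8>) {
		// Test builds skip the legacy maps so every reload must rebuild pending
		// HTLCs from Channel data; production builds keep writing them as before.
		#[cfg(any(test, feature = "_test_utils"))]
		let reconstruct_manager_from_monitors = true;
		#[cfg(not(any(test, feature = "_test_utils")))]
		let reconstruct_manager_from_monitors = false;

		if !reconstruct_manager_from_monitors {
			// Legacy path: length-prefixed map, one entry per short channel id.
			out.extend_from_slice(&(self.forward_htlcs.len() as u64).to_be_bytes());
			for (scid, fwds) in &self.forward_htlcs {
				out.extend_from_slice(&scid.to_be_bytes());
				out.extend_from_slice(&(fwds.len() as u64).to_be_bytes());
				out.extend_from_slice(fwds);
			}
		} else {
			// New path: write an empty map; the reader then falls back to the
			// HTLC state reconstructed from Channel data.
			out.extend_from_slice(&0u64.to_be_bytes());
		}
		// The flag itself is persisted too (TLV type 23 in the actual commit) so the
		// reader knows whether the legacy maps it just read are authoritative.
		out.push(reconstruct_manager_from_monitors as u8);
	}
}

On the read side the commit decodes the flag as an optional field that defaults to false when absent, so data written by older LDK versions keeps flowing through the legacy maps, while test builds force the flag to true and always take the reconstruction path.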


lightning/src/ln/channelmanager.rs

Lines changed: 92 additions & 175 deletions
@@ -11762,6 +11762,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 
 		if !new_intercept_events.is_empty() {
 			let mut events = self.pending_events.lock().unwrap();
+			new_intercept_events.retain(|ev| !events.contains(ev));
 			events.append(&mut new_intercept_events);
 		}
 	}
@@ -16684,7 +16685,12 @@ where
 			}
 		}
 
-		{
+		#[cfg(not(any(test, feature = "_test_utils")))]
+		let reconstruct_manager_from_monitors = false;
+		#[cfg(any(test, feature = "_test_utils"))]
+		let reconstruct_manager_from_monitors = true;
+
+		if !reconstruct_manager_from_monitors {
 			let forward_htlcs = self.forward_htlcs.lock().unwrap();
 			(forward_htlcs.len() as u64).write(writer)?;
 			for (short_channel_id, pending_forwards) in forward_htlcs.iter() {
@@ -16694,12 +16700,16 @@ where
 					forward.write(writer)?;
 				}
 			}
+		} else {
+			0u64.write(writer)?;
 		}
 
 		let mut decode_update_add_htlcs_opt = None;
-		let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
-		if !decode_update_add_htlcs.is_empty() {
-			decode_update_add_htlcs_opt = Some(decode_update_add_htlcs);
+		if !reconstruct_manager_from_monitors {
+			let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
+			if !decode_update_add_htlcs.is_empty() {
+				decode_update_add_htlcs_opt = Some(decode_update_add_htlcs);
+			}
 		}
 
 		let claimable_payments = self.claimable_payments.lock().unwrap();
@@ -16845,9 +16855,11 @@ where
 		}
 
 		let mut pending_intercepted_htlcs = None;
-		let our_pending_intercepts = self.pending_intercepted_htlcs.lock().unwrap();
-		if our_pending_intercepts.len() != 0 {
-			pending_intercepted_htlcs = Some(our_pending_intercepts);
+		if !reconstruct_manager_from_monitors {
+			let our_pending_intercepts = self.pending_intercepted_htlcs.lock().unwrap();
+			if our_pending_intercepts.len() != 0 {
+				pending_intercepted_htlcs = Some(our_pending_intercepts);
+			}
 		}
 
 		let mut pending_claiming_payments = Some(&claimable_payments.pending_claiming_payments);
@@ -16888,6 +16900,7 @@ where
 			(17, in_flight_monitor_updates, option),
 			(19, peer_storage_dir, optional_vec),
 			(21, WithoutLength(&self.flow.writeable_async_receive_offer_cache()), required),
+			(23, reconstruct_manager_from_monitors, required),
 		});
 
 		// Remove the SpliceFailed events added earlier.
@@ -17600,9 +17613,10 @@ where
 			};
 		}
 
-		// Some maps are read but may no longer be used because we attempt to rebuild the pending HTLC
-		// set from the `Channel{Monitor}`s instead, as a step towards removing the requirement of
-		// regularly persisting the `ChannelManager`.
+		// In LDK versions >0.2, we are taking steps to remove the requirement of regularly persisting
+		// the `ChannelManager`. To that end, if `reconstruct_manager_from_monitors` is set below, we
+		// will rebuild the pending HTLC set using data from the `Channel{Monitor}`s instead and ignore
+		// these legacy maps.
 		let mut pending_intercepted_htlcs_legacy: Option<HashMap<InterceptId, PendingAddHTLCInfo>> =
 			None;
 		let mut decode_update_add_htlcs_legacy: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> =
@@ -17632,6 +17646,7 @@ where
 		let mut inbound_payment_id_secret = None;
 		let mut peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>> = None;
 		let mut async_receive_offer_cache: AsyncReceiveOfferCache = AsyncReceiveOfferCache::new();
+		let mut _reconstruct_manager_from_monitors_opt: Option<bool> = None;
 		read_tlv_fields!(reader, {
 			(1, pending_outbound_payments_no_retry, option),
 			(2, pending_intercepted_htlcs_legacy, option),
@@ -17650,7 +17665,19 @@ where
 			(17, in_flight_monitor_updates, option),
 			(19, peer_storage_dir, optional_vec),
 			(21, async_receive_offer_cache, (default_value, async_receive_offer_cache)),
+			(23, _reconstruct_manager_from_monitors_opt, option),
 		});
+		// In LDK 0.2 and below, the ChannelManager would track all payments and HTLCs internally and
+		// persist that state, relying on it being up-to-date on restart. Newer versions are moving
+		// towards reducing this reliance on regular persistence of the ChannelManager, and instead
+		// reconstruct HTLC/payment state based on ChannelMonitor data if
+		// `reconstruct_manager_from_monitors` is set. In tests, we want to always use the new
+		// codepaths.
+		#[cfg(any(test, feature = "_test_utils"))]
+		let reconstruct_manager_from_monitors = true;
+		#[cfg(not(any(test, feature = "_test_utils")))]
+		let reconstruct_manager_from_monitors = _reconstruct_manager_from_monitors_opt.unwrap_or(false);
+
 		let mut decode_update_add_htlcs_legacy =
 			decode_update_add_htlcs_legacy.unwrap_or_else(|| new_hash_map());
 		let mut pending_intercepted_htlcs_legacy =
@@ -17967,18 +17994,20 @@ where
 			let mut peer_state_lock = peer_state_mtx.lock().unwrap();
 			let peer_state = &mut *peer_state_lock;
 			is_channel_closed = !peer_state.channel_by_id.contains_key(channel_id);
-			if let Some(chan) = peer_state.channel_by_id.get(channel_id) {
-				if let Some(funded_chan) = chan.as_funded() {
-					let inbound_committed_update_adds =
-						funded_chan.get_inbound_committed_update_adds();
-					if !inbound_committed_update_adds.is_empty() {
-						// Reconstruct `ChannelManager::decode_update_add_htlcs` from the serialized
-						// `Channel`, as part of removing the requirement to regularly persist the
-						// `ChannelManager`.
-						decode_update_add_htlcs.insert(
-							funded_chan.context.outbound_scid_alias(),
-							inbound_committed_update_adds,
-						);
+			if reconstruct_manager_from_monitors {
+				if let Some(chan) = peer_state.channel_by_id.get(channel_id) {
+					if let Some(funded_chan) = chan.as_funded() {
+						let inbound_committed_update_adds =
+							funded_chan.get_inbound_committed_update_adds();
+						if !inbound_committed_update_adds.is_empty() {
+							// Reconstruct `ChannelManager::decode_update_add_htlcs` from the serialized
+							// `Channel`, as part of removing the requirement to regularly persist the
+							// `ChannelManager`.
+							decode_update_add_htlcs.insert(
+								funded_chan.context.outbound_scid_alias(),
+								inbound_committed_update_adds,
+							);
+						}
 					}
 				}
 			}
@@ -18033,15 +18062,18 @@ where
 					info.prev_funding_outpoint == prev_hop_data.outpoint
 						&& info.prev_htlc_id == prev_hop_data.htlc_id
 				};
-				// We always add all inbound committed HTLCs to `decode_update_add_htlcs` in the above
-				// loop, but we need to prune from those added HTLCs if they were already forwarded to
-				// the outbound edge. Otherwise, we'll double-forward.
-				dedup_decode_update_add_htlcs(
-					&mut decode_update_add_htlcs,
-					&prev_hop_data,
-					"HTLC was forwarded to the closed channel",
-					&args.logger,
-				);
+				// If `reconstruct_manager_from_monitors` is set, we always add all inbound committed
+				// HTLCs to `decode_update_add_htlcs` in the above loop, but we need to prune from
+				// those added HTLCs if they were already forwarded to the outbound edge. Otherwise,
+				// we'll double-forward.
+				if reconstruct_manager_from_monitors {
+					dedup_decode_update_add_htlcs(
+						&mut decode_update_add_htlcs,
+						&prev_hop_data,
+						"HTLC was forwarded to the closed channel",
+						&args.logger,
+					);
+				}
 				if is_channel_closed {
 					// The ChannelMonitor is now responsible for this HTLC's
 					// failure/success and will let us know what its outcome is. If we
@@ -18550,101 +18582,48 @@ where
 			}
 		}
 
-		// De-duplicate HTLCs that are present in both `failed_htlcs` and `decode_update_add_htlcs`.
-		// Omitting this de-duplication could lead to redundant HTLC processing and/or bugs.
-		for (src, _, _, _, _, _) in failed_htlcs.iter() {
-			if let HTLCSource::PreviousHopData(prev_hop_data) = src {
-				dedup_decode_update_add_htlcs(
-					&mut decode_update_add_htlcs,
-					prev_hop_data,
-					"HTLC was failed backwards during manager read",
-					&args.logger,
-				);
-			}
-		}
-
-		// See above comment on `failed_htlcs`.
-		for htlcs in claimable_payments.values().map(|pmt| &pmt.htlcs) {
-			for prev_hop_data in htlcs.iter().map(|h| &h.prev_hop) {
-				dedup_decode_update_add_htlcs(
-					&mut decode_update_add_htlcs,
-					prev_hop_data,
-					"HTLC was already decoded and marked as a claimable payment",
-					&args.logger,
-				);
-			}
-		}
-
-		// Remove HTLCs from `forward_htlcs` if they are also present in `decode_update_add_htlcs`.
-		//
-		// In the future, the full set of pending HTLCs will be pulled from `Channel{Monitor}` data and
-		// placed in `ChannelManager::decode_update_add_htlcs` on read, to be handled on the next call
-		// to `process_pending_htlc_forwards`. This is part of a larger effort to remove the requirement
-		// of regularly persisting the `ChannelManager`. The new pipeline is supported for HTLC forwards
-		// received on LDK 0.3+ but not <= 0.2, so prune non-legacy HTLCs from `forward_htlcs`.
-		forward_htlcs_legacy.retain(|scid, pending_fwds| {
-			for fwd in pending_fwds {
-				let (prev_scid, prev_htlc_id) = match fwd {
-					HTLCForwardInfo::AddHTLC(htlc) => {
-						(htlc.prev_outbound_scid_alias, htlc.prev_htlc_id)
-					},
-					HTLCForwardInfo::FailHTLC { htlc_id, .. }
-					| HTLCForwardInfo::FailMalformedHTLC { htlc_id, .. } => (*scid, *htlc_id),
-				};
-				if let Some(pending_update_adds) = decode_update_add_htlcs.get_mut(&prev_scid) {
-					if pending_update_adds
-						.iter()
-						.any(|update_add| update_add.htlc_id == prev_htlc_id)
-					{
-						return false;
-					}
+		if reconstruct_manager_from_monitors {
+			// De-duplicate HTLCs that are present in both `failed_htlcs` and `decode_update_add_htlcs`.
+			// Omitting this de-duplication could lead to redundant HTLC processing and/or bugs.
+			for (src, _, _, _, _, _) in failed_htlcs.iter() {
+				if let HTLCSource::PreviousHopData(prev_hop_data) = src {
+					dedup_decode_update_add_htlcs(
+						&mut decode_update_add_htlcs,
+						prev_hop_data,
+						"HTLC was failed backwards during manager read",
+						&args.logger,
+					);
 				}
 			}
-			true
-		});
-		// Remove intercepted HTLC forwards if they are also present in `decode_update_add_htlcs`. See
-		// the above comment.
-		pending_intercepted_htlcs_legacy.retain(|id, fwd| {
-			let prev_scid = fwd.prev_outbound_scid_alias;
-			if let Some(pending_update_adds) = decode_update_add_htlcs.get_mut(&prev_scid) {
-				if pending_update_adds
-					.iter()
-					.any(|update_add| update_add.htlc_id == fwd.prev_htlc_id)
-				{
-					pending_events_read.retain(
-						|(ev, _)| !matches!(ev, Event::HTLCIntercepted { intercept_id, .. } if intercept_id == id),
+
+			// See above comment on `failed_htlcs`.
+			for htlcs in claimable_payments.values().map(|pmt| &pmt.htlcs) {
+				for prev_hop_data in htlcs.iter().map(|h| &h.prev_hop) {
+					dedup_decode_update_add_htlcs(
+						&mut decode_update_add_htlcs,
+						prev_hop_data,
+						"HTLC was already decoded and marked as a claimable payment",
+						&args.logger,
 					);
-					return false;
 				}
 			}
+		}
+
+		// If we have a pending intercept HTLC present but no corresponding event, add that now.
+		for (id, intercept) in pending_intercepted_htlcs_legacy.iter() {
 			if !pending_events_read.iter().any(
 				|(ev, _)| matches!(ev, Event::HTLCIntercepted { intercept_id, .. } if intercept_id == id),
 			) {
-				match create_htlc_intercepted_event(*id, &fwd) {
+				match create_htlc_intercepted_event(*id, intercept) {
 					Ok(ev) => pending_events_read.push_back((ev, None)),
 					Err(()) => debug_assert!(false),
 				}
 			}
-			true
-		});
-		// Add legacy update_adds that were received on LDK <= 0.2 that are not present in the
-		// `decode_update_add_htlcs` map that was rebuilt from `Channel{Monitor}` data, see above
-		// comment.
-		for (scid, legacy_update_adds) in decode_update_add_htlcs_legacy.drain() {
-			match decode_update_add_htlcs.entry(scid) {
-				hash_map::Entry::Occupied(mut update_adds) => {
-					for legacy_update_add in legacy_update_adds {
-						if !update_adds.get().contains(&legacy_update_add) {
-							update_adds.get_mut().push(legacy_update_add);
-						}
-					}
-				},
-				hash_map::Entry::Vacant(entry) => {
-					entry.insert(legacy_update_adds);
-				},
-			}
 		}
 
+		if !reconstruct_manager_from_monitors {
+			decode_update_add_htlcs = decode_update_add_htlcs_legacy;
+		}
 		let best_block = BestBlock::new(best_block_hash, best_block_height);
 		let flow = OffersMessageFlow::new(
 			chain_hash,
@@ -19009,12 +18988,11 @@
 mod tests {
 	use crate::events::{ClosureReason, Event, HTLCHandlingFailureType};
 	use crate::ln::channelmanager::{
-		create_recv_pending_htlc_info, inbound_payment, HTLCForwardInfo, InterceptId, PaymentId,
+		create_recv_pending_htlc_info, inbound_payment, InterceptId, PaymentId,
 		RecipientOnionFields,
 	};
 	use crate::ln::functional_test_utils::*;
 	use crate::ln::msgs::{self, BaseMessageHandler, ChannelMessageHandler, MessageSendEvent};
-	use crate::ln::onion_utils::AttributionData;
 	use crate::ln::onion_utils::{self, LocalHTLCFailureReason};
 	use crate::ln::outbound_payment::Retry;
 	use crate::ln::types::ChannelId;
@@ -19024,7 +19002,6 @@ mod tests {
 	use crate::types::payment::{PaymentHash, PaymentPreimage, PaymentSecret};
 	use crate::util::config::{ChannelConfig, ChannelConfigUpdate};
 	use crate::util::errors::APIError;
-	use crate::util::ser::Writeable;
 	use crate::util::test_utils;
 	use bitcoin::secp256k1::ecdh::SharedSecret;
 	use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
@@ -20082,66 +20059,6 @@ mod tests {
 			check_spends!(txn[0], funding_tx);
 		}
 	}
-
-	#[test]
-	#[rustfmt::skip]
-	fn test_malformed_forward_htlcs_ser() {
-		// Ensure that `HTLCForwardInfo::FailMalformedHTLC`s are (de)serialized properly.
-		let chanmon_cfg = create_chanmon_cfgs(1);
-		let node_cfg = create_node_cfgs(1, &chanmon_cfg);
-		let persister;
-		let chain_monitor;
-		let chanmgrs = create_node_chanmgrs(1, &node_cfg, &[None]);
-		let deserialized_chanmgr;
-		let mut nodes = create_network(1, &node_cfg, &chanmgrs);
-
-		let dummy_failed_htlc = |htlc_id| {
-			HTLCForwardInfo::FailHTLC { htlc_id, err_packet: msgs::OnionErrorPacket { data: vec![42], attribution_data: Some(AttributionData::new()) } }
-		};
-		let dummy_malformed_htlc = |htlc_id| {
-			HTLCForwardInfo::FailMalformedHTLC {
-				htlc_id,
-				failure_code: LocalHTLCFailureReason::InvalidOnionPayload.failure_code(),
-				sha256_of_onion: [0; 32],
-			}
-		};
-
-		let dummy_htlcs_1: Vec<HTLCForwardInfo> = (1..10).map(|htlc_id| {
-			if htlc_id % 2 == 0 {
-				dummy_failed_htlc(htlc_id)
-			} else {
-				dummy_malformed_htlc(htlc_id)
-			}
-		}).collect();
-
-		let dummy_htlcs_2: Vec<HTLCForwardInfo> = (1..10).map(|htlc_id| {
-			if htlc_id % 2 == 1 {
-				dummy_failed_htlc(htlc_id)
-			} else {
-				dummy_malformed_htlc(htlc_id)
-			}
-		}).collect();
-
-
-		let (scid_1, scid_2) = (42, 43);
-		let mut forward_htlcs = new_hash_map();
-		forward_htlcs.insert(scid_1, dummy_htlcs_1.clone());
-		forward_htlcs.insert(scid_2, dummy_htlcs_2.clone());
-
-		let mut chanmgr_fwd_htlcs = nodes[0].node.forward_htlcs.lock().unwrap();
-		*chanmgr_fwd_htlcs = forward_htlcs.clone();
-		core::mem::drop(chanmgr_fwd_htlcs);
-
-		reload_node!(nodes[0], nodes[0].node.encode(), &[], persister, chain_monitor, deserialized_chanmgr);
-
-		let mut deserialized_fwd_htlcs = nodes[0].node.forward_htlcs.lock().unwrap();
-		for scid in [scid_1, scid_2].iter() {
-			let deserialized_htlcs = deserialized_fwd_htlcs.remove(scid).unwrap();
-			assert_eq!(forward_htlcs.remove(scid).unwrap(), deserialized_htlcs);
-		}
-		assert!(deserialized_fwd_htlcs.is_empty());
-		core::mem::drop(deserialized_fwd_htlcs);
-	}
 }
 
 #[cfg(ldk_bench)]