Replay lost MonitorEvents in some cases for closed channels #4004

Open · wants to merge 13 commits into main
167 changes: 150 additions & 17 deletions lightning/src/chain/channelmonitor.rs
@@ -77,7 +77,7 @@ use crate::util::ser::{
use crate::prelude::*;

use crate::io::{self, Error};
use crate::sync::{LockTestExt, Mutex};
use crate::sync::Mutex;
use core::ops::Deref;
use core::{cmp, mem};

@@ -1371,18 +1371,30 @@ macro_rules! holder_commitment_htlcs {
/// Transaction outputs to watch for on-chain spends.
pub type TransactionOutputs = (Txid, Vec<(u32, TxOut)>);

// Because we have weird workarounds for `ChannelMonitor` equality checks in `OnchainTxHandler` and
// `PackageTemplate` the equality implementation isn't really fit for public consumption. Instead,
// we only expose it during tests.
#[cfg(any(feature = "_test_utils", test))]
impl<Signer: EcdsaChannelSigner> PartialEq for ChannelMonitor<Signer>
where
Signer: PartialEq,
{
#[rustfmt::skip]
fn eq(&self, other: &Self) -> bool {
use crate::sync::LockTestExt;
// We need some kind of total lockorder. Absent a better idea, we sort by position in
// memory and take locks in that order (assuming that we can't move within memory while a
// lock is held).
let ord = ((self as *const _) as usize) < ((other as *const _) as usize);
let a = if ord { self.inner.unsafe_well_ordered_double_lock_self() } else { other.inner.unsafe_well_ordered_double_lock_self() };
let b = if ord { other.inner.unsafe_well_ordered_double_lock_self() } else { self.inner.unsafe_well_ordered_double_lock_self() };
let a = if ord {
self.inner.unsafe_well_ordered_double_lock_self()
} else {
other.inner.unsafe_well_ordered_double_lock_self()
};
let b = if ord {
other.inner.unsafe_well_ordered_double_lock_self()
} else {
self.inner.unsafe_well_ordered_double_lock_self()
};
a.eq(&b)
}
}
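
The comment above describes the lock-ordering trick: taking the two inner locks in ascending address order gives a total order, so two concurrent comparisons of the same pair cannot deadlock against each other. A minimal standalone sketch of the same idea, using plain std mutexes and assuming the two values are distinct and cannot move in memory while locked (none of the names below come from the PR):

    use std::sync::Mutex;

    // Address-ordered double locking: always acquire the lower-addressed mutex first so
    // two concurrent comparisons of the same pair take the locks in the same sequence.
    fn eq_under_both_locks<T: PartialEq>(a: &Mutex<T>, b: &Mutex<T>) -> bool {
        let a_first = (a as *const Mutex<T> as usize) < (b as *const Mutex<T> as usize);
        let (first, second) = if a_first { (a, b) } else { (b, a) };
        let first_guard = first.lock().unwrap();
        let second_guard = second.lock().unwrap();
        // The comparison itself is symmetric, so lock order does not affect the result.
        *first_guard == *second_guard
    }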
@@ -2944,33 +2956,152 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
/// This is similar to [`Self::get_pending_or_resolved_outbound_htlcs`] except it includes
/// HTLCs which were resolved on-chain (i.e. where the final HTLC resolution was done by an
/// event from this `ChannelMonitor`).
#[rustfmt::skip]
pub(crate) fn get_all_current_outbound_htlcs(&self) -> HashMap<HTLCSource, (HTLCOutputInCommitment, Option<PaymentPreimage>)> {
pub(crate) fn get_all_current_outbound_htlcs(
&self,
) -> HashMap<HTLCSource, (HTLCOutputInCommitment, Option<PaymentPreimage>)> {
let mut res = new_hash_map();
// Just examine the available counterparty commitment transactions. See docs on
// `fail_unbroadcast_htlcs`, below, for justification.
let us = self.inner.lock().unwrap();
macro_rules! walk_counterparty_commitment {
($txid: expr) => {
if let Some(ref latest_outpoints) = us.funding.counterparty_claimable_outpoints.get($txid) {
for &(ref htlc, ref source_option) in latest_outpoints.iter() {
if let &Some(ref source) = source_option {
res.insert((**source).clone(), (htlc.clone(),
us.counterparty_fulfilled_htlcs.get(&SentHTLCId::from_source(source)).cloned()));
}
let mut walk_counterparty_commitment = |txid| {
if let Some(latest_outpoints) = us.funding.counterparty_claimable_outpoints.get(txid) {
for &(ref htlc, ref source_option) in latest_outpoints.iter() {
if let &Some(ref source) = source_option {
let htlc_id = SentHTLCId::from_source(source);
let preimage_opt = us.counterparty_fulfilled_htlcs.get(&htlc_id).cloned();
res.insert((**source).clone(), (htlc.clone(), preimage_opt));
}
}
}
}
};
if let Some(ref txid) = us.funding.current_counterparty_commitment_txid {
walk_counterparty_commitment!(txid);
walk_counterparty_commitment(txid);
}
if let Some(ref txid) = us.funding.prev_counterparty_commitment_txid {
walk_counterparty_commitment!(txid);
walk_counterparty_commitment(txid);
}
res
}
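
For orientation, a rough sketch of how the returned map might be consumed on reload; replay_claim and track_still_pending are hypothetical placeholders rather than functions from this PR, and the real consumption happens in channelmanager.rs further down in this diff:

    // Hypothetical consumption of the returned (source -> (htlc, preimage)) map: when the
    // counterparty already revealed a preimage we can replay the claim, otherwise the
    // HTLC is simply still in flight.
    for (source, (htlc, preimage_opt)) in monitor.get_all_current_outbound_htlcs() {
        match preimage_opt {
            Some(preimage) => replay_claim(source, htlc, preimage), // hypothetical helper
            None => track_still_pending(source, htlc),              // hypothetical helper
        }
    }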

/// Gets the set of outbound HTLCs which hit the chain and ultimately were claimed by us via
/// the timeout path and reached [`ANTI_REORG_DELAY`] confirmations. This is used to determine
/// if an HTLC has failed without the `ChannelManager` having seen it prior to being persisted.
pub(crate) fn get_onchain_failed_outbound_htlcs(&self) -> HashMap<HTLCSource, PaymentHash> {
let mut res = new_hash_map();
let us = self.inner.lock().unwrap();

// We only want HTLCs with ANTI_REORG_DELAY confirmations, which implies the commitment
// transaction has at least ANTI_REORG_DELAY confirmations for any dependent HTLC
// transactions to have been confirmed.
let confirmed_txid = us.funding_spend_confirmed.or_else(|| {
us.onchain_events_awaiting_threshold_conf.iter().find_map(|event| {
if let OnchainEvent::FundingSpendConfirmation { .. } = event.event {
if event.height <= us.best_block.height - ANTI_REORG_DELAY + 1 {
Some(event.txid)
} else {
None
}
} else {
None
}
})
});

let confirmed_txid = if let Some(txid) = confirmed_txid {
txid
} else {
return res;
};

macro_rules! walk_htlcs {
($holder_commitment: expr, $htlc_iter: expr) => {
let mut walk_candidate_htlcs = |htlcs| {
for &(ref candidate_htlc, ref candidate_source) in htlcs {
let candidate_htlc: &HTLCOutputInCommitment = &candidate_htlc;
let candidate_source: &Option<Box<HTLCSource>> = &candidate_source;

let source: &HTLCSource = if let Some(source) = candidate_source {
source
} else {
continue;
};
let confirmed = $htlc_iter.find(|(_, conf_src)| Some(source) == *conf_src);
if let Some((confirmed_htlc, _)) = confirmed {
let filter = |v: &&IrrevocablyResolvedHTLC| {
v.commitment_tx_output_idx
== confirmed_htlc.transaction_output_index
};

// The HTLC was included in the confirmed commitment transaction, so we
// need to see if it has been irrevocably failed yet.
if confirmed_htlc.transaction_output_index.is_none() {
// Dust HTLCs are always implicitly failed once the commitment
// transaction reaches ANTI_REORG_DELAY confirmations.
res.insert(source.clone(), confirmed_htlc.payment_hash);
} else if let Some(state) =
us.htlcs_resolved_on_chain.iter().filter(filter).next()
{
if state.payment_preimage.is_none() {
res.insert(source.clone(), confirmed_htlc.payment_hash);
}
}
} else {
// The HTLC was not included in the confirmed commitment transaction,
// which has now reached ANTI_REORG_DELAY confirmations and thus the
// HTLC has been failed.
res.insert(source.clone(), candidate_htlc.payment_hash);
}
}
};

// We walk the set of HTLCs in the unrevoked counterparty commitment transactions (see
// `fail_unbroadcast_htlcs` for a description of why).
if let Some(ref txid) = us.funding.current_counterparty_commitment_txid {
if let Some(htlcs) = us.funding.counterparty_claimable_outpoints.get(txid) {
walk_candidate_htlcs(htlcs);
}
}
if let Some(ref txid) = us.funding.prev_counterparty_commitment_txid {
if let Some(htlcs) = us.funding.counterparty_claimable_outpoints.get(txid) {
walk_candidate_htlcs(htlcs);
}
}
};
}

let funding = get_confirmed_funding_scope!(us);

if Some(confirmed_txid) == funding.current_counterparty_commitment_txid
|| Some(confirmed_txid) == funding.prev_counterparty_commitment_txid
{
let htlcs = funding.counterparty_claimable_outpoints.get(&confirmed_txid).unwrap();
walk_htlcs!(
false,
htlcs.iter().filter_map(|(a, b)| {
if let &Some(ref source) = b {
Some((a, Some(&**source)))
} else {
None
}
})
);
} else if confirmed_txid == funding.current_holder_commitment_tx.trust().txid() {
walk_htlcs!(true, holder_commitment_htlcs!(us, CURRENT_WITH_SOURCES));
} else if let Some(prev_commitment_tx) = &funding.prev_holder_commitment_tx {
if confirmed_txid == prev_commitment_tx.trust().txid() {
walk_htlcs!(true, holder_commitment_htlcs!(us, PREV_WITH_SOURCES).unwrap());
} else {
let htlcs_confirmed: &[(&HTLCOutputInCommitment, _)] = &[];
walk_htlcs!(false, htlcs_confirmed.iter());
}
} else {
let htlcs_confirmed: &[(&HTLCOutputInCommitment, _)] = &[];
walk_htlcs!(false, htlcs_confirmed.iter());
}

res
}
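
The confirmed_txid lookup near the top of this method encodes "the funding spend has at least ANTI_REORG_DELAY confirmations". Restated as standalone arithmetic (ANTI_REORG_DELAY is 6 in this crate; the function name here is made up for illustration):

    const ANTI_REORG_DELAY: u32 = 6; // value from lightning::chain::channelmonitor

    // A transaction confirmed at `event_height` has `best_height - event_height + 1`
    // confirmations, so "at least ANTI_REORG_DELAY confirmations" is the same check as
    // `event.height <= best_block.height - ANTI_REORG_DELAY + 1` used above.
    fn has_anti_reorg_confirmations(event_height: u32, best_height: u32) -> bool {
        best_height.saturating_sub(event_height).saturating_add(1) >= ANTI_REORG_DELAY
    }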

/// Gets the set of outbound HTLCs which are pending resolution in this channel or which were
/// resolved with a preimage from our counterparty.
///
@@ -5775,6 +5906,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
on_to_local_output_csv: None,
},
});
self.counterparty_fulfilled_htlcs.insert(SentHTLCId::from_source(&source), payment_preimage);
self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
source,
payment_preimage: Some(payment_preimage),
@@ -5798,6 +5930,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
on_to_local_output_csv: None,
},
});
self.counterparty_fulfilled_htlcs.insert(SentHTLCId::from_source(&source), payment_preimage);
self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
source,
payment_preimage: Some(payment_preimage),
46 changes: 45 additions & 1 deletion lightning/src/chain/package.rs
@@ -1093,7 +1093,7 @@ enum PackageMalleability {
///
/// As packages are time-sensitive, we fee-bump and rebroadcast them at scheduled intervals.
/// Failing to confirm a package translates to a loss of funds for the user.
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, Eq)]
pub struct PackageTemplate {
// List of onchain outputs and solving data to generate satisfying witnesses.
inputs: Vec<(BitcoinOutPoint, PackageSolvingData)>,
@@ -1122,6 +1122,50 @@ pub struct PackageTemplate {
height_timer: u32,
}

impl PartialEq for PackageTemplate {
fn eq(&self, o: &Self) -> bool {
if self.inputs != o.inputs
|| self.malleability != o.malleability
|| self.feerate_previous != o.feerate_previous
|| self.height_timer != o.height_timer
{
return false;
}
#[cfg(test)]
{
// In some cases we may reset `counterparty_spendable_height` to zero on reload, which
// can cause our test assertions that ChannelMonitors round-trip exactly to trip. Here
// we allow exactly the same case as we tweak in the `PackageTemplate` `Readable`
// implementation.
if self.counterparty_spendable_height == 0 {
for (_, input) in self.inputs.iter() {
if let PackageSolvingData::RevokedHTLCOutput(RevokedHTLCOutput {
htlc, ..
}) = input
{
if !htlc.offered && htlc.cltv_expiry != 0 {
return true;
}
}
}
}
if o.counterparty_spendable_height == 0 {
for (_, input) in o.inputs.iter() {
if let PackageSolvingData::RevokedHTLCOutput(RevokedHTLCOutput {
htlc, ..
}) = input
{
if !htlc.offered && htlc.cltv_expiry != 0 {
return true;
}
}
}
}
}
self.counterparty_spendable_height == o.counterparty_spendable_height
}
}
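
The test-only tolerance above is applied symmetrically to both sides and mirrors the tweak in the PackageTemplate Readable implementation. As a compact way to picture the per-side check, here is a hypothetical helper, assumed to live in the same module since it touches private fields:

    #[cfg(test)]
    fn tolerates_zeroed_counterparty_spendable_height(pkg: &PackageTemplate) -> bool {
        // A zeroed `counterparty_spendable_height` is only excused when the package holds
        // a revoked, received (non-offered) HTLC output with a real `cltv_expiry`,
        // matching the case tweaked on read.
        pkg.counterparty_spendable_height == 0
            && pkg.inputs.iter().any(|(_, input)| {
                matches!(
                    input,
                    PackageSolvingData::RevokedHTLCOutput(RevokedHTLCOutput { htlc, .. })
                        if !htlc.offered && htlc.cltv_expiry != 0
                )
            })
    }

With such a helper, eq could short-circuit with `if tolerates_zeroed_counterparty_spendable_height(self) || tolerates_zeroed_counterparty_spendable_height(o) { return true; }` before falling through to the plain height comparison.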

impl PackageTemplate {
#[rustfmt::skip]
pub(crate) fn can_merge_with(&self, other: &PackageTemplate, cur_height: u32) -> bool {
49 changes: 40 additions & 9 deletions lightning/src/ln/channelmanager.rs
@@ -15739,7 +15739,7 @@ where
log_error!(logger, " The ChannelMonitor for channel {} is at counterparty commitment transaction number {} but the ChannelManager is at counterparty commitment transaction number {}.",
&channel.context.channel_id(), monitor.get_cur_counterparty_commitment_number(), channel.get_cur_counterparty_commitment_transaction_number());
}
let mut shutdown_result =
let shutdown_result =
channel.force_shutdown(ClosureReason::OutdatedChannelManager);
if shutdown_result.unbroadcasted_batch_funding_txid.is_some() {
return Err(DecodeError::InvalidValue);
@@ -15771,7 +15771,10 @@
},
);
}
failed_htlcs.append(&mut shutdown_result.dropped_outbound_htlcs);
for (source, hash, cp_id, chan_id) in shutdown_result.dropped_outbound_htlcs {
let reason = LocalHTLCFailureReason::ChannelClosed;
failed_htlcs.push((source, hash, cp_id, chan_id, reason));
}
channel_closures.push_back((
events::Event::ChannelClosed {
channel_id: channel.context.channel_id(),
@@ -15813,6 +15816,7 @@
*payment_hash,
channel.context.get_counterparty_node_id(),
channel.context.channel_id(),
LocalHTLCFailureReason::ChannelClosed,
));
}
}
@@ -16386,6 +16390,10 @@
// payments which are still in-flight via their on-chain state.
// We only rebuild the pending payments map if we were most recently serialized by
// 0.0.102+
//
// First we rebuild the pending payments, and only once we do so we go through and
// re-claim and re-fail pending payments. This avoids edge-cases around MPP payments
// resulting in redundant actions.
for (channel_id, monitor) in args.channel_monitors.iter() {
let mut is_channel_closed = false;
let counterparty_node_id = monitor.get_counterparty_node_id();
Expand Down Expand Up @@ -16424,6 +16432,18 @@ where
);
}
}
}
}
for (channel_id, monitor) in args.channel_monitors.iter() {
let mut is_channel_closed = false;
let counterparty_node_id = monitor.get_counterparty_node_id();
if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
let mut peer_state_lock = peer_state_mtx.lock().unwrap();
let peer_state = &mut *peer_state_lock;
is_channel_closed = !peer_state.channel_by_id.contains_key(channel_id);
}
Comment on lines +16437 to +16444 (Contributor): I'm not seeing the bug that this is fixing -- if a payment is failed after being fulfilled in outbound_payments, we'll terminate early after noticing it's fulfilled already in OutboundPayments::fail_htlc. Context from the commit message: "... we could get both a PaymentFailed and a PaymentClaimed event on startup for the same payment".

Reply (Collaborator, author): Let's say we send an MPP payment along two different channels. One is claimed and the other failed (who knows why; the recipient just decided they don't like money for whatever reason). Going through the single loop we may first find the failed-HTLC channel - we'll add the pending payment in insert_from_monitor_on_startup, which will just add the one part, then we'll see it failed and generate a PaymentFailed event since there's only one part. Then we'll go to the second channel and repeat the same process, but now with a PaymentSent event.

Comment on lines +16437 to +16444 (Contributor): Would appreciate some more context on this part of the commit message: "this can lead to a pending payment getting re-added and re-claimed multiple times" (phrased as bad). It looks like in both the prior code and this new code, we'll call OutboundPayments::insert_from_monitor_on_startup for each session_priv (seems fine), and then call claim_htlc for each session_priv (seems fine since this will only generate a PaymentSent event on the first call to claim_htlc). Can you help me understand what I'm missing that was buggy in the prior code?

Reply (Collaborator, author): I rewrote the commit message.

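To make the ordering the author describes concrete: every outbound HTLC from every monitor is registered as a pending-payment part first, and only then are claims and failures replayed, so a multi-part payment can never appear to have a single part mid-reload. Schematically (the monitor iteration and helper names below are hypothetical shorthand, not the PR's code):

    // Pass 1: make every part of every (possibly MPP) payment known before acting on any.
    for monitor in monitors.iter() {
        for (source, (htlc, _preimage)) in monitor.get_all_current_outbound_htlcs() {
            register_pending_part(&source, &htlc); // hypothetical stand-in
        }
    }
    // Pass 2: with all parts registered, replaying a single failed part no longer looks
    // like the failure of an entire payment, and vice versa for claims.
    for monitor in monitors.iter() {
        replay_claims_and_failures(monitor); // hypothetical stand-in
    }
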
if is_channel_closed {
for (htlc_source, (htlc, preimage_opt)) in
monitor.get_all_current_outbound_htlcs()
{
@@ -16521,6 +16541,20 @@ where
},
}
}
for (htlc_source, payment_hash) in monitor.get_onchain_failed_outbound_htlcs() {
log_info!(
args.logger,
"Failing HTLC with payment hash {} as it was resolved on-chain.",
payment_hash
);
failed_htlcs.push((
Comment (Contributor): Will we generate a PaymentFailed and/or HTLCHandlingFailed event for these HTLCs on every restart until the monitor is removed? Could be worth noting in a comment if so.

Reply (Collaborator, author): That's fixed in #3984 :)

htlc_source,
payment_hash,
monitor.get_counterparty_node_id(),
monitor.channel_id(),
LocalHTLCFailureReason::OnChainTimeout,
));
}
}

// Whether the downstream channel was closed or not, try to re-apply any payment
@@ -17201,13 +17235,10 @@ where
}
}

for htlc_source in failed_htlcs.drain(..) {
let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source;
let failure_reason = LocalHTLCFailureReason::ChannelClosed;
let receiver = HTLCHandlingFailureType::Forward {
node_id: Some(counterparty_node_id),
channel_id,
};
for htlc_source in failed_htlcs {
let (source, payment_hash, counterparty_id, channel_id, failure_reason) = htlc_source;
let receiver =
HTLCHandlingFailureType::Forward { node_id: Some(counterparty_id), channel_id };
let reason = HTLCFailReason::from_failure_code(failure_reason);
channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
}