Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 92 additions & 14 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use lightning::blinded_path::message::{BlindedMessagePath, MessageContext, Messa
use lightning::blinded_path::payment::{BlindedPaymentPath, ReceiveTlvs};
use lightning::chain;
use lightning::chain::chaininterface::{
BroadcasterInterface, ConfirmationTarget, FeeEstimator, TransactionType,
BroadcasterInterface, ConfirmationTarget, FeeEstimator, FundingPurpose, TransactionType,
};
use lightning::chain::channelmonitor::ChannelMonitor;
use lightning::chain::{
Expand Down Expand Up @@ -102,6 +102,7 @@ use std::sync::atomic;
use std::sync::{Arc, Mutex};

const MAX_FEE: u32 = 10_000;
const MAX_SETTLE_ITERATIONS: usize = 256;
struct FuzzEstimator {
ret_val: atomic::AtomicU32,
}
Expand Down Expand Up @@ -170,12 +171,12 @@ impl MessageRouter for FuzzRouter {
}

pub struct TestBroadcaster {
txn_broadcasted: RefCell<Vec<Transaction>>,
txn_broadcasted: RefCell<Vec<(Transaction, TransactionType)>>,
}
impl BroadcasterInterface for TestBroadcaster {
fn broadcast_transactions(&self, txs: &[(&Transaction, TransactionType)]) {
for (tx, _broadcast_type) in txs {
self.txn_broadcasted.borrow_mut().push((*tx).clone());
for (tx, broadcast_type) in txs {
self.txn_broadcasted.borrow_mut().push(((*tx).clone(), broadcast_type.clone()));
}
}
}
Expand Down Expand Up @@ -1164,6 +1165,7 @@ impl<'a> HarnessNode<'a> {

let manager = <(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args)
.expect("Failed to read manager");
self.height = manager.0.height;
let expected_status = if self.deferred {
ChannelMonitorUpdateStatus::InProgress
} else {
Expand Down Expand Up @@ -2008,10 +2010,71 @@ fn assert_test_invariants(nodes: &[HarnessNode<'_>; 3]) {
assert_eq!(nodes[1].list_channels().len(), 6);
assert_eq!(nodes[2].list_channels().len(), 3);

// All broadcasters should be empty. Broadcast transactions are handled explicitly.
assert!(nodes[0].broadcaster.txn_broadcasted.borrow().is_empty());
assert!(nodes[1].broadcaster.txn_broadcasted.borrow().is_empty());
assert!(nodes[2].broadcaster.txn_broadcasted.borrow().is_empty());
// Broadcast transactions are handled explicitly. If the input ends immediately after
// `tx_signatures`, however, the corresponding `SpliceNegotiated` event may still be pending,
// leaving the valid interactive funding transaction in the test broadcaster.
for (idx, node) in nodes.iter().enumerate() {
if node.broadcaster.txn_broadcasted.borrow().is_empty() {
continue;
}

let pending_events = node.get_and_clear_pending_events();
Comment on lines +2016 to +2021
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The verification is one-directional: for each broadcast tx, you check a matching SpliceNegotiated event exists, but you don't verify the reverse — that every pending SpliceNegotiated event has a corresponding broadcast tx. A node with no broadcasts (skipped at line 2017) but with dangling SpliceNegotiated events passes silently.

Consider also checking nodes that have no broadcasts but do have pending SpliceNegotiated events, as that would indicate the splice tx was not broadcast despite an event being generated.

Suggested change
for (idx, node) in nodes.iter().enumerate() {
if node.broadcaster.txn_broadcasted.borrow().is_empty() {
continue;
}
let pending_events = node.get_and_clear_pending_events();
for (idx, node) in nodes.iter().enumerate() {
let pending_events = node.get_and_clear_pending_events();
if node.broadcaster.txn_broadcasted.borrow().is_empty() {
assert!(
!pending_events.iter().any(|e| matches!(e, events::Event::SpliceNegotiated { .. })),
"node {} has pending SpliceNegotiated event(s) but no broadcast tx",
idx,
);
continue;
}

let expected_splice_events = {
let txs = node.broadcaster.txn_broadcasted.borrow();
let mut expected_splice_events = Vec::new();
for (tx, tx_type) in txs.iter() {
let txid = tx.compute_txid();
let candidates = match tx_type {
TransactionType::InteractiveFunding { candidates } => candidates,
_ => panic!("node {} had unexpected broadcast transaction: {:?}", idx, tx_type),
};
for funding in &candidates.last().unwrap().channels {
assert!(
matches!(&funding.purpose, FundingPurpose::Splice),
"node {} had leftover non-splice interactive funding broadcast: {:?}",
idx,
funding
);
expected_splice_events.push((
txid.clone(),
funding.counterparty_node_id.clone(),
funding.channel_id.clone(),
));
}
}
expected_splice_events
};

let mut pending_splice_events = pending_events
.iter()
.filter_map(|event| match event {
events::Event::SpliceNegotiated {
new_funding_txo,
counterparty_node_id,
channel_id,
..
} => Some((
new_funding_txo.txid.clone(),
counterparty_node_id.clone(),
channel_id.clone(),
)),
_ => None,
})
.collect::<Vec<_>>();
for expected_splice_event in expected_splice_events {
let pending_idx =
pending_splice_events.iter().position(|event| event == &expected_splice_event);
assert!(
pending_idx.is_some(),
"node {} had leftover interactive funding broadcast without matching \
pending SpliceNegotiated event: {:?}; pending events: {:?}",
idx,
expected_splice_event,
pending_events
);
pending_splice_events.remove(pending_idx.unwrap());
}
}
}

fn connect_peers(source: &ChanMan<'_>, dest: &ChanMan<'_>) {
Expand Down Expand Up @@ -2814,14 +2877,26 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
..
} => {
let signed_tx = nodes[node_idx].wallet.sign_tx(unsigned_transaction).unwrap();
nodes[node_idx]
.funding_transaction_signed(&channel_id, &counterparty_node_id, signed_tx)
.unwrap();
match nodes[node_idx].funding_transaction_signed(
&channel_id,
&counterparty_node_id,
signed_tx,
) {
Ok(()) => {},
Err(APIError::APIMisuseError { ref err })
if err.contains("not expecting funding signatures") =>
{
// A queued signing event can be invalidated by a later `tx_abort`
// before the application handles it.
},
Err(e) => panic!("{e:?}"),
}
},
events::Event::SpliceNegotiated { new_funding_txo, .. } => {
let mut txs = nodes[node_idx].broadcaster.txn_broadcasted.borrow_mut();
assert!(txs.len() >= 1);
let splice_tx = txs.remove(0);
let (splice_tx, tx_type) = txs.remove(0);
assert!(matches!(tx_type, TransactionType::InteractiveFunding { .. }));
assert_eq!(new_funding_txo.txid, splice_tx.compute_txid());
chain_state.add_pending_tx(splice_tx);
},
Expand Down Expand Up @@ -2854,9 +2929,9 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
fn process_all_events(&mut self) {
let mut last_pass_no_updates = false;
for i in 0..std::usize::MAX {
if i == 100 {
if i == MAX_SETTLE_ITERATIONS {
panic!(
"It may take may iterations to settle the state, but it should not take forever"
"It may take many iterations to settle the state, but it should not take forever"
);
}
let mut made_progress = self.checkpoint_manager_persistences();
Expand Down Expand Up @@ -2927,6 +3002,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
if !self.nodes[node_idx].deferred {
self.nodes[node_idx].checkpoint_manager_persistence();
}
let pre_reload_height = self.nodes[node_idx].height;
match node_idx {
0 => {
self.ab_link.disconnect_for_reload(0, &self.nodes, &mut self.queues);
Expand All @@ -2947,6 +3023,8 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> {
for payment_hash in rolled_back_payment_hashes {
self.payments.claimed_payment_hashes.remove(&payment_hash);
}
let resync_blocks = pre_reload_height.saturating_sub(self.nodes[node_idx].height);
self.nodes[node_idx].sync_with_chain_state(&self.chain_state, Some(resync_blocks));
}

fn settle_all(&mut self) {
Expand Down
Loading