Skip to content

Commit bb0db2a

Browse files
committed
(XXX: Squash all later commits into this) Always process RAA/CS ChannelMonitorUpdates asynchronously
We currently have two codepaths on most channel update functions - most methods return a set of messages to send a peer iff the `ChannelMonitorUpdate` succeeds, but if it does not we push the messages back into the `Channel` and then pull them back out when the `ChannelMonitorUpdate` completes and send them then. This adds a substantial amount of complexity in very critical codepaths. Instead, here we swap all our channel update codepaths to immediately set the channel-update-required flag and only return a `ChannelMonitorUpdate` to the `ChannelManager`. Internally in the `Channel` we store a queue of `ChannelMonitorUpdate`s, which will become critical in future work to surface pending `ChannelMonitorUpdate`s to users at startup so they can complete. This leaves some redundant work in `Channel` to be cleaned up later. Specifically, we still generate the messages which we will now ignore and regenerate later. This commit updates the `ChannelMonitorUpdate` pipeline for handling inbound `revoke_and_ack` and `commitment_signed` messages.
1 parent 46c6fb7 commit bb0db2a

File tree

4 files changed

+67
-184
lines changed

4 files changed

+67
-184
lines changed

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ fn test_monitor_and_persister_update_fail() {
143143
let mut node_0_per_peer_lock;
144144
let mut node_0_peer_state_lock;
145145
let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan.2);
146-
if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
146+
if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
147147
// Check that even though the persister is returning a InProgress,
148148
// because the update is bogus, ultimately the error that's returned
149149
// should be a PermanentFailure.
@@ -1739,7 +1739,6 @@ fn test_monitor_update_on_pending_forwards() {
17391739
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
17401740
expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[1], vec![HTLCDestination::NextHopChannel { node_id: Some(nodes[2].node.get_our_node_id()), channel_id: chan_2.2 }]);
17411741
check_added_monitors!(nodes[1], 1);
1742-
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
17431742

17441743
chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
17451744
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
@@ -1753,17 +1752,17 @@ fn test_monitor_update_on_pending_forwards() {
17531752

17541753
let events = nodes[0].node.get_and_clear_pending_events();
17551754
assert_eq!(events.len(), 3);
1756-
if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[0] {
1755+
if let Event::PaymentPathFailed { payment_hash, payment_failed_permanently, .. } = events[1] {
17571756
assert_eq!(payment_hash, payment_hash_1);
17581757
assert!(payment_failed_permanently);
17591758
} else { panic!("Unexpected event!"); }
1760-
match events[1] {
1759+
match events[2] {
17611760
Event::PaymentFailed { payment_hash, .. } => {
17621761
assert_eq!(payment_hash, payment_hash_1);
17631762
},
17641763
_ => panic!("Unexpected event"),
17651764
}
1766-
match events[2] {
1765+
match events[0] {
17671766
Event::PendingHTLCsForwardable { .. } => { },
17681767
_ => panic!("Unexpected event"),
17691768
};

lightning/src/ln/channel.rs

Lines changed: 40 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -411,17 +411,6 @@ pub enum UpdateFulfillCommitFetch {
411411
DuplicateClaim {},
412412
}
413413

414-
/// The return value of `revoke_and_ack` on success, primarily updates to other channels or HTLC
415-
/// state.
416-
pub(super) struct RAAUpdates {
417-
pub commitment_update: Option<msgs::CommitmentUpdate>,
418-
pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
419-
pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
420-
pub finalized_claimed_htlcs: Vec<HTLCSource>,
421-
pub monitor_update: ChannelMonitorUpdate,
422-
pub holding_cell_failed_htlcs: Vec<(HTLCSource, PaymentHash)>,
423-
}
424-
425414
/// The return value of `monitor_updating_restored`
426415
pub(super) struct MonitorRestoreUpdates {
427416
pub raa: Option<msgs::RevokeAndACK>,
@@ -3049,17 +3038,17 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
30493038
Ok(())
30503039
}
30513040

3052-
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<(msgs::RevokeAndACK, Option<msgs::CommitmentSigned>, ChannelMonitorUpdate), (Option<ChannelMonitorUpdate>, ChannelError)>
3041+
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<&ChannelMonitorUpdate, ChannelError>
30533042
where L::Target: Logger
30543043
{
30553044
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
3056-
return Err((None, ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned())));
3045+
return Err(ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned()));
30573046
}
30583047
if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
3059-
return Err((None, ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned())));
3048+
return Err(ChannelError::Close("Peer sent commitment_signed when we needed a channel_reestablish".to_owned()));
30603049
}
30613050
if self.channel_state & BOTH_SIDES_SHUTDOWN_MASK == BOTH_SIDES_SHUTDOWN_MASK && self.last_sent_closing_fee.is_some() {
3062-
return Err((None, ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned())));
3051+
return Err(ChannelError::Close("Peer sent commitment_signed after we'd started exchanging closing_signeds".to_owned()));
30633052
}
30643053

30653054
let funding_script = self.get_funding_redeemscript();
@@ -3077,7 +3066,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
30773066
log_bytes!(self.counterparty_funding_pubkey().serialize()), encode::serialize_hex(&bitcoin_tx.transaction),
30783067
log_bytes!(sighash[..]), encode::serialize_hex(&funding_script), log_bytes!(self.channel_id()));
30793068
if let Err(_) = self.secp_ctx.verify_ecdsa(&sighash, &msg.signature, &self.counterparty_funding_pubkey()) {
3080-
return Err((None, ChannelError::Close("Invalid commitment tx signature from peer".to_owned())));
3069+
return Err(ChannelError::Close("Invalid commitment tx signature from peer".to_owned()));
30813070
}
30823071
bitcoin_tx.txid
30833072
};
@@ -3092,7 +3081,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
30923081
debug_assert!(!self.is_outbound());
30933082
let counterparty_reserve_we_require_msat = self.holder_selected_channel_reserve_satoshis * 1000;
30943083
if commitment_stats.remote_balance_msat < commitment_stats.total_fee_sat * 1000 + counterparty_reserve_we_require_msat {
3095-
return Err((None, ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned())));
3084+
return Err(ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned()));
30963085
}
30973086
}
30983087
#[cfg(any(test, fuzzing))]
@@ -3114,7 +3103,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
31143103
}
31153104

31163105
if msg.htlc_signatures.len() != commitment_stats.num_nondust_htlcs {
3117-
return Err((None, ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs))));
3106+
return Err(ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), commitment_stats.num_nondust_htlcs)));
31183107
}
31193108

31203109
// TODO: Sadly, we pass HTLCs twice to ChannelMonitor: once via the HolderCommitmentTransaction and once via the update
@@ -3132,7 +3121,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
31323121
log_bytes!(msg.htlc_signatures[idx].serialize_compact()[..]), log_bytes!(keys.countersignatory_htlc_key.serialize()),
31333122
encode::serialize_hex(&htlc_tx), log_bytes!(htlc_sighash[..]), encode::serialize_hex(&htlc_redeemscript), log_bytes!(self.channel_id()));
31343123
if let Err(_) = self.secp_ctx.verify_ecdsa(&htlc_sighash, &msg.htlc_signatures[idx], &keys.countersignatory_htlc_key) {
3135-
return Err((None, ChannelError::Close("Invalid HTLC tx signature from peer".to_owned())));
3124+
return Err(ChannelError::Close("Invalid HTLC tx signature from peer".to_owned()));
31363125
}
31373126
htlcs_and_sigs.push((htlc, Some(msg.htlc_signatures[idx]), source));
31383127
} else {
@@ -3148,10 +3137,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
31483137
self.counterparty_funding_pubkey()
31493138
);
31503139

3151-
let next_per_commitment_point = self.holder_signer.get_per_commitment_point(self.cur_holder_commitment_transaction_number - 1, &self.secp_ctx);
31523140
self.holder_signer.validate_holder_commitment(&holder_commitment_tx, commitment_stats.preimages)
3153-
.map_err(|_| (None, ChannelError::Close("Failed to validate our commitment".to_owned())))?;
3154-
let per_commitment_secret = self.holder_signer.release_commitment_secret(self.cur_holder_commitment_transaction_number + 1);
3141+
.map_err(|_| ChannelError::Close("Failed to validate our commitment".to_owned()))?;
31553142

31563143
// Update state now that we've passed all the can-fail calls...
31573144
let mut need_commitment = false;
@@ -3196,7 +3183,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
31963183

31973184
self.cur_holder_commitment_transaction_number -= 1;
31983185
// Note that if we need_commitment & !AwaitingRemoteRevoke we'll call
3199-
// send_commitment_no_status_check() next which will reset this to RAAFirst.
3186+
// build_commitment_no_status_check() next which will reset this to RAAFirst.
32003187
self.resend_order = RAACommitmentOrder::CommitmentFirst;
32013188

32023189
if (self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0 {
@@ -3208,37 +3195,35 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
32083195
// the corresponding HTLC status updates so that get_last_commitment_update
32093196
// includes the right HTLCs.
32103197
self.monitor_pending_commitment_signed = true;
3211-
let (_, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?;
3212-
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
3198+
let mut additional_update = self.build_commitment_no_status_check(logger);
3199+
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
32133200
// strictly increasing by one, so decrement it here.
32143201
self.latest_monitor_update_id = monitor_update.update_id;
32153202
monitor_update.updates.append(&mut additional_update.updates);
32163203
}
32173204
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.",
32183205
log_bytes!(self.channel_id));
3219-
return Err((Some(monitor_update), ChannelError::Ignore("Previous monitor update failure prevented generation of RAA".to_owned())));
3206+
self.pending_monitor_updates.push(monitor_update);
3207+
return Ok(self.pending_monitor_updates.last().unwrap());
32203208
}
32213209

3222-
let commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
3210+
let need_commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
32233211
// If we're AwaitingRemoteRevoke we can't send a new commitment here, but that's ok -
32243212
// we'll send one right away when we get the revoke_and_ack when we
32253213
// free_holding_cell_htlcs().
3226-
let (msg, mut additional_update) = self.send_commitment_no_status_check(logger).map_err(|e| (None, e))?;
3227-
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
3214+
let mut additional_update = self.build_commitment_no_status_check(logger);
3215+
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
32283216
// strictly increasing by one, so decrement it here.
32293217
self.latest_monitor_update_id = monitor_update.update_id;
32303218
monitor_update.updates.append(&mut additional_update.updates);
3231-
Some(msg)
3232-
} else { None };
3219+
true
3220+
} else { false };
32333221

32343222
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.",
3235-
log_bytes!(self.channel_id()), if commitment_signed.is_some() { " our own commitment_signed and" } else { "" });
3236-
3237-
Ok((msgs::RevokeAndACK {
3238-
channel_id: self.channel_id,
3239-
per_commitment_secret,
3240-
next_per_commitment_point,
3241-
}, commitment_signed, monitor_update))
3223+
log_bytes!(self.channel_id()), if need_commitment_signed { " our own commitment_signed and" } else { "" });
3224+
self.pending_monitor_updates.push(monitor_update);
3225+
self.monitor_updating_paused(true, need_commitment_signed, false, Vec::new(), Vec::new(), Vec::new());
3226+
return Ok(self.pending_monitor_updates.last().unwrap());
32423227
}
32433228

32443229
/// Public version of the below, checking relevant preconditions first.
@@ -3370,7 +3355,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
33703355
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
33713356
/// generating an appropriate error *after* the channel state has been updated based on the
33723357
/// revoke_and_ack message.
3373-
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<RAAUpdates, ChannelError>
3358+
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, &ChannelMonitorUpdate), ChannelError>
33743359
where L::Target: Logger,
33753360
{
33763361
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3557,8 +3542,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
35573542
// When the monitor updating is restored we'll call get_last_commitment_update(),
35583543
// which does not update state, but we're definitely now awaiting a remote revoke
35593544
// before we can step forward any more, so set it here.
3560-
let (_, mut additional_update) = self.send_commitment_no_status_check(logger)?;
3561-
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
3545+
let mut additional_update = self.build_commitment_no_status_check(logger);
3546+
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
35623547
// strictly increasing by one, so decrement it here.
35633548
self.latest_monitor_update_id = monitor_update.update_id;
35643549
monitor_update.updates.append(&mut additional_update.updates);
@@ -3567,12 +3552,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
35673552
self.monitor_pending_failures.append(&mut revoked_htlcs);
35683553
self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
35693554
log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
3570-
return Ok(RAAUpdates {
3571-
commitment_update: None, finalized_claimed_htlcs: Vec::new(),
3572-
accepted_htlcs: Vec::new(), failed_htlcs: Vec::new(),
3573-
monitor_update,
3574-
holding_cell_failed_htlcs: Vec::new()
3575-
});
3555+
self.pending_monitor_updates.push(monitor_update);
3556+
return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
35763557
}
35773558

35783559
match self.free_holding_cell_htlcs(logger)? {
@@ -3591,47 +3572,29 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
35913572
self.latest_monitor_update_id = monitor_update.update_id;
35923573
monitor_update.updates.append(&mut additional_update.updates);
35933574

3594-
Ok(RAAUpdates {
3595-
commitment_update: Some(commitment_update),
3596-
finalized_claimed_htlcs,
3597-
accepted_htlcs: to_forward_infos,
3598-
failed_htlcs: revoked_htlcs,
3599-
monitor_update,
3600-
holding_cell_failed_htlcs: htlcs_to_fail
3601-
})
3575+
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3576+
self.pending_monitor_updates.push(monitor_update);
3577+
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
36023578
},
36033579
(None, htlcs_to_fail) => {
36043580
if require_commitment {
3605-
let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
3581+
let mut additional_update = self.build_commitment_no_status_check(logger);
36063582

3607-
// send_commitment_no_status_check may bump latest_monitor_id but we want them to be
3583+
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
36083584
// strictly increasing by one, so decrement it here.
36093585
self.latest_monitor_update_id = monitor_update.update_id;
36103586
monitor_update.updates.append(&mut additional_update.updates);
36113587

36123588
log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.",
36133589
log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len());
3614-
Ok(RAAUpdates {
3615-
commitment_update: Some(msgs::CommitmentUpdate {
3616-
update_add_htlcs: Vec::new(),
3617-
update_fulfill_htlcs: Vec::new(),
3618-
update_fail_htlcs,
3619-
update_fail_malformed_htlcs,
3620-
update_fee: None,
3621-
commitment_signed
3622-
}),
3623-
finalized_claimed_htlcs,
3624-
accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
3625-
monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
3626-
})
3590+
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3591+
self.pending_monitor_updates.push(monitor_update);
3592+
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
36273593
} else {
36283594
log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
3629-
Ok(RAAUpdates {
3630-
commitment_update: None,
3631-
finalized_claimed_htlcs,
3632-
accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
3633-
monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
3634-
})
3595+
self.monitor_updating_paused(false, false, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
3596+
self.pending_monitor_updates.push(monitor_update);
3597+
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
36353598
}
36363599
}
36373600
}
@@ -3818,6 +3781,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
38183781
{
38193782
assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
38203783
self.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
3784+
self.pending_monitor_updates.clear();
38213785

38223786
// If we're past (or at) the FundingSent stage on an outbound channel, try to
38233787
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we

0 commit comments

Comments
 (0)