Skip to content

Commit 13cbb03

Browse files
committed
Batch commitment_signed messages in PeerManager
During splicing, commitment_signed messages need to be collected into a single batch before they are handled. Rather than including this as part of the channel state machine logic, batch when reading messages from the wire since they can be considered one logical message.
1 parent 4caac9c commit 13cbb03

File tree

1 file changed

+46
-0
lines changed

1 file changed

+46
-0
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
//! call into the provided message handlers (probably a ChannelManager and P2PGossipSync) with
1616
//! messages they should handle, and encoding/sending response messages.
1717
18+
use bitcoin::Txid;
1819
use bitcoin::constants::ChainHash;
1920
use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};
2021

@@ -41,6 +42,8 @@ use crate::util::string::PrintableString;
4142
#[allow(unused_imports)]
4243
use crate::prelude::*;
4344

45+
use alloc::collections::{btree_map, BTreeMap};
46+
4447
use crate::io;
4548
use crate::sync::{Mutex, MutexGuard, FairRwLock};
4649
use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering};
@@ -608,6 +611,8 @@ struct Peer {
608611
received_channel_announce_since_backlogged: bool,
609612

610613
inbound_connection: bool,
614+
615+
commitment_signed_batch: Option<(ChannelId, BTreeMap<Txid, msgs::CommitmentSigned>)>,
611616
}
612617

613618
impl Peer {
@@ -1140,6 +1145,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
11401145

11411146
received_channel_announce_since_backlogged: false,
11421147
inbound_connection: false,
1148+
1149+
commitment_signed_batch: None,
11431150
}));
11441151
Ok(res)
11451152
}
@@ -1196,6 +1203,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
11961203

11971204
received_channel_announce_since_backlogged: false,
11981205
inbound_connection: true,
1206+
1207+
commitment_signed_batch: None,
11991208
}));
12001209
Ok(())
12011210
}
@@ -1742,6 +1751,43 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
17421751
return Err(PeerHandleError { }.into());
17431752
}
17441753

1754+
// During splicing, commitment_signed messages need to be collected into a single batch
1755+
// before they are handled.
1756+
if let wire::Message::CommitmentSigned(mut msg) = message {
1757+
if let Some(batch) = msg.batch.take() {
1758+
let (channel_id, cache) = peer_lock
1759+
.commitment_signed_batch
1760+
.get_or_insert_with(|| (msg.channel_id, BTreeMap::new()));
1761+
1762+
if msg.channel_id != *channel_id {
1763+
log_debug!(logger, "Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", log_pubkey!(their_node_id), channel_id, &msg.channel_id);
1764+
return Err(PeerHandleError { }.into());
1765+
}
1766+
1767+
match cache.entry(batch.funding_txid) {
1768+
btree_map::Entry::Vacant(entry) => { entry.insert(msg); },
1769+
btree_map::Entry::Occupied(_) => {
1770+
log_debug!(logger, "Peer {} sent batched commitment_signed with duplicate funding_txid {} for channel {}", log_pubkey!(their_node_id), channel_id, &batch.funding_txid);
1771+
return Err(PeerHandleError { }.into());
1772+
}
1773+
}
1774+
1775+
if cache.len() >= batch.batch_size as usize {
1776+
// TODO: Handle batched commitment_signed
1777+
}
1778+
1779+
return Ok(None);
1780+
} else if peer_lock.commitment_signed_batch.is_some() {
1781+
log_debug!(logger, "Peer {} sent non-batched commitment_signed for channel {} when expecting commitment_signed", log_pubkey!(their_node_id), &msg.channel_id);
1782+
return Err(PeerHandleError { }.into());
1783+
} else {
1784+
return Ok(Some(wire::Message::CommitmentSigned(msg)));
1785+
}
1786+
} else if peer_lock.commitment_signed_batch.is_some() {
1787+
log_debug!(logger, "Peer {} sent non-commitment_signed message when expecting batched commitment_signed", log_pubkey!(their_node_id));
1788+
return Err(PeerHandleError { }.into());
1789+
}
1790+
17451791
if let wire::Message::GossipTimestampFilter(_msg) = message {
17461792
// When supporting gossip messages, start initial gossip sync only after we receive
17471793
// a GossipTimestampFilter

0 commit comments

Comments
 (0)