Skip to content

Commit 299861f

Browse files
committed
Batch commitment_signed messages in PeerManager
During splicing, commitment_signed messages need to be collected into a single batch before they are handled. Rather than including this as part of the channel state machine logic, batch them when reading messages from the wire, since together they can be considered one logical message.
1 parent d796ddb commit 299861f

File tree

1 file changed

+59
-0
lines changed

1 file changed

+59
-0
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 59 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -15,6 +15,7 @@
1515
//! call into the provided message handlers (probably a ChannelManager and P2PGossipSync) with
1616
//! messages they should handle, and encoding/sending response messages.
1717
18+
use bitcoin::Txid;
1819
use bitcoin::constants::ChainHash;
1920
use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};
2021

@@ -41,6 +42,8 @@ use crate::util::string::PrintableString;
4142
#[allow(unused_imports)]
4243
use crate::prelude::*;
4344

45+
use alloc::collections::{btree_map, BTreeMap};
46+
4447
use crate::io;
4548
use crate::sync::{Mutex, MutexGuard, FairRwLock};
4649
use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering};
@@ -608,6 +611,8 @@ struct Peer {
608611
received_channel_announce_since_backlogged: bool,
609612

610613
inbound_connection: bool,
614+
615+
commitment_signed_batch: Option<(ChannelId, BTreeMap<Txid, msgs::CommitmentSigned>)>,
611616
}
612617

613618
impl Peer {
@@ -1140,6 +1145,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
11401145

11411146
received_channel_announce_since_backlogged: false,
11421147
inbound_connection: false,
1148+
1149+
commitment_signed_batch: None,
11431150
}));
11441151
Ok(res)
11451152
}
@@ -1196,6 +1203,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
11961203

11971204
received_channel_announce_since_backlogged: false,
11981205
inbound_connection: true,
1206+
1207+
commitment_signed_batch: None,
11991208
}));
12001209
Ok(())
12011210
}
@@ -1742,6 +1751,56 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
17421751
return Err(PeerHandleError { }.into());
17431752
}
17441753

1754+
// During splicing, commitment_signed messages need to be collected into a single batch
1755+
// before they are handled.
1756+
if let wire::Message::CommitmentSigned(mut msg) = message {
1757+
if let Some(batch) = msg.batch.take() {
1758+
let (channel_id, buffer) = peer_lock
1759+
.commitment_signed_batch
1760+
.get_or_insert_with(|| (msg.channel_id, BTreeMap::new()));
1761+
1762+
if msg.channel_id != *channel_id {
1763+
log_debug!(logger, "Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", log_pubkey!(their_node_id), channel_id, &msg.channel_id);
1764+
return Err(PeerHandleError { }.into());
1765+
}
1766+
1767+
const COMMITMENT_SIGNED_BATCH_LIMIT: usize = 100;
1768+
if buffer.len() == COMMITMENT_SIGNED_BATCH_LIMIT {
1769+
log_debug!(logger, "Peer {} sent batched commitment_signed for channel {} exceeding the limit", log_pubkey!(their_node_id), channel_id);
1770+
return Err(PeerHandleError { }.into());
1771+
}
1772+
1773+
match buffer.entry(batch.funding_txid) {
1774+
btree_map::Entry::Vacant(entry) => { entry.insert(msg); },
1775+
btree_map::Entry::Occupied(_) => {
1776+
log_debug!(logger, "Peer {} sent batched commitment_signed with duplicate funding_txid {} for channel {}", log_pubkey!(their_node_id), channel_id, &batch.funding_txid);
1777+
return Err(PeerHandleError { }.into());
1778+
}
1779+
}
1780+
1781+
if buffer.len() >= batch.batch_size as usize {
1782+
let channel_id = *channel_id;
1783+
let mut batch = BTreeMap::new();
1784+
core::mem::swap(&mut batch, buffer);
1785+
1786+
peer_lock.commitment_signed_batch.take();
1787+
drop(peer_lock);
1788+
1789+
self.message_handler.chan_handler.handle_commitment_signed_batch(their_node_id, channel_id, &batch);
1790+
}
1791+
1792+
return Ok(None);
1793+
} else if peer_lock.commitment_signed_batch.is_some() {
1794+
log_debug!(logger, "Peer {} sent non-batched commitment_signed for channel {} when expecting batched commitment_signed", log_pubkey!(their_node_id), &msg.channel_id);
1795+
return Err(PeerHandleError { }.into());
1796+
} else {
1797+
return Ok(Some(wire::Message::CommitmentSigned(msg)));
1798+
}
1799+
} else if peer_lock.commitment_signed_batch.is_some() {
1800+
log_debug!(logger, "Peer {} sent non-commitment_signed message when expecting batched commitment_signed", log_pubkey!(their_node_id));
1801+
return Err(PeerHandleError { }.into());
1802+
}
1803+
17451804
if let wire::Message::GossipTimestampFilter(_msg) = message {
17461805
// When supporting gossip messages, start initial gossip sync only after we receive
17471806
// a GossipTimestampFilter

0 commit comments

Comments
 (0)