diff --git a/Cargo.lock b/Cargo.lock index c4d701a135eea..04d36e1cd7822 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20222,6 +20222,7 @@ version = "0.34.0" dependencies = [ "array-bytes 6.2.2", "async-channel 1.9.0", + "async-trait", "futures", "log", "parity-scale-codec", @@ -20233,6 +20234,7 @@ dependencies = [ "sp-runtime 43.0.0", "sp-statement-store", "substrate-prometheus-endpoint", + "tokio", ] [[package]] diff --git a/prdoc/pr_9912.prdoc b/prdoc/pr_9912.prdoc new file mode 100644 index 0000000000000..5e9cdd1a02e57 --- /dev/null +++ b/prdoc/pr_9912.prdoc @@ -0,0 +1,14 @@ +title: '[WIP] Fix statement-store gossiping' +doc: +- audience: Node Dev + description: |- + Fixes gossiping and scalability issues in the statement-store networking: + reduces traffic by propagating only recent statements, skips duplicate processing, + and splits large batches to stay under MAX_STATEMENT_NOTIFICATION_SIZE. +crates: +- name: sc-network-statement + bump: minor +- name: sc-statement-store + bump: minor +- name: sp-statement-store + bump: minor diff --git a/substrate/client/network/statement/Cargo.toml b/substrate/client/network/statement/Cargo.toml index 58f253ddb5dbf..4db9e3d338fef 100644 --- a/substrate/client/network/statement/Cargo.toml +++ b/substrate/client/network/statement/Cargo.toml @@ -21,6 +21,7 @@ async-channel = { workspace = true } codec = { features = ["derive"], workspace = true, default-features = true } futures = { workspace = true } log = { workspace = true, default-features = true } +<<<<<<< HEAD prometheus-endpoint.default-features = true prometheus-endpoint.workspace = true sc-network.default-features = true @@ -37,3 +38,17 @@ sp-runtime.default-features = true sp-runtime.workspace = true sp-statement-store.default-features = true sp-statement-store.workspace = true +======= +prometheus-endpoint = { workspace = true, default-features = true } +sc-network = { workspace = true, default-features = true } +sc-network-common = { workspace = true, 
default-features = true } +sc-network-sync = { workspace = true, default-features = true } +sc-network-types = { workspace = true, default-features = true } +sp-consensus = { workspace = true, default-features = true } +sp-runtime = { workspace = true, default-features = true } +sp-statement-store = { workspace = true, default-features = true } +tokio = { workspace = true } + +[dev-dependencies] +async-trait = { workspace = true } +>>>>>>> b21cbb58 (Improve statement-store gossiping performance (#9912)) diff --git a/substrate/client/network/statement/src/config.rs b/substrate/client/network/statement/src/config.rs index 159998a0fe300..be9e3c745440e 100644 --- a/substrate/client/network/statement/src/config.rs +++ b/substrate/client/network/statement/src/config.rs @@ -24,10 +24,14 @@ use std::time; pub(crate) const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(1000); /// Maximum number of known statement hashes to keep for a peer. +<<<<<<< HEAD pub(crate) const MAX_KNOWN_STATEMENTS: usize = 10240; +======= +pub(crate) const MAX_KNOWN_STATEMENTS: usize = 4 * 1024 * 1024; // * 32 bytes for hash = 128 MB per peer +>>>>>>> b21cbb58 (Improve statement-store gossiping performance (#9912)) /// Maximum allowed size for a statement notification. -pub(crate) const MAX_STATEMENT_SIZE: u64 = 256 * 1024; +pub(crate) const MAX_STATEMENT_NOTIFICATION_SIZE: u64 = 1024 * 1024; /// Maximum number of statement validation request we keep at any moment. 
pub(crate) const MAX_PENDING_STATEMENTS: usize = 8192; diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index df93788696e38..4b39492062b55 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -30,7 +30,9 @@ use crate::config::*; use codec::{Decode, Encode}; use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered, FutureExt}; -use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; +use prometheus_endpoint::{ + prometheus, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError, Registry, U64, +}; use sc_network::{ config::{NonReservedPeerMode, SetConfig}, error, multiaddr, @@ -57,6 +59,7 @@ use std::{ pin::Pin, sync::Arc, }; +use tokio::time::timeout; pub mod config; @@ -85,9 +88,16 @@ mod rep { } const LOG_TARGET: &str = "statement-gossip"; +/// Maximum time we wait for sending a notification to a peer. +const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); struct Metrics { propagated_statements: Counter, + known_statements_received: Counter, + skipped_oversized_statements: Counter, + propagated_statements_chunks: Histogram, + pending_statements: Gauge, + ignored_statements: Counter, } impl Metrics { @@ -100,6 +110,43 @@ impl Metrics { )?, r, )?, + known_statements_received: register( + Counter::new( + "substrate_sync_known_statement_received", + "Number of statements received via gossiping that were already in the statement store", + )?, + r, + )?, + skipped_oversized_statements: register( + Counter::new( + "substrate_sync_skipped_oversized_statements", + "Number of oversized statements that were skipped to be gossiped", + )?, + r, + )?, + propagated_statements_chunks: register( + Histogram::with_opts( + HistogramOpts::new( + "substrate_sync_propagated_statements_chunks", + "Distribution of chunk sizes when propagating statements", + 
).buckets(prometheus::exponential_buckets(1.0, 2.0, 14)?), + )?, + r, + )?, + pending_statements: register( + Gauge::new( + "substrate_sync_pending_statement_validations", + "Number of pending statement validations", + )?, + r, + )?, + ignored_statements: register( + Counter::new( + "substrate_sync_ignored_statements", + "Number of statements ignored due to exceeding MAX_PENDING_STATEMENTS limit", + )?, + r, + )?, }) } } @@ -131,7 +178,7 @@ impl StatementHandlerPrototype { let (config, notification_service) = Net::notification_config( protocol_name.clone().into(), Vec::new(), - MAX_STATEMENT_SIZE, + MAX_STATEMENT_NOTIFICATION_SIZE, None, SetConfig { in_peers: 0, @@ -162,7 +209,7 @@ impl StatementHandlerPrototype { executor: impl Fn(Pin + Send>>) + Send, ) -> error::Result> { let sync_event_stream = sync.event_stream("statement-handler-sync"); - let (queue_sender, mut queue_receiver) = async_channel::bounded(100_000); + let (queue_sender, mut queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS); let store = statement_store.clone(); executor( @@ -263,7 +310,10 @@ where loop { futures::select! 
{ _ = self.propagate_timeout.next() => { - self.propagate_statements(); + self.propagate_statements().await; + self.metrics.as_ref().map(|metrics| { + metrics.pending_statements.set(self.pending_statements.len() as u64); + }); }, (hash, result) = self.pending_statements.select_next_some() => { if let Some(peers) = self.pending_statements_peers.remove(&hash) { @@ -373,19 +423,41 @@ where fn on_statements(&mut self, who: PeerId, statements: Statements) { log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who); if let Some(ref mut peer) = self.peers.get_mut(&who) { + let mut statements_left = statements.len() as u64; for s in statements { if self.pending_statements.len() > MAX_PENDING_STATEMENTS { log::debug!( target: LOG_TARGET, - "Ignoring any further statements that exceed `MAX_PENDING_STATEMENTS`({}) limit", + "Ignoring {} statements that exceed `MAX_PENDING_STATEMENTS`({}) limit", + statements_left, MAX_PENDING_STATEMENTS, ); + self.metrics.as_ref().map(|metrics| { + metrics.ignored_statements.inc_by(statements_left); + }); break } let hash = s.hash(); peer.known_statements.insert(hash); + if self.statement_store.has_statement(&hash) { + self.metrics.as_ref().map(|metrics| { + metrics.known_statements_received.inc(); + }); + + if let Some(peers) = self.pending_statements_peers.get(&hash) { + if peers.contains(&who) { + log::trace!( + target: LOG_TARGET, + "Already received the statement from the same peer {who}.", + ); + self.network.report_peer(who, rep::DUPLICATE_STATEMENT); + } + } + continue; + } + self.network.report_peer(who, rep::ANY_STATEMENT); match self.pending_statements_peers.entry(hash) { @@ -423,6 +495,8 @@ where } }, } + + statements_left -= 1; } } } @@ -442,7 +516,7 @@ where } /// Propagate one statement. 
- pub fn propagate_statement(&mut self, hash: &Hash) { + pub async fn propagate_statement(&mut self, hash: &Hash) { // Accept statements only when node is not major syncing if self.sync.is_major_syncing() { return @@ -450,16 +524,18 @@ where log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash); if let Ok(Some(statement)) = self.statement_store.statement(hash) { - self.do_propagate_statements(&[(*hash, statement)]); + self.do_propagate_statements(&[(*hash, statement)]).await; } } - fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) { - let mut propagated_statements = 0; - + async fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) { + log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len()); for (who, peer) in self.peers.iter_mut() { + log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who); + // never send statements to light nodes - if matches!(peer.role, ObservedRole::Light) { + if peer.role.is_light() { + log::trace!(target: LOG_TARGET, "{} is a light node, skipping propagation", who); continue } @@ -467,30 +543,593 @@ where .iter() .filter_map(|(hash, stmt)| peer.known_statements.insert(*hash).then(|| stmt)) .collect::>(); + log::trace!(target: LOG_TARGET, "We have {} statements that the peer doesn't know about", to_send.len()); - propagated_statements += to_send.len(); + let mut offset = 0; + while offset < to_send.len() { + // Try to send as many statements as possible in one notification + let mut current_end = to_send.len(); + log::trace!(target: LOG_TARGET, "Looking for better chunk size"); - if !to_send.is_empty() { - log::trace!(target: LOG_TARGET, "Sending {} statements to {}", to_send.len(), who); - self.notification_service.send_sync_notification(who, to_send.encode()); - } - } + loop { + let chunk = &to_send[offset..current_end]; + let encoded_size = chunk.encoded_size(); + log::trace!(target: LOG_TARGET, "Chunk: {} 
statements, {} KB", chunk.len(), encoded_size / 1024); - if let Some(ref metrics) = self.metrics { - metrics.propagated_statements.inc_by(propagated_statements as _) + // If chunk fits, send it + if encoded_size <= MAX_STATEMENT_NOTIFICATION_SIZE as usize { + if let Err(e) = timeout( + SEND_TIMEOUT, + self.notification_service.send_async_notification(who, chunk.encode()), + ) + .await + { + log::debug!(target: LOG_TARGET, "Failed to send notification to {}, peer disconnected, skipping further batches: {:?}", who, e); + offset = to_send.len(); + break; + } + offset = current_end; + log::trace!(target: LOG_TARGET, "Sent {} statements ({} KB) to {}, {} left", chunk.len(), encoded_size / 1024, who, to_send.len() - offset); + self.metrics.as_ref().map(|metrics| { + metrics.propagated_statements.inc_by(chunk.len() as u64); + metrics.propagated_statements_chunks.observe(chunk.len() as f64); + }); + break; + } + + // Size exceeded - split the chunk + let split_factor = + (encoded_size / MAX_STATEMENT_NOTIFICATION_SIZE as usize) + 1; + let mut new_chunk_size = (current_end - offset) / split_factor; + + // Single statement is too large + if new_chunk_size == 0 { + if chunk.len() == 1 { + log::warn!(target: LOG_TARGET, "Statement too large ({} KB), skipping", encoded_size / 1024); + self.metrics.as_ref().map(|metrics| { + metrics.skipped_oversized_statements.inc(); + }); + offset = current_end; + break; + } + // Don't skip more than one statement at once + new_chunk_size = 1; + } + + // Reduce chunk size and try again + current_end = offset + new_chunk_size; + } + } } + log::trace!(target: LOG_TARGET, "Statements propagated to all peers"); } /// Call when we must propagate ready statements to peers. 
- fn propagate_statements(&mut self) { + async fn propagate_statements(&mut self) { // Send out statements only when node is not major syncing if self.sync.is_major_syncing() { return } - log::debug!(target: LOG_TARGET, "Propagating statements"); - if let Ok(statements) = self.statement_store.statements() { - self.do_propagate_statements(&statements); + let Ok(statements) = self.statement_store.take_recent_statements() else { return }; + if !statements.is_empty() { + self.do_propagate_statements(&statements).await; + } + } +} + +#[cfg(test)] +mod tests { + + use super::*; + use std::sync::Mutex; + + #[derive(Clone)] + struct TestNetwork { + reported_peers: Arc>>, + } + + impl TestNetwork { + fn new() -> Self { + Self { reported_peers: Arc::new(Mutex::new(Vec::new())) } + } + + fn get_reports(&self) -> Vec<(PeerId, sc_network::ReputationChange)> { + self.reported_peers.lock().unwrap().clone() + } + } + + #[async_trait::async_trait] + impl NetworkPeers for TestNetwork { + fn set_authorized_peers(&self, _: std::collections::HashSet) { + unimplemented!() + } + + fn set_authorized_only(&self, _: bool) { + unimplemented!() + } + + fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) { + unimplemented!() + } + + fn report_peer(&self, peer_id: PeerId, cost_benefit: sc_network::ReputationChange) { + self.reported_peers.lock().unwrap().push((peer_id, cost_benefit)); + } + + fn peer_reputation(&self, _: &PeerId) -> i32 { + unimplemented!() + } + + fn disconnect_peer(&self, _: PeerId, _: sc_network::ProtocolName) { + unimplemented!() + } + + fn accept_unreserved_peers(&self) { + unimplemented!() + } + + fn deny_unreserved_peers(&self) { + unimplemented!() + } + + fn add_reserved_peer( + &self, + _: sc_network::config::MultiaddrWithPeerId, + ) -> Result<(), String> { + unimplemented!() + } + + fn remove_reserved_peer(&self, _: PeerId) { + unimplemented!() + } + + fn set_reserved_peers( + &self, + _: sc_network::ProtocolName, + _: std::collections::HashSet, + ) -> 
Result<(), String> { + unimplemented!() + } + + fn add_peers_to_reserved_set( + &self, + _: sc_network::ProtocolName, + _: std::collections::HashSet, + ) -> Result<(), String> { + unimplemented!() + } + + fn remove_peers_from_reserved_set( + &self, + _: sc_network::ProtocolName, + _: Vec, + ) -> Result<(), String> { + unimplemented!() + } + + fn sync_num_connected(&self) -> usize { + unimplemented!() + } + + fn peer_role(&self, _: PeerId, _: Vec) -> Option { + unimplemented!() + } + + async fn reserved_peers(&self) -> Result, ()> { + unimplemented!(); + } + } + + struct TestSync {} + + impl SyncEventStream for TestSync { + fn event_stream( + &self, + _name: &'static str, + ) -> Pin + Send>> { + unimplemented!() + } + } + + impl sp_consensus::SyncOracle for TestSync { + fn is_major_syncing(&self) -> bool { + false + } + + fn is_offline(&self) -> bool { + unimplemented!() + } + } + + impl NetworkEventStream for TestNetwork { + fn event_stream( + &self, + _name: &'static str, + ) -> Pin + Send>> { + unimplemented!() } } + + #[derive(Debug, Clone)] + struct TestNotificationService { + sent_notifications: Arc)>>>, + } + + impl TestNotificationService { + fn new() -> Self { + Self { sent_notifications: Arc::new(Mutex::new(Vec::new())) } + } + + fn get_sent_notifications(&self) -> Vec<(PeerId, Vec)> { + self.sent_notifications.lock().unwrap().clone() + } + } + + #[async_trait::async_trait] + impl NotificationService for TestNotificationService { + async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> { + unimplemented!() + } + + async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> { + unimplemented!() + } + + fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec) { + self.sent_notifications.lock().unwrap().push((*peer, notification)); + } + + async fn send_async_notification( + &mut self, + peer: &PeerId, + notification: Vec, + ) -> Result<(), sc_network::error::Error> { + self.sent_notifications.lock().unwrap().push((*peer, 
notification)); + Ok(()) + } + + async fn set_handshake(&mut self, _handshake: Vec) -> Result<(), ()> { + unimplemented!() + } + + fn try_set_handshake(&mut self, _handshake: Vec) -> Result<(), ()> { + unimplemented!() + } + + async fn next_event(&mut self) -> Option { + None + } + + fn clone(&mut self) -> Result, ()> { + unimplemented!() + } + + fn protocol(&self) -> &sc_network::types::ProtocolName { + unimplemented!() + } + + fn message_sink( + &self, + _peer: &PeerId, + ) -> Option> { + unimplemented!() + } + } + + #[derive(Clone)] + struct TestStatementStore { + statements: Arc>>, + recent_statements: + Arc>>, + } + + impl TestStatementStore { + fn new() -> Self { + Self { statements: Default::default(), recent_statements: Default::default() } + } + } + + impl StatementStore for TestStatementStore { + fn statements( + &self, + ) -> sp_statement_store::Result< + Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>, + > { + Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect()) + } + + fn take_recent_statements( + &self, + ) -> sp_statement_store::Result< + Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>, + > { + Ok(self.recent_statements.lock().unwrap().drain().collect()) + } + + fn statement( + &self, + _hash: &sp_statement_store::Hash, + ) -> sp_statement_store::Result> { + unimplemented!() + } + + fn has_statement(&self, hash: &sp_statement_store::Hash) -> bool { + self.statements.lock().unwrap().contains_key(hash) + } + + fn broadcasts( + &self, + _match_all_topics: &[sp_statement_store::Topic], + ) -> sp_statement_store::Result>> { + unimplemented!() + } + + fn posted( + &self, + _match_all_topics: &[sp_statement_store::Topic], + _dest: [u8; 32], + ) -> sp_statement_store::Result>> { + unimplemented!() + } + + fn posted_clear( + &self, + _match_all_topics: &[sp_statement_store::Topic], + _dest: [u8; 32], + ) -> sp_statement_store::Result>> { + unimplemented!() + } + + fn broadcasts_stmt( + &self, + 
_match_all_topics: &[sp_statement_store::Topic], + ) -> sp_statement_store::Result>> { + unimplemented!() + } + + fn posted_stmt( + &self, + _match_all_topics: &[sp_statement_store::Topic], + _dest: [u8; 32], + ) -> sp_statement_store::Result>> { + unimplemented!() + } + + fn posted_clear_stmt( + &self, + _match_all_topics: &[sp_statement_store::Topic], + _dest: [u8; 32], + ) -> sp_statement_store::Result>> { + unimplemented!() + } + + fn submit( + &self, + _statement: sp_statement_store::Statement, + _source: sp_statement_store::StatementSource, + ) -> sp_statement_store::SubmitResult { + unimplemented!() + } + + fn remove(&self, _hash: &sp_statement_store::Hash) -> sp_statement_store::Result<()> { + unimplemented!() + } + + fn remove_by(&self, _who: [u8; 32]) -> sp_statement_store::Result<()> { + unimplemented!() + } + } + + fn build_handler() -> ( + StatementHandler, + TestStatementStore, + TestNetwork, + TestNotificationService, + async_channel::Receiver<(Statement, oneshot::Sender)>, + ) { + let statement_store = TestStatementStore::new(); + let (queue_sender, queue_receiver) = async_channel::bounded(2); + let network = TestNetwork::new(); + let notification_service = TestNotificationService::new(); + let peer_id = PeerId::random(); + let mut peers = HashMap::new(); + peers.insert( + peer_id, + Peer { + known_statements: LruHashSet::new(NonZeroUsize::new(100).unwrap()), + role: ObservedRole::Full, + }, + ); + + let handler = StatementHandler { + protocol_name: "/statement/1".into(), + notification_service: Box::new(notification_service.clone()), + propagate_timeout: (Box::pin(futures::stream::pending()) + as Pin + Send>>) + .fuse(), + pending_statements: FuturesUnordered::new(), + pending_statements_peers: HashMap::new(), + network: network.clone(), + sync: TestSync {}, + sync_event_stream: (Box::pin(futures::stream::pending()) + as Pin + Send>>) + .fuse(), + peers, + statement_store: Arc::new(statement_store.clone()), + queue_sender, + metrics: None, + }; + 
(handler, statement_store, network, notification_service, queue_receiver) + } + + #[test] + fn test_skips_processing_statements_that_already_in_store() { + let (mut handler, statement_store, _network, _notification_service, queue_receiver) = + build_handler(); + + let mut statement1 = Statement::new(); + statement1.set_plain_data(b"statement1".to_vec()); + let hash1 = statement1.hash(); + + statement_store.statements.lock().unwrap().insert(hash1, statement1.clone()); + + let mut statement2 = Statement::new(); + statement2.set_plain_data(b"statement2".to_vec()); + let hash2 = statement2.hash(); + + let peer_id = *handler.peers.keys().next().unwrap(); + + handler.on_statements(peer_id, vec![statement1, statement2]); + + let to_submit = queue_receiver.try_recv(); + assert_eq!(to_submit.unwrap().0.hash(), hash2, "Expected only statement2 to be queued"); + + let no_more = queue_receiver.try_recv(); + assert!(no_more.is_err(), "Expected only one statement to be queued"); + } + + #[test] + fn test_reports_for_duplicate_statements() { + let (mut handler, statement_store, network, _notification_service, queue_receiver) = + build_handler(); + + let peer_id = *handler.peers.keys().next().unwrap(); + + let mut statement1 = Statement::new(); + statement1.set_plain_data(b"statement1".to_vec()); + + handler.on_statements(peer_id, vec![statement1.clone()]); + { + // Manually process statements submission + let (s, _) = queue_receiver.try_recv().unwrap(); + let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s); + handler.network.report_peer(peer_id, rep::ANY_STATEMENT_REFUND); + } + + handler.on_statements(peer_id, vec![statement1]); + + let reports = network.get_reports(); + assert_eq!( + reports, + vec![ + (peer_id, rep::ANY_STATEMENT), // Report for first statement + (peer_id, rep::ANY_STATEMENT_REFUND), // Refund for first statement + (peer_id, rep::DUPLICATE_STATEMENT) // Report for duplicate statement + ], + "Expected ANY_STATEMENT, ANY_STATEMENT_REFUND, 
DUPLICATE_STATEMENT reputation change, but got: {:?}", + reports + ); + } + + #[tokio::test] + async fn test_splits_large_batches_into_smaller_chunks() { + let (mut handler, statement_store, _network, notification_service, _queue_receiver) = + build_handler(); + + let num_statements = 30; + let statement_size = 100 * 1024; // 100KB per statement + for i in 0..num_statements { + let mut statement = Statement::new(); + let mut data = vec![0u8; statement_size]; + data[0] = i as u8; + statement.set_plain_data(data); + let hash = statement.hash(); + statement_store.recent_statements.lock().unwrap().insert(hash, statement); + } + + handler.propagate_statements().await; + + let sent = notification_service.get_sent_notifications(); + let mut total_statements_sent = 0; + assert!( + sent.len() == 3, + "Expected batch to be split into 3 chunks, but got {} chunks", + sent.len() + ); + for (_peer, notification) in sent.iter() { + assert!( + notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize, + "Notification size {} exceeds limit {}", + notification.len(), + MAX_STATEMENT_NOTIFICATION_SIZE + ); + if let Ok(stmts) = ::decode(&mut notification.as_slice()) { + total_statements_sent += stmts.len(); + } + } + + assert_eq!( + total_statements_sent, num_statements, + "Expected all {} statements to be sent, but only {} were sent", + num_statements, total_statements_sent + ); + } + + #[tokio::test] + async fn test_skips_only_oversized_statements() { + let (mut handler, statement_store, _network, notification_service, _queue_receiver) = + build_handler(); + + let mut statement1 = Statement::new(); + statement1.set_plain_data(vec![1u8; 100]); + let hash1 = statement1.hash(); + statement_store + .recent_statements + .lock() + .unwrap() + .insert(hash1, statement1.clone()); + + let mut oversized1 = Statement::new(); + oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize * 100]); + let hash_oversized1 = oversized1.hash(); + statement_store + 
.recent_statements + .lock() + .unwrap() + .insert(hash_oversized1, oversized1); + + let mut statement2 = Statement::new(); + statement2.set_plain_data(vec![3u8; 100]); + let hash2 = statement2.hash(); + statement_store + .recent_statements + .lock() + .unwrap() + .insert(hash2, statement2.clone()); + + let mut oversized2 = Statement::new(); + oversized2.set_plain_data(vec![4u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]); + let hash_oversized2 = oversized2.hash(); + statement_store + .recent_statements + .lock() + .unwrap() + .insert(hash_oversized2, oversized2); + + let mut statement3 = Statement::new(); + statement3.set_plain_data(vec![5u8; 100]); + let hash3 = statement3.hash(); + statement_store + .recent_statements + .lock() + .unwrap() + .insert(hash3, statement3.clone()); + + handler.propagate_statements().await; + + let sent = notification_service.get_sent_notifications(); + + let mut sent_hashes = sent + .iter() + .flat_map(|(_peer, notification)| { + ::decode(&mut notification.as_slice()).unwrap() + }) + .map(|s| s.hash()) + .collect::>(); + sent_hashes.sort(); + let mut expected_hashes = vec![hash1, hash2, hash3]; + expected_hashes.sort(); + assert_eq!(sent_hashes, expected_hashes, "Only small statements should be sent"); + } } diff --git a/substrate/client/statement-store/src/lib.rs b/substrate/client/statement-store/src/lib.rs index 0b5011e7bda53..b9091673b7fca 100644 --- a/substrate/client/statement-store/src/lib.rs +++ b/substrate/client/statement-store/src/lib.rs @@ -151,6 +151,7 @@ impl Default for Options { #[derive(Default)] struct Index { + recent: HashSet, by_topic: HashMap>, by_dec_key: HashMap, HashSet>, topics_and_keys: HashMap; MAX_TOPICS], Option)>, @@ -239,6 +240,7 @@ impl Index { } let priority = Priority(statement.priority().unwrap_or(0)); self.entries.insert(hash, (account, priority, statement.data_len())); + self.recent.insert(hash); self.total_size += statement.data_len(); let account_info = self.accounts.entry(account).or_default(); 
account_info.data_size += statement.data_len(); @@ -320,6 +322,10 @@ impl Index { purged } + fn take_recent(&mut self) -> HashSet { + std::mem::take(&mut self.recent) + } + fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool { if let Some((account, priority, len)) = self.entries.remove(hash) { self.total_size -= len; @@ -343,6 +349,7 @@ impl Index { } } } + let _ = self.recent.remove(hash); self.expired.insert(*hash, current_time); if let std::collections::hash_map::Entry::Occupied(mut account_rec) = self.accounts.entry(account) @@ -764,6 +771,7 @@ impl StatementStore for Store { fn statements(&self) -> Result> { let index = self.index.read(); let mut result = Vec::with_capacity(index.entries.len()); +<<<<<<< HEAD for h in self.index.read().entries.keys() { let encoded = self.db.get(col::STATEMENTS, h).map_err(|e| Error::Db(e.to_string()))?; if let Some(encoded) = encoded { @@ -771,6 +779,33 @@ impl StatementStore for Store { let hash = statement.hash(); result.push((hash, statement)); } +======= + for hash in index.entries.keys().cloned() { + let Some(encoded) = + self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))? + else { + continue + }; + if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) { + result.push((hash, statement)); + } + } + Ok(result) + } + + fn take_recent_statements(&self) -> Result> { + let mut index = self.index.write(); + let recent = index.take_recent(); + let mut result = Vec::with_capacity(recent.len()); + for hash in recent { + let Some(encoded) = + self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))? 
+ else { + continue + }; + if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) { + result.push((hash, statement)); +>>>>>>> b21cbb58 (Improve statement-store gossiping performance (#9912)) } } Ok(result) @@ -807,6 +842,10 @@ impl StatementStore for Store { ) } + fn has_statement(&self, hash: &Hash) -> bool { + self.index.read().entries.contains_key(hash) + } + /// Return the data of all known statements which include all topics and have no `DecryptionKey` /// field. fn broadcasts(&self, match_all_topics: &[Topic]) -> Result>> { @@ -1184,6 +1223,40 @@ mod tests { assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1)); } + #[test] + fn take_recent_statements_clears_index() { + let (store, _temp) = test_store(); + let statement0 = signed_statement(0); + let statement1 = signed_statement(1); + let statement2 = signed_statement(2); + let statement3 = signed_statement(3); + + let _ = store.submit(statement0.clone(), StatementSource::Local); + let _ = store.submit(statement1.clone(), StatementSource::Local); + let _ = store.submit(statement2.clone(), StatementSource::Local); + + let recent1 = store.take_recent_statements().unwrap(); + let (recent1_hashes, recent1_statements): (Vec<_>, Vec<_>) = recent1.into_iter().unzip(); + let expected1 = vec![statement0, statement1, statement2]; + assert!(expected1.iter().all(|s| recent1_hashes.contains(&s.hash()))); + assert!(expected1.iter().all(|s| recent1_statements.contains(s))); + + // Recent statements are cleared. 
+ let recent2 = store.take_recent_statements().unwrap(); + assert_eq!(recent2.len(), 0); + + store.submit(statement3.clone(), StatementSource::Network); + + let recent3 = store.take_recent_statements().unwrap(); + let (recent3_hashes, recent3_statements): (Vec<_>, Vec<_>) = recent3.into_iter().unzip(); + let expected3 = vec![statement3]; + assert!(expected3.iter().all(|s| recent3_hashes.contains(&s.hash()))); + assert!(expected3.iter().all(|s| recent3_statements.contains(s))); + + // Recent statements are cleared, but statements remain in the store. + assert_eq!(store.statements().unwrap().len(), 4); + } + #[test] fn search_by_topic_and_key() { let (store, _temp) = test_store(); diff --git a/substrate/primitives/statement-store/src/store_api.rs b/substrate/primitives/statement-store/src/store_api.rs index eb168d6d151e6..582145585afcc 100644 --- a/substrate/primitives/statement-store/src/store_api.rs +++ b/substrate/primitives/statement-store/src/store_api.rs @@ -66,9 +66,20 @@ pub trait StatementStore: Send + Sync { /// Return all statements. fn statements(&self) -> Result>; + /// Return recent statements and clear the internal index. + /// + /// This consumes and clears the recently received statements, + /// allowing new statements to be collected from this point forward. + fn take_recent_statements(&self) -> Result>; + /// Get statement by hash. fn statement(&self, hash: &Hash) -> Result>; + /// Check if statement exists in the store + /// + /// Fast index check without accessing the DB. + fn has_statement(&self, hash: &Hash) -> bool; + /// Return the data of all known statements which include all topics and have no `DecryptionKey` /// field. fn broadcasts(&self, match_all_topics: &[Topic]) -> Result>>;