From f06dab80c7340482f4a0fa2ac13805cf40cb4d07 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 2 Oct 2025 11:45:57 +0200 Subject: [PATCH 01/42] Gossip only recent statements --- substrate/client/network/statement/src/lib.rs | 2 +- substrate/client/statement-store/src/lib.rs | 57 +++++++++++++++++++ .../statement-store/src/store_api.rs | 6 ++ 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index df93788696e38..f947ec6f7b176 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -489,7 +489,7 @@ where } log::debug!(target: LOG_TARGET, "Propagating statements"); - if let Ok(statements) = self.statement_store.statements() { + if let Ok(statements) = self.statement_store.take_recent_statements() { self.do_propagate_statements(&statements); } } diff --git a/substrate/client/statement-store/src/lib.rs b/substrate/client/statement-store/src/lib.rs index d857cbc51c8ad..eb9f0cf4ff05d 100644 --- a/substrate/client/statement-store/src/lib.rs +++ b/substrate/client/statement-store/src/lib.rs @@ -155,6 +155,7 @@ impl Default for Options { #[derive(Default)] struct Index { + recent: HashSet, by_topic: HashMap>, by_dec_key: HashMap, HashSet>, topics_and_keys: HashMap; MAX_TOPICS], Option)>, @@ -243,6 +244,7 @@ impl Index { } let priority = Priority(statement.priority().unwrap_or(0)); self.entries.insert(hash, (account, priority, statement.data_len())); + self.recent.insert(hash); self.total_size += statement.data_len(); let account_info = self.accounts.entry(account).or_default(); account_info.data_size += statement.data_len(); @@ -324,6 +326,10 @@ impl Index { purged } + fn take_recent(&mut self) -> HashSet { + std::mem::take(&mut self.recent) + } + fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool { if let Some((account, priority, len)) = self.entries.remove(hash) { self.total_size -= len; @@ -347,6 +353,7 @@ impl Index { } } } + let _ = self.recent.remove(hash); self.expired.insert(*hash, current_time); if let std::collections::hash_map::Entry::Occupied(mut account_rec) = self.accounts.entry(account) @@ -779,6 +786,22 @@ impl StatementStore for Store { Ok(result) } + fn take_recent_statements(&self) -> Result> { + let mut index = self.index.write(); + let recent = index.take_recent(); + let mut result = Vec::with_capacity(recent.len()); + for h in recent { + let encoded = self.db.get(col::STATEMENTS, &h).map_err(|e| Error::Db(e.to_string()))?; + if let Some(encoded) = encoded { + if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) { + let hash = statement.hash(); + result.push((hash, statement)); + } + } + } + Ok(result) + } + /// Returns a statement by hash. fn statement(&self, hash: &Hash) -> Result> { Ok( @@ -1214,6 +1237,40 @@ mod tests { assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1)); } + #[test] + fn take_recent_statements_clears_index() { + let (store, _temp) = test_store(); + let statement0 = signed_statement(0); + let statement1 = signed_statement(1); + let statement2 = signed_statement(2); + let statement3 = signed_statement(3); + + let _ = store.submit(statement0.clone(), StatementSource::Local); + let _ = store.submit(statement1.clone(), StatementSource::Local); + let _ = store.submit(statement2.clone(), StatementSource::Local); + + let recent1 = store.take_recent_statements().unwrap(); + let (recent1_hashes, recent1_statements): (Vec<_>, Vec<_>) = recent1.into_iter().unzip(); + let expected1 = vec![statement0, statement1, statement2]; + assert_eq!(recent1_hashes, expected1.iter().map(|s| s.hash()).collect::>()); + assert_eq!(recent1_statements, expected1); + + // Recent statements are cleared. + let recent2 = store.take_recent_statements().unwrap(); + assert_eq!(recent2.len(), 0); + + store.submit(statement3.clone(), StatementSource::Network); + + let recent3 = store.take_recent_statements().unwrap(); + let (recent3_hashes, recent3_statements): (Vec<_>, Vec<_>) = recent3.into_iter().unzip(); + let expected3 = vec![statement3]; + assert_eq!(recent3_hashes, expected3.iter().map(|s| s.hash()).collect::>()); + assert_eq!(recent3_statements, expected3); + + // Recent statements are cleared, but statements remain in the store. + assert_eq!(store.statements().unwrap().len(), 4); + } + #[test] fn search_by_topic_and_key() { let (store, _temp) = test_store(); diff --git a/substrate/primitives/statement-store/src/store_api.rs b/substrate/primitives/statement-store/src/store_api.rs index 8edb527c54c88..34d253b96d34c 100644 --- a/substrate/primitives/statement-store/src/store_api.rs +++ b/substrate/primitives/statement-store/src/store_api.rs @@ -66,6 +66,12 @@ pub trait StatementStore: Send + Sync { /// Return all statements. fn statements(&self) -> Result>; + /// Return recent statements and clear the internal index. + /// + /// This consumes and clears the recently received statements, + /// allowing new statements to be collected from this point forward. + fn take_recent_statements(&self) -> Result>; + /// Get statement by hash. fn statement(&self, hash: &Hash) -> Result>; From eca57b40bc38552c0c9466df2764fc3dd89abb55 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 2 Oct 2025 13:08:05 +0200 Subject: [PATCH 02/42] Check if the store already has a statement before processing --- substrate/client/network/statement/src/lib.rs | 15 +++++++++++++++ substrate/client/statement-store/src/lib.rs | 4 ++++ .../primitives/statement-store/src/store_api.rs | 5 +++++ 3 files changed, 24 insertions(+) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index f947ec6f7b176..131b689620ca5 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -88,6 +88,7 @@ const LOG_TARGET: &str = "statement-gossip"; struct Metrics { propagated_statements: Counter, + known_statements_received: Counter, } impl Metrics { @@ -100,6 +101,13 @@ impl Metrics { )?, r, )?, + known_statements_received: register( + Counter::new( + "substrate_sync_known_statement_received", + "Number of statements received via gossiping that were already in the statement store", + )?, + r, + )?, }) } } @@ -388,6 +396,13 @@ where self.network.report_peer(who, rep::ANY_STATEMENT); + if self.statement_store.has_statement(&hash) { + if let Some(ref metrics) = self.metrics { + metrics.known_statements_received.inc(); + } + continue; + } + match self.pending_statements_peers.entry(hash) { Entry::Vacant(entry) => { let (completion_sender, completion_receiver) = oneshot::channel(); diff --git a/substrate/client/statement-store/src/lib.rs b/substrate/client/statement-store/src/lib.rs index eb9f0cf4ff05d..540a80f25641d 100644 --- a/substrate/client/statement-store/src/lib.rs +++ b/substrate/client/statement-store/src/lib.rs @@ -833,6 +833,10 @@ impl StatementStore for Store { ) } + fn has_statement(&self, hash: &Hash) -> bool { + self.index.read().entries.contains_key(hash) + } + /// Return the data of all known statements which include all topics and have no `DecryptionKey` /// field. fn broadcasts(&self, match_all_topics: &[Topic]) -> Result>> { diff --git a/substrate/primitives/statement-store/src/store_api.rs b/substrate/primitives/statement-store/src/store_api.rs index 34d253b96d34c..f799a39c00246 100644 --- a/substrate/primitives/statement-store/src/store_api.rs +++ b/substrate/primitives/statement-store/src/store_api.rs @@ -75,6 +75,11 @@ pub trait StatementStore: Send + Sync { /// Get statement by hash. fn statement(&self, hash: &Hash) -> Result>; + /// Check if statement exists in the store + /// + /// Fast index check without accessing the DB. + fn has_statement(&self, hash: &Hash) -> bool; + /// Return the data of all known statements which include all topics and have no `DecryptionKey` /// field. fn broadcasts(&self, match_all_topics: &[Topic]) -> Result>>; From 7b8aeb1b414f2488e467bb130ae978d1d396dc8e Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 2 Oct 2025 12:25:07 +0200 Subject: [PATCH 03/42] Reduce notification size if too large --- .../client/network/statement/src/config.rs | 2 +- substrate/client/network/statement/src/lib.rs | 77 +++++++++++++++++-- 2 files changed, 73 insertions(+), 6 deletions(-) diff --git a/substrate/client/network/statement/src/config.rs b/substrate/client/network/statement/src/config.rs index f4546b027c17e..42553b0c86195 100644 --- a/substrate/client/network/statement/src/config.rs +++ b/substrate/client/network/statement/src/config.rs @@ -27,7 +27,7 @@ pub(crate) const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis pub(crate) const MAX_KNOWN_STATEMENTS: usize = 4 * 1024 * 1024; /// Maximum allowed size for a statement notification. -pub(crate) const MAX_STATEMENT_SIZE: u64 = 256 * 1024; +pub(crate) const MAX_STATEMENT_NOTIFICATION_SIZE: u64 = 256 * 1024; /// Maximum number of statement validation request we keep at any moment. pub(crate) const MAX_PENDING_STATEMENTS: usize = 2 * 1024 * 1024; diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 131b689620ca5..a9c5c2b594318 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -30,7 +30,9 @@ use crate::config::*; use codec::{Decode, Encode}; use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered, FutureExt}; -use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; +use prometheus_endpoint::{ + register, Counter, Histogram, HistogramOpts, PrometheusError, Registry, U64, +}; use sc_network::{ config::{NonReservedPeerMode, SetConfig}, error, multiaddr, @@ -89,6 +91,8 @@ const LOG_TARGET: &str = "statement-gossip"; struct Metrics { propagated_statements: Counter, known_statements_received: Counter, + skipped_oversized_statements: Counter, + propagated_statements_chunks: Histogram, } impl Metrics { @@ -108,6 +112,22 @@ impl Metrics { )?, r, )?, + skipped_oversized_statements: register( + Counter::new( + "substrate_sync_skipped_oversized_statements", + "Number of oversized statements that were skipped to be gossiped", + )?, + r, + )?, + propagated_statements_chunks: register( + Histogram::with_opts( + HistogramOpts::new( + "substrate_sync_propagated_statements_chunks", + "Distribution of chunk sizes when propagating statements", + ), + )?, + r, + )?, }) } } @@ -139,7 +159,7 @@ impl StatementHandlerPrototype { let (config, notification_service) = Net::notification_config( protocol_name.clone().into(), Vec::new(), - MAX_STATEMENT_SIZE, + MAX_STATEMENT_NOTIFICATION_SIZE, None, SetConfig { in_peers: 0, @@ -485,9 +505,56 @@ where propagated_statements += to_send.len(); - if !to_send.is_empty() { - log::trace!(target: LOG_TARGET, "Sending {} statements to {}", to_send.len(), who); - self.notification_service.send_sync_notification(who, to_send.encode()); + let mut offset = 0; + while offset < to_send.len() { + // Try to send as many statements as possible in one notification + let chunk_size = to_send.len() - offset; + let chunk_end = offset + chunk_size; + let mut current_end = chunk_end; + + loop { + let chunk = &to_send[offset..current_end]; + let encoded = chunk.encode(); + + // If chunk fits, send it + if encoded.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize { + log::trace!( + target: LOG_TARGET, + "Sending {} statements ({} KB) to {}", + chunk.len(), + encoded.len() / 1024, + who + ); + self.notification_service.send_sync_notification(who, encoded); + offset = current_end; + if let Some(ref metrics) = self.metrics { + metrics.propagated_statements_chunks.observe(chunk.len() as f64); + } + break; + } + + // Size exceeded - split the chunk + let split_factor = + (encoded.len() / MAX_STATEMENT_NOTIFICATION_SIZE as usize) + 1; + let new_chunk_size = (current_end - offset) / split_factor; + + // Single statement is too large + if new_chunk_size == 0 { + log::warn!( + target: LOG_TARGET, + "Statement too large ({} KB), skipping", + encoded.len() / 1024 + ); + if let Some(ref metrics) = self.metrics { + metrics.skipped_oversized_statements.inc(); + } + offset = current_end; + break; + } + + // Reduce chunk size and try again + current_end = offset + new_chunk_size; + } } } From 3e6999976167cdde37309e29ca556d0495fb1fe2 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 2 Oct 2025 17:21:22 +0200 Subject: [PATCH 04/42] Update metrics --- substrate/client/network/statement/src/config.rs | 2 +- substrate/client/network/statement/src/lib.rs | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/substrate/client/network/statement/src/config.rs b/substrate/client/network/statement/src/config.rs index 42553b0c86195..78480a2306172 100644 --- a/substrate/client/network/statement/src/config.rs +++ b/substrate/client/network/statement/src/config.rs @@ -24,7 +24,7 @@ use std::time; pub(crate) const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(1000); /// Maximum number of known statement hashes to keep for a peer. -pub(crate) const MAX_KNOWN_STATEMENTS: usize = 4 * 1024 * 1024; +pub(crate) const MAX_KNOWN_STATEMENTS: usize = 4 * 1024 * 1024; // * 32 bytes for hash = 128 MB per peer /// Maximum allowed size for a statement notification. pub(crate) const MAX_STATEMENT_NOTIFICATION_SIZE: u64 = 256 * 1024; diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index a9c5c2b594318..b7bc5d381d378 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -31,7 +31,7 @@ use crate::config::*; use codec::{Decode, Encode}; use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered, FutureExt}; use prometheus_endpoint::{ - register, Counter, Histogram, HistogramOpts, PrometheusError, Registry, U64, + prometheus, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError, Registry, U64, }; use sc_network::{ config::{NonReservedPeerMode, SetConfig}, @@ -93,6 +93,7 @@ struct Metrics { known_statements_received: Counter, skipped_oversized_statements: Counter, propagated_statements_chunks: Histogram, + pending_statements: Gauge, } impl Metrics { @@ -124,7 +125,14 @@ impl Metrics { HistogramOpts::new( "substrate_sync_propagated_statements_chunks", "Distribution of chunk sizes when propagating statements", - ), + ).buckets(prometheus::exponential_buckets(1.0, 2.0, 14)?), + )?, + r, + )?, + pending_statements: register( + Gauge::new( + "substrate_sync_pending_statement_validations", + "Number of pending statement validations", )?, r, )?, @@ -292,6 +300,9 @@ where futures::select! { _ = self.propagate_timeout.next() => { self.propagate_statements(); + if let Some(ref metrics) = self.metrics { + metrics.pending_statements.set(self.pending_statements.len() as u64); + } }, (hash, result) = self.pending_statements.select_next_some() => { if let Some(peers) = self.pending_statements_peers.remove(&hash) { From 9a5deccdc58c09ff28a173b7085486f534702371 Mon Sep 17 00:00:00 2001 From: "cmd[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 2 Oct 2025 16:42:25 +0000 Subject: [PATCH 05/42] Update from github-actions[bot] running command 'prdoc --audience node_dev --bump patch' --- prdoc/pr_9912.prdoc | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 prdoc/pr_9912.prdoc diff --git a/prdoc/pr_9912.prdoc b/prdoc/pr_9912.prdoc new file mode 100644 index 0000000000000..dd545a6a59d8f --- /dev/null +++ b/prdoc/pr_9912.prdoc @@ -0,0 +1,22 @@ +title: '[WIP] Fix statement-store gossiping' +doc: +- audience: Node Dev + description: |- + # Description + + Fixes gossiping and scalability issues in the statement-store networking. + + 1. Reduced gossiping traffic by propagating only recent statements instead of all. + 2. Added early check for statements that the node already has to skip duplicate processing. + 3. Added splitting of large statement batches to stay under MAX_STATEMENT_NOTIFICATION_SIZE; oversized individual statements are skipped. + + ## Integration + + Internal optimizations to the gossip protocol. No downstream changes required. +crates: +- name: sc-network-statement + bump: patch +- name: sc-statement-store + bump: patch +- name: sp-statement-store + bump: patch From baeba9f690902ff89075eb17afbe22c50bd4e969 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 2 Oct 2025 18:44:36 +0200 Subject: [PATCH 06/42] Update PRDOC --- prdoc/pr_9912.prdoc | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/prdoc/pr_9912.prdoc b/prdoc/pr_9912.prdoc index dd545a6a59d8f..69f3312d65d6f 100644 --- a/prdoc/pr_9912.prdoc +++ b/prdoc/pr_9912.prdoc @@ -2,17 +2,9 @@ title: '[WIP] Fix statement-store gossiping' doc: - audience: Node Dev description: |- - # Description - - Fixes gossiping and scalability issues in the statement-store networking. - - 1. Reduced gossiping traffic by propagating only recent statements instead of all. - 2. Added early check for statements that the node already has to skip duplicate processing. - 3. Added splitting of large statement batches to stay under MAX_STATEMENT_NOTIFICATION_SIZE; oversized individual statements are skipped. - - ## Integration - - Internal optimizations to the gossip protocol. No downstream changes required. + Fixes gossiping and scalability issues in the statement-store networking: + reduces traffic by propagating only recent statements, skips duplicate processing, + and splits large batches to stay under MAX_STATEMENT_NOTIFICATION_SIZE. crates: - name: sc-network-statement bump: patch From b0607bc5268248b75660ab7280f00a42b31cd9eb Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 2 Oct 2025 18:46:37 +0200 Subject: [PATCH 07/42] Update semver --- prdoc/pr_9912.prdoc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/prdoc/pr_9912.prdoc b/prdoc/pr_9912.prdoc index 69f3312d65d6f..5e9cdd1a02e57 100644 --- a/prdoc/pr_9912.prdoc +++ b/prdoc/pr_9912.prdoc @@ -7,8 +7,8 @@ doc: and splits large batches to stay under MAX_STATEMENT_NOTIFICATION_SIZE. crates: - name: sc-network-statement - bump: patch + bump: minor - name: sc-statement-store - bump: patch + bump: minor - name: sp-statement-store - bump: patch + bump: minor From 52fb0aae5a66efecdc5ab50745dde49394bcc662 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Fri, 3 Oct 2025 10:02:21 +0200 Subject: [PATCH 08/42] Address review comments --- substrate/client/network/statement/src/lib.rs | 8 +++----- substrate/client/statement-store/src/lib.rs | 12 ++++++------ 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index b7bc5d381d378..fbcaed5e204f0 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -520,15 +520,13 @@ where while offset < to_send.len() { // Try to send as many statements as possible in one notification let chunk_size = to_send.len() - offset; - let chunk_end = offset + chunk_size; - let mut current_end = chunk_end; + let mut current_end = to_send.len(); loop { let chunk = &to_send[offset..current_end]; - let encoded = chunk.encode(); // If chunk fits, send it - if encoded.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize { + if chunk.encoded_size() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize { log::trace!( target: LOG_TARGET, "Sending {} statements ({} KB) to {}", @@ -536,7 +534,7 @@ where encoded.len() / 1024, who ); - self.notification_service.send_sync_notification(who, encoded); + self.notification_service.send_sync_notification(who, chunk.encode()); offset = current_end; if let Some(ref metrics) = self.metrics { metrics.propagated_statements_chunks.observe(chunk.len() as f64); diff --git a/substrate/client/statement-store/src/lib.rs b/substrate/client/statement-store/src/lib.rs index 540a80f25641d..fa7d9f2acf8bc 100644 --- a/substrate/client/statement-store/src/lib.rs +++ b/substrate/client/statement-store/src/lib.rs @@ -774,11 +774,11 @@ impl StatementStore for Store { fn statements(&self) -> Result> { let index = self.index.read(); let mut result = Vec::with_capacity(index.entries.len()); - for h in index.entries.keys() { - let encoded = self.db.get(col::STATEMENTS, h).map_err(|e| Error::Db(e.to_string()))?; + for hash in index.entries.keys() { + let encoded = + self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))?; if let Some(encoded) = encoded { if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) { - let hash = statement.hash(); result.push((hash, statement)); } } @@ -790,11 +790,11 @@ impl StatementStore for Store { let mut index = self.index.write(); let recent = index.take_recent(); let mut result = Vec::with_capacity(recent.len()); - for h in recent { - let encoded = self.db.get(col::STATEMENTS, &h).map_err(|e| Error::Db(e.to_string()))?; + for hash in recent { + let encoded = + self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?; if let Some(encoded) = encoded { if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) { - let hash = statement.hash(); result.push((hash, statement)); } } From f08c2a85bcdffe7510e338a6f5d8a974cc0cbd74 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Fri, 3 Oct 2025 10:16:32 +0200 Subject: [PATCH 09/42] Fix types --- substrate/client/statement-store/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/substrate/client/statement-store/src/lib.rs b/substrate/client/statement-store/src/lib.rs index fa7d9f2acf8bc..5ad7a23ba9c8a 100644 --- a/substrate/client/statement-store/src/lib.rs +++ b/substrate/client/statement-store/src/lib.rs @@ -774,9 +774,9 @@ impl StatementStore for Store { fn statements(&self) -> Result> { let index = self.index.read(); let mut result = Vec::with_capacity(index.entries.len()); - for hash in index.entries.keys() { + for hash in index.entries.keys().cloned() { let encoded = - self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))?; + self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?; if let Some(encoded) = encoded { if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) { result.push((hash, statement)); From 1e3e83be8deae2e01559329c48fc9ed24c85a1a8 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Fri, 3 Oct 2025 10:23:52 +0200 Subject: [PATCH 10/42] Fix up --- substrate/client/network/statement/src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index fbcaed5e204f0..94c29b5b0ef24 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -524,14 +524,15 @@ where loop { let chunk = &to_send[offset..current_end]; + let encoded_size = chunk.encoded_size(); // If chunk fits, send it - if chunk.encoded_size() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize { + if encoded_size <= MAX_STATEMENT_NOTIFICATION_SIZE as usize { log::trace!( target: LOG_TARGET, "Sending {} statements ({} KB) to {}", chunk.len(), - encoded.len() / 1024, + encoded_size / 1024, who ); self.notification_service.send_sync_notification(who, chunk.encode()); From 8cdb31a5089515053c0d0ccfc6f6498a53133040 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Mon, 6 Oct 2025 09:27:39 +0200 Subject: [PATCH 11/42] Fix up --- substrate/client/network/statement/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 94c29b5b0ef24..fbd068e5bb958 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -545,7 +545,7 @@ where // Size exceeded - split the chunk let split_factor = - (encoded.len() / MAX_STATEMENT_NOTIFICATION_SIZE as usize) + 1; + (encode_size / MAX_STATEMENT_NOTIFICATION_SIZE as usize) + 1; let new_chunk_size = (current_end - offset) / split_factor; // Single statement is too large @@ -553,7 +553,7 @@ where log::warn!( target: LOG_TARGET, "Statement too large ({} KB), skipping", - encoded.len() / 1024 + encoded_size / 1024 ); if let Some(ref metrics) = self.metrics { metrics.skipped_oversized_statements.inc(); From a317299c303a57450ab52bfc6bc6a6a57641f060 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 7 Oct 2025 10:40:44 +0200 Subject: [PATCH 12/42] Fix typo --- substrate/client/network/statement/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index fbd068e5bb958..05e1bfd0839b9 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -545,7 +545,7 @@ where // Size exceeded - split the chunk let split_factor = - (encode_size / MAX_STATEMENT_NOTIFICATION_SIZE as usize) + 1; + (encoded_size / MAX_STATEMENT_NOTIFICATION_SIZE as usize) + 1; let new_chunk_size = (current_end - offset) / split_factor; // Single statement is too large From 50ecef48b0d2a14eba7f03c29cf63f4ef78d1881 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 7 Oct 2025 11:01:05 +0200 Subject: [PATCH 13/42] Remove unused var --- substrate/client/network/statement/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 05e1bfd0839b9..2964dc2cfb7f1 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -519,7 +519,6 @@ where let mut offset = 0; while offset < to_send.len() { // Try to send as many statements as possible in one notification - let chunk_size = to_send.len() - offset; let mut current_end = to_send.len(); loop { From 6b89e1c9d9cec28e4d56f040ad35649da10c3cdb Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 7 Oct 2025 11:01:31 +0200 Subject: [PATCH 14/42] Update validation queue size --- substrate/client/network/statement/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 2964dc2cfb7f1..6788d8242be62 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -198,7 +198,7 @@ impl StatementHandlerPrototype { executor: impl Fn(Pin + Send>>) + Send, ) -> error::Result> { let sync_event_stream = sync.event_stream("statement-handler-sync"); - let (queue_sender, mut queue_receiver) = async_channel::bounded(100_000); + let (queue_sender, mut queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS); let store = statement_store.clone(); executor( From a705c00521c6b5ef837802d8d67388d69bb39c26 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 7 Oct 2025 12:09:20 +0200 Subject: [PATCH 15/42] Add mocked handler for tests --- Cargo.lock | 1 + substrate/client/network/statement/Cargo.toml | 3 + substrate/client/network/statement/src/lib.rs | 243 ++++++++++++++++++ 3 files changed, 247 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index f400ab177d36e..c9dfb017e9324 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20467,6 +20467,7 @@ version = "0.16.0" dependencies = [ "array-bytes 6.2.2", "async-channel 1.9.0", + "async-trait", "futures", "log", "parity-scale-codec", diff --git a/substrate/client/network/statement/Cargo.toml b/substrate/client/network/statement/Cargo.toml index dd3a8bef8a2f6..41c4e3cd83162 100644 --- a/substrate/client/network/statement/Cargo.toml +++ b/substrate/client/network/statement/Cargo.toml @@ -29,3 +29,6 @@ sc-network-types = { workspace = true, default-features = true } sp-consensus = { workspace = true, default-features = true } sp-runtime = { workspace = true, default-features = true } sp-statement-store = { workspace = true, default-features = true } + +[dev-dependencies] +async-trait = { workspace = true } diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 6788d8242be62..a6f50f29b2bbb 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -585,3 +585,246 @@ where } } } + +#[cfg(test)] +mod tests { + use super::*; + + struct TestNetwork {} + + #[async_trait::async_trait] + impl NetworkPeers for TestNetwork { + fn set_authorized_peers(&self, _: std::collections::HashSet) { + unimplemented!() + } + + fn set_authorized_only(&self, _: bool) { + unimplemented!() + } + + fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) { + unimplemented!() + } + + fn report_peer(&self, _peer_id: PeerId, _cost_benefit: sc_network::ReputationChange) { + unimplemented!() + } + + fn peer_reputation(&self, _: &PeerId) -> i32 { + unimplemented!() + } + + fn disconnect_peer(&self, _: PeerId, _: sc_network::ProtocolName) { + unimplemented!() + } + + fn accept_unreserved_peers(&self) { + unimplemented!() + } + + fn deny_unreserved_peers(&self) { + unimplemented!() + } + + fn add_reserved_peer( + &self, + _: sc_network::config::MultiaddrWithPeerId, + ) -> Result<(), String> { + unimplemented!() + } + + fn remove_reserved_peer(&self, _: PeerId) { + unimplemented!() + } + + fn set_reserved_peers( + &self, + _: sc_network::ProtocolName, + _: std::collections::HashSet, + ) -> Result<(), String> { + unimplemented!() + } + + fn add_peers_to_reserved_set( + &self, + _: sc_network::ProtocolName, + _: std::collections::HashSet, + ) -> Result<(), String> { + unimplemented!() + } + + fn remove_peers_from_reserved_set( + &self, + _: sc_network::ProtocolName, + _: Vec, + ) -> Result<(), String> { + unimplemented!() + } + + fn sync_num_connected(&self) -> usize { + unimplemented!() + } + + fn peer_role(&self, _: PeerId, _: Vec) -> Option { + unimplemented!() + } + + async fn reserved_peers(&self) -> Result, ()> { + unimplemented!(); + } + } + + struct TestSync {} + + impl SyncEventStream for TestSync { + fn event_stream(&self, _name: &'static str) -> Pin + Send>> { + unimplemented!() + } + } + + impl sp_consensus::SyncOracle for TestSync { + fn is_major_syncing(&self) -> bool { + unimplemented!() + } + + fn is_offline(&self) -> bool { + unimplemented!() + } + } + + + impl NetworkEventStream for TestNetwork { + fn event_stream(&self, _name: &'static str) -> Pin + Send>> { + unimplemented!() + } + } + + #[derive(Debug)] + struct TestNotificationService { + } + + #[async_trait::async_trait] + impl NotificationService for TestNotificationService { + async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> { + unimplemented!() + } + + async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> { + unimplemented!() + } + + fn send_sync_notification(&mut self, _peer: &PeerId, _notification: Vec) { + unimplemented!() + } + + async fn send_async_notification( + &mut self, + _peer: &PeerId, + _notification: Vec, + ) -> Result<(), sc_network::error::Error> { + unimplemented!() + } + + async fn set_handshake(&mut self, _handshake: Vec) -> Result<(), ()> { + unimplemented!() + } + + fn try_set_handshake(&mut self, _handshake: Vec) -> Result<(), ()> { + unimplemented!() + } + + async fn next_event(&mut self) -> Option { + unimplemented!() + } + + fn clone(&mut self) -> Result, ()> { + unimplemented!() + } + + fn protocol(&self) -> &sc_network::types::ProtocolName { + unimplemented!() + } + + fn message_sink(&self, _peer: &PeerId) -> Option> { + unimplemented!() + } + } + + struct TestStatementStore {} + + impl StatementStore for TestStatementStore { + fn statements(&self) -> sp_statement_store::Result> { + unimplemented!() + } + + fn take_recent_statements(&self) -> sp_statement_store::Result> { + unimplemented!() + } + + fn statement(&self, _hash: &sp_statement_store::Hash) -> sp_statement_store::Result> { + unimplemented!() + } + + fn has_statement(&self, _hash: &sp_statement_store::Hash) -> bool { + unimplemented!() + } + + fn broadcasts(&self, _match_all_topics: &[sp_statement_store::Topic]) -> sp_statement_store::Result>> { + unimplemented!() + } + + fn posted(&self, _match_all_topics: &[sp_statement_store::Topic], _dest: [u8; 32]) -> sp_statement_store::Result>> { + unimplemented!() + } + + fn posted_clear(&self, _match_all_topics: &[sp_statement_store::Topic], _dest: [u8; 32]) -> sp_statement_store::Result>> { + unimplemented!() + } + + fn broadcasts_stmt(&self, _match_all_topics: &[sp_statement_store::Topic]) -> sp_statement_store::Result>> { + unimplemented!() + } + + fn posted_stmt(&self, _match_all_topics: &[sp_statement_store::Topic], _dest: [u8; 32]) -> sp_statement_store::Result>> { + unimplemented!() + } + + fn posted_clear_stmt(&self, _match_all_topics: &[sp_statement_store::Topic], _dest: [u8; 32]) -> sp_statement_store::Result>> { + unimplemented!() + } + + fn submit(&self, _statement: sp_statement_store::Statement, _source: sp_statement_store::StatementSource) -> sp_statement_store::SubmitResult { + unimplemented!() + } + + fn remove(&self, _hash: &sp_statement_store::Hash) -> sp_statement_store::Result<()> { + unimplemented!() + } + + fn remove_by(&self, _who: [u8; 32]) -> sp_statement_store::Result<()> { + unimplemented!() + } + } + + #[test] + fn test() { + let (queue_sender, _queue_receiver) = async_channel::bounded(100); + + let _handler = StatementHandler { + protocol_name: "/statement/1".into(), + notification_service: Box::new(TestNotificationService {}), + propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT)) + as Pin + Send>>) + .fuse(), + pending_statements: FuturesUnordered::new(), + pending_statements_peers: HashMap::new(), + network: TestNetwork {}, + sync: TestSync {}, + sync_event_stream: (Box::pin(futures::stream::pending()) as Pin + Send>>).fuse(), + peers: HashMap::new(), + statement_store: Arc::new(TestStatementStore {}), + queue_sender, + metrics: None, + }; + } +} From 8703601402ba90bd86d0678ec8c0a921558cbef7 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 7 Oct 2025 12:59:24 +0200 Subject: [PATCH 16/42] Add dirty test --- Cargo.lock | 1 + substrate/client/network/statement/Cargo.toml | 1 + substrate/client/network/statement/src/lib.rs | 132 ++++++++++++++---- 3 files changed, 108 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c9dfb017e9324..c99f5885a99ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20479,6 +20479,7 @@ dependencies = [ "sp-runtime", "sp-statement-store", "substrate-prometheus-endpoint", + "tokio", ] [[package]] diff --git a/substrate/client/network/statement/Cargo.toml b/substrate/client/network/statement/Cargo.toml index 41c4e3cd83162..9247d6a31843d 100644 --- a/substrate/client/network/statement/Cargo.toml +++ b/substrate/client/network/statement/Cargo.toml @@ -32,3 +32,4 @@ sp-statement-store = { workspace = true, default-features = true } [dev-dependencies] async-trait = { workspace = true } +tokio = { workspace = true } diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index a6f50f29b2bbb..4c760c2327039 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -588,6 +588,9 @@ where #[cfg(test)] mod tests { + + use std::sync::atomic::{AtomicUsize, Ordering}; + use super::*; struct TestNetwork {} @@ -677,14 +680,17 @@ mod tests { struct TestSync {} impl SyncEventStream for TestSync { - fn event_stream(&self, _name: &'static str) -> Pin + Send>> { + fn event_stream( + &self, + _name: &'static str, + ) -> Pin + Send>> { unimplemented!() } } impl sp_consensus::SyncOracle for TestSync { fn is_major_syncing(&self) -> bool { - unimplemented!() + false } fn is_offline(&self) -> bool { @@ -692,16 +698,17 @@ mod tests { } } - impl NetworkEventStream for TestNetwork { - fn event_stream(&self, _name: &'static str) -> Pin + Send>> { + fn event_stream( + &self, + _name: &'static str, + ) -> Pin + Send>> { unimplemented!() } } #[derive(Debug)] - struct TestNotificationService { - } + struct TestNotificationService {} #[async_trait::async_trait] impl NotificationService for TestNotificationService { @@ -745,23 +752,48 @@ mod tests { unimplemented!() } - fn message_sink(&self, _peer: &PeerId) -> Option> { + fn message_sink( + &self, + _peer: &PeerId, + ) -> Option> { unimplemented!() } } - struct TestStatementStore {} + struct TestStatementStore { + statements_calls: AtomicUsize, + take_recent_statements_calls: AtomicUsize, + } + + impl TestStatementStore { + fn new() -> Self { + Self { statements_calls: 0.into(), take_recent_statements_calls: 0.into() } + } + } impl StatementStore for TestStatementStore { - fn statements(&self) -> sp_statement_store::Result> { - unimplemented!() + fn statements( + &self, + ) -> sp_statement_store::Result< + Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>, + > { + self.statements_calls.fetch_add(1, Ordering::Relaxed); + Ok(vec![]) } - fn take_recent_statements(&self) -> sp_statement_store::Result> { - unimplemented!() + fn take_recent_statements( + &self, + ) -> sp_statement_store::Result< + Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>, + > { + self.take_recent_statements_calls.fetch_add(1, Ordering::Relaxed); + Ok(vec![]) } - fn statement(&self, _hash: &sp_statement_store::Hash) -> sp_statement_store::Result> { + fn statement( + &self, + _hash: &sp_statement_store::Hash, + ) -> sp_statement_store::Result> { unimplemented!() } @@ -769,31 +801,57 @@ mod tests { unimplemented!() } - fn broadcasts(&self, _match_all_topics: &[sp_statement_store::Topic]) -> sp_statement_store::Result>> { + fn broadcasts( + &self, + _match_all_topics: &[sp_statement_store::Topic], + ) -> sp_statement_store::Result>> { unimplemented!() } - fn posted(&self, _match_all_topics: &[sp_statement_store::Topic], _dest: [u8; 32]) -> sp_statement_store::Result>> { + fn posted( + &self, + _match_all_topics: &[sp_statement_store::Topic], + _dest: [u8; 32], + ) -> sp_statement_store::Result>> { unimplemented!() } - fn posted_clear(&self, _match_all_topics: &[sp_statement_store::Topic], _dest: [u8; 32]) -> sp_statement_store::Result>> { + fn posted_clear( + &self, + _match_all_topics: &[sp_statement_store::Topic], + _dest: [u8; 32], + ) -> sp_statement_store::Result>> { unimplemented!() } - fn broadcasts_stmt(&self, _match_all_topics: &[sp_statement_store::Topic]) -> sp_statement_store::Result>> { + fn broadcasts_stmt( + &self, + _match_all_topics: &[sp_statement_store::Topic], + ) -> sp_statement_store::Result>> { unimplemented!() } - fn posted_stmt(&self, _match_all_topics: &[sp_statement_store::Topic], _dest: [u8; 32]) -> sp_statement_store::Result>> { + fn posted_stmt( + &self, + _match_all_topics: &[sp_statement_store::Topic], + _dest: [u8; 32], + ) -> sp_statement_store::Result>> { unimplemented!() } - fn posted_clear_stmt(&self, _match_all_topics: &[sp_statement_store::Topic], _dest: [u8; 32]) -> sp_statement_store::Result>> { + fn posted_clear_stmt( + &self, + _match_all_topics: &[sp_statement_store::Topic], + _dest: [u8; 32], + ) -> sp_statement_store::Result>> { unimplemented!() } - fn submit(&self, _statement: sp_statement_store::Statement, _source: sp_statement_store::StatementSource) -> sp_statement_store::SubmitResult { + fn submit( + &self, + _statement: sp_statement_store::Statement, + _source: sp_statement_store::StatementSource, + ) -> sp_statement_store::SubmitResult { unimplemented!() } @@ -806,25 +864,47 @@ mod tests { } } - #[test] - fn test() { + fn build_handler( + statement_store: Arc, + propagate_receiver: futures::channel::mpsc::UnboundedReceiver<()>, + ) -> StatementHandler { let (queue_sender, _queue_receiver) = async_channel::bounded(100); - let _handler = StatementHandler { + let handler = StatementHandler { protocol_name: "/statement/1".into(), notification_service: Box::new(TestNotificationService {}), - propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT)) + propagate_timeout: (Box::pin(propagate_receiver) as Pin + Send>>) .fuse(), pending_statements: FuturesUnordered::new(), pending_statements_peers: HashMap::new(), network: TestNetwork {}, sync: TestSync {}, - sync_event_stream: (Box::pin(futures::stream::pending()) as Pin + Send>>).fuse(), + sync_event_stream: (Box::pin(futures::stream::pending()) + as Pin + Send>>) + .fuse(), peers: HashMap::new(), - statement_store: Arc::new(TestStatementStore {}), + statement_store, queue_sender, metrics: None, }; + + handler + } + + #[tokio::test] + async fn test_propogates_only_recent_statements() { + let statement_store = Arc::new(TestStatementStore::new()); + let (mut propagate_sender, propagate_receiver) = futures::channel::mpsc::unbounded(); + let handler = build_handler(statement_store.clone(), propagate_receiver); + + tokio::spawn(handler.run()); + + // Fire the propagate timer + let x = propagate_sender.send(()).await; + assert!(x.is_ok()); + + assert_eq!(statement_store.statements_calls.load(Ordering::Relaxed), 0); + assert_eq!(statement_store.take_recent_statements_calls.load(Ordering::Relaxed), 1); } } From 90c0799f6a3378875d2adecf06347722c44835cf Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 7 Oct 2025 14:58:01 +0200 Subject: [PATCH 17/42] Add more tests --- substrate/client/network/statement/src/lib.rs | 78 +++++++++++++------ 1 file changed, 54 insertions(+), 24 deletions(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 4c760c2327039..4b9da3b1ea430 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -610,7 +610,7 @@ mod tests { } fn report_peer(&self, _peer_id: PeerId, _cost_benefit: sc_network::ReputationChange) { - unimplemented!() + // No-op for tests } fn peer_reputation(&self, _: &PeerId) -> i32 { @@ -741,7 +741,7 @@ mod tests { } async fn next_event(&mut self) -> Option { - unimplemented!() + None } fn clone(&mut self) -> Result, ()> { @@ -763,11 +763,14 @@ mod tests { struct TestStatementStore { statements_calls: AtomicUsize, take_recent_statements_calls: AtomicUsize, + statements: HashMap, } impl TestStatementStore { - fn new() -> Self { - Self { statements_calls: 0.into(), take_recent_statements_calls: 0.into() } + fn new( + statements: HashMap, + ) -> Self { + Self { statements_calls: 0.into(), take_recent_statements_calls: 0.into(), statements } } } @@ -778,7 +781,7 @@ mod tests { Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>, > { self.statements_calls.fetch_add(1, Ordering::Relaxed); - Ok(vec![]) + Ok(self.statements.iter().map(|(h, s)| (*h, s.clone())).collect()) } fn take_recent_statements( @@ -797,8 +800,8 @@ mod tests { unimplemented!() } - fn has_statement(&self, _hash: &sp_statement_store::Hash) -> bool { - unimplemented!() + fn has_statement(&self, hash: &sp_statement_store::Hash) -> bool { + self.statements.contains_key(hash) } fn broadcasts( @@ -866,14 +869,12 @@ mod tests { fn build_handler( statement_store: Arc, - propagate_receiver: futures::channel::mpsc::UnboundedReceiver<()>, + queue_sender: async_channel::Sender<(Statement, oneshot::Sender)>, ) -> StatementHandler { - let (queue_sender, _queue_receiver) = async_channel::bounded(100); - - let handler = StatementHandler { + StatementHandler { protocol_name: "/statement/1".into(), notification_service: Box::new(TestNotificationService {}), - propagate_timeout: (Box::pin(propagate_receiver) + propagate_timeout: (Box::pin(futures::stream::pending()) as Pin + Send>>) .fuse(), pending_statements: FuturesUnordered::new(), @@ -887,24 +888,53 @@ mod tests { statement_store, queue_sender, metrics: None, - }; + } + } - handler + #[test] + fn test_propogates_only_recent_statements() { + let statement_store = Arc::new(TestStatementStore::new(Default::default())); + let (submit_queue_sender, _submit_queue_receiver) = async_channel::bounded(2); + let mut handler = build_handler(statement_store.clone(), submit_queue_sender); + + handler.propagate_statements(); + + assert_eq!(statement_store.statements_calls.load(Ordering::Relaxed), 0); + assert_eq!(statement_store.take_recent_statements_calls.load(Ordering::Relaxed), 1); } #[tokio::test] - async fn test_propogates_only_recent_statements() { - let statement_store = Arc::new(TestStatementStore::new()); - let (mut propagate_sender, propagate_receiver) = futures::channel::mpsc::unbounded(); - let handler = build_handler(statement_store.clone(), propagate_receiver); + async fn test_skips_processing_statements_that_already_in_store() { + let mut statement1 = Statement::new(); + statement1.set_plain_data(b"statement1".to_vec()); + let hash1 = statement1.hash(); + + let mut statement2 = Statement::new(); + statement2.set_plain_data(b"statement2".to_vec()); + let hash2 = statement2.hash(); + + let mut statements = HashMap::new(); + statements.insert(hash1, statement1.clone()); + + let statement_store = Arc::new(TestStatementStore::new(statements)); + let (submit_queue_sender, submit_queue_receiver) = async_channel::bounded(2); + let mut handler = build_handler(statement_store.clone(), submit_queue_sender); + + let peer_id = PeerId::random(); + handler.peers.insert( + peer_id, + Peer { + known_statements: LruHashSet::new(NonZeroUsize::new(2).unwrap()), + role: ObservedRole::Full, + }, + ); - tokio::spawn(handler.run()); + handler.on_statements(peer_id, vec![statement1, statement2]); - // Fire the propagate timer - let x = propagate_sender.send(()).await; - assert!(x.is_ok()); + let to_submit = submit_queue_receiver.try_recv(); + assert_eq!(to_submit.unwrap().0.hash(), hash2, "Expected only statement2 to be queued"); - assert_eq!(statement_store.statements_calls.load(Ordering::Relaxed), 0); - assert_eq!(statement_store.take_recent_statements_calls.load(Ordering::Relaxed), 1); + let no_more = submit_queue_receiver.try_recv(); + assert!(no_more.is_err(), "Expected only one statement to be queued"); } } From bd3645e57bb09e3116bf268f3cac5e82b0d417c7 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 7 Oct 2025 15:30:52 +0200 Subject: [PATCH 18/42] Fix duplicate processing --- substrate/client/network/statement/src/lib.rs | 103 +++++++++++++++--- 1 file changed, 85 insertions(+), 18 deletions(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 4b9da3b1ea430..0fb3018a524f7 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -425,15 +425,22 @@ where let hash = s.hash(); peer.known_statements.insert(hash); - self.network.report_peer(who, rep::ANY_STATEMENT); - if self.statement_store.has_statement(&hash) { if let Some(ref metrics) = self.metrics { metrics.known_statements_received.inc(); } + + if let Some(peers) = self.pending_statements_peers.get(&hash) { + if peers.contains(&who) { + // Already received this from the same peer. + self.network.report_peer(who, rep::DUPLICATE_STATEMENT); + } + } continue; } + self.network.report_peer(who, rep::ANY_STATEMENT); + match self.pending_statements_peers.entry(hash) { Entry::Vacant(entry) => { let (completion_sender, completion_receiver) = oneshot::channel(); @@ -589,11 +596,27 @@ where #[cfg(test)] mod tests { - use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Mutex, + }; use super::*; - struct TestNetwork {} + #[derive(Clone)] + struct TestNetwork { + reported_peers: Arc>>, + } + + impl TestNetwork { + fn new() -> Self { + Self { reported_peers: Arc::new(Mutex::new(Vec::new())) } + } + + fn get_reports(&self) -> Vec<(PeerId, sc_network::ReputationChange)> { + self.reported_peers.lock().unwrap().clone() + } + } #[async_trait::async_trait] impl NetworkPeers for TestNetwork { @@ -609,8 +632,8 @@ mod tests { unimplemented!() } - fn report_peer(&self, _peer_id: PeerId, _cost_benefit: sc_network::ReputationChange) { - // No-op for tests + fn report_peer(&self, peer_id: PeerId, cost_benefit: sc_network::ReputationChange) { + self.reported_peers.lock().unwrap().push((peer_id, cost_benefit)); } fn peer_reputation(&self, _: &PeerId) -> i32 { @@ -763,14 +786,18 @@ mod tests { struct TestStatementStore { statements_calls: AtomicUsize, take_recent_statements_calls: AtomicUsize, - statements: HashMap, + statements: Mutex>, } impl TestStatementStore { fn new( statements: HashMap, ) -> Self { - Self { statements_calls: 0.into(), take_recent_statements_calls: 0.into(), statements } + Self { + statements_calls: 0.into(), + take_recent_statements_calls: 0.into(), + statements: Mutex::new(statements), + } } } @@ -781,7 +808,7 @@ mod tests { Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>, > { self.statements_calls.fetch_add(1, Ordering::Relaxed); - Ok(self.statements.iter().map(|(h, s)| (*h, s.clone())).collect()) + Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect()) } fn take_recent_statements( @@ -801,7 +828,7 @@ mod tests { } fn has_statement(&self, hash: &sp_statement_store::Hash) -> bool { - self.statements.contains_key(hash) + self.statements.lock().unwrap().contains_key(hash) } fn broadcasts( @@ -870,8 +897,9 @@ mod tests { fn build_handler( statement_store: Arc, queue_sender: async_channel::Sender<(Statement, oneshot::Sender)>, - ) -> StatementHandler { - StatementHandler { + ) -> (StatementHandler, TestNetwork) { + let network = TestNetwork::new(); + let handler = StatementHandler { protocol_name: "/statement/1".into(), notification_service: Box::new(TestNotificationService {}), propagate_timeout: (Box::pin(futures::stream::pending()) @@ -879,7 +907,7 @@ mod tests { .fuse(), pending_statements: FuturesUnordered::new(), pending_statements_peers: HashMap::new(), - network: TestNetwork {}, + network: network.clone(), sync: TestSync {}, sync_event_stream: (Box::pin(futures::stream::pending()) as Pin + Send>>) @@ -888,14 +916,15 @@ mod tests { statement_store, queue_sender, metrics: None, - } + }; + (handler, network) } #[test] fn test_propogates_only_recent_statements() { let statement_store = Arc::new(TestStatementStore::new(Default::default())); let (submit_queue_sender, _submit_queue_receiver) = async_channel::bounded(2); - let mut handler = build_handler(statement_store.clone(), submit_queue_sender); + let (mut handler, _network) = build_handler(statement_store.clone(), submit_queue_sender); handler.propagate_statements(); @@ -903,8 +932,8 @@ mod tests { assert_eq!(statement_store.take_recent_statements_calls.load(Ordering::Relaxed), 1); } - #[tokio::test] - async fn test_skips_processing_statements_that_already_in_store() { + #[test] + fn test_skips_processing_statements_that_already_in_store() { let mut statement1 = Statement::new(); statement1.set_plain_data(b"statement1".to_vec()); let hash1 = statement1.hash(); @@ -918,7 +947,7 @@ mod tests { let statement_store = Arc::new(TestStatementStore::new(statements)); let (submit_queue_sender, submit_queue_receiver) = async_channel::bounded(2); - let mut handler = build_handler(statement_store.clone(), submit_queue_sender); + let (mut handler, _network) = build_handler(statement_store.clone(), submit_queue_sender); let peer_id = PeerId::random(); handler.peers.insert( @@ -937,4 +966,42 @@ mod tests { let no_more = submit_queue_receiver.try_recv(); assert!(no_more.is_err(), "Expected only one statement to be queued"); } + + #[test] + fn test_reports_for_duplicate_statements() { + let mut statement1 = Statement::new(); + statement1.set_plain_data(b"statement1".to_vec()); + + let statements = HashMap::new(); + + let statement_store = Arc::new(TestStatementStore::new(statements)); + let (submit_queue_sender, submit_queue_receiver) = async_channel::bounded(2); + let (mut handler, network) = build_handler(statement_store.clone(), submit_queue_sender); + + let peer_id = PeerId::random(); + handler.peers.insert( + peer_id, + Peer { + known_statements: LruHashSet::new(NonZeroUsize::new(2).unwrap()), + role: ObservedRole::Full, + }, + ); + + handler.on_statements(peer_id, vec![statement1.clone()]); + let (s, _) = submit_queue_receiver.try_recv().unwrap(); + let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s); + + handler.on_statements(peer_id, vec![statement1]); + + let reports = network.get_reports(); + assert_eq!( + reports, + vec![ + (peer_id, rep::ANY_STATEMENT), // Report for first statement + (peer_id, rep::DUPLICATE_STATEMENT) // Report for duplicate statement + ], + "Expected ANY_STATEMENT and DUPLICATE_STATEMENT reputation change, but got: {:?}", + reports + ); + } } From 310e05b082082d114154749789e4a3891c66ebb2 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 7 Oct 2025 16:47:34 +0200 Subject: [PATCH 19/42] Add more tests --- substrate/client/network/statement/src/lib.rs | 265 ++++++++++++------ 1 file changed, 185 insertions(+), 80 deletions(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 0fb3018a524f7..b7bc3837224dd 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -596,12 +596,8 @@ where #[cfg(test)] mod tests { - use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Mutex, - }; - use super::*; + use std::sync::Mutex; #[derive(Clone)] struct TestNetwork { @@ -730,8 +726,20 @@ mod tests { } } - #[derive(Debug)] - struct TestNotificationService {} + #[derive(Debug, Clone)] + struct TestNotificationService { + sent_notifications: Arc)>>>, + } + + impl TestNotificationService { + fn new() -> Self { + Self { sent_notifications: Arc::new(Mutex::new(Vec::new())) } + } + + fn get_sent_notifications(&self) -> Vec<(PeerId, Vec)> { + self.sent_notifications.lock().unwrap().clone() + } + } #[async_trait::async_trait] impl NotificationService for TestNotificationService { @@ -743,8 +751,8 @@ mod tests { unimplemented!() } - fn send_sync_notification(&mut self, _peer: &PeerId, _notification: Vec) { - unimplemented!() + fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec) { + self.sent_notifications.lock().unwrap().push((*peer, notification)); } async fn send_async_notification( @@ -783,21 +791,16 @@ mod tests { } } + #[derive(Clone)] struct TestStatementStore { - statements_calls: AtomicUsize, - take_recent_statements_calls: AtomicUsize, - statements: Mutex>, + statements: Arc>>, + recent_statements: + Arc>>, } impl TestStatementStore { - fn new( - statements: HashMap, - ) -> Self { - Self { - statements_calls: 0.into(), - take_recent_statements_calls: 0.into(), - statements: Mutex::new(statements), - } + fn new() -> Self { + Self { statements: Default::default(), recent_statements: Default::default() } } } @@ -807,7 +810,6 @@ mod tests { ) -> sp_statement_store::Result< Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>, > { - self.statements_calls.fetch_add(1, Ordering::Relaxed); Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect()) } @@ -816,8 +818,7 @@ mod tests { ) -> sp_statement_store::Result< Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>, > { - self.take_recent_statements_calls.fetch_add(1, Ordering::Relaxed); - Ok(vec![]) + Ok(self.recent_statements.lock().unwrap().drain().collect()) } fn statement( @@ -894,14 +895,30 @@ mod tests { } } - fn build_handler( - statement_store: Arc, - queue_sender: async_channel::Sender<(Statement, oneshot::Sender)>, - ) -> (StatementHandler, TestNetwork) { + fn build_handler() -> ( + StatementHandler, + TestStatementStore, + TestNetwork, + TestNotificationService, + async_channel::Receiver<(Statement, oneshot::Sender)>, + ) { + let statement_store = TestStatementStore::new(); + let (queue_sender, queue_receiver) = async_channel::bounded(2); let network = TestNetwork::new(); + let notification_service = TestNotificationService::new(); + let peer_id = PeerId::random(); + let mut peers = HashMap::new(); + peers.insert( + peer_id, + Peer { + known_statements: LruHashSet::new(NonZeroUsize::new(100).unwrap()), + role: ObservedRole::Full, + }, + ); + let handler = StatementHandler { protocol_name: "/statement/1".into(), - notification_service: Box::new(TestNotificationService {}), + notification_service: Box::new(notification_service.clone()), propagate_timeout: (Box::pin(futures::stream::pending()) as Pin + Send>>) .fuse(), @@ -912,84 +929,57 @@ mod tests { sync_event_stream: (Box::pin(futures::stream::pending()) as Pin + Send>>) .fuse(), - peers: HashMap::new(), - statement_store, + peers, + statement_store: Arc::new(statement_store.clone()), queue_sender, metrics: None, }; - (handler, network) - } - - #[test] - fn test_propogates_only_recent_statements() { - let statement_store = Arc::new(TestStatementStore::new(Default::default())); - let (submit_queue_sender, _submit_queue_receiver) = async_channel::bounded(2); - let (mut handler, _network) = build_handler(statement_store.clone(), submit_queue_sender); - - handler.propagate_statements(); - - assert_eq!(statement_store.statements_calls.load(Ordering::Relaxed), 0); - assert_eq!(statement_store.take_recent_statements_calls.load(Ordering::Relaxed), 1); + (handler, statement_store, network, notification_service, queue_receiver) } #[test] fn test_skips_processing_statements_that_already_in_store() { + let (mut handler, statement_store, _network, _notification_service, queue_receiver) = + build_handler(); + let mut statement1 = Statement::new(); statement1.set_plain_data(b"statement1".to_vec()); let hash1 = statement1.hash(); + statement_store.statements.lock().unwrap().insert(hash1, statement1.clone()); + let mut statement2 = Statement::new(); statement2.set_plain_data(b"statement2".to_vec()); let hash2 = statement2.hash(); - let mut statements = HashMap::new(); - statements.insert(hash1, statement1.clone()); - - let statement_store = Arc::new(TestStatementStore::new(statements)); - let (submit_queue_sender, submit_queue_receiver) = async_channel::bounded(2); - let (mut handler, _network) = build_handler(statement_store.clone(), submit_queue_sender); - - let peer_id = PeerId::random(); - handler.peers.insert( - peer_id, - Peer { - known_statements: LruHashSet::new(NonZeroUsize::new(2).unwrap()), - role: ObservedRole::Full, - }, - ); + let peer_id = *handler.peers.keys().next().unwrap(); handler.on_statements(peer_id, vec![statement1, statement2]); - let to_submit = submit_queue_receiver.try_recv(); + let to_submit = queue_receiver.try_recv(); assert_eq!(to_submit.unwrap().0.hash(), hash2, "Expected only statement2 to be queued"); - let no_more = submit_queue_receiver.try_recv(); + let no_more = queue_receiver.try_recv(); assert!(no_more.is_err(), "Expected only one statement to be queued"); } #[test] fn test_reports_for_duplicate_statements() { - let mut statement1 = Statement::new(); - statement1.set_plain_data(b"statement1".to_vec()); + let (mut handler, statement_store, network, _notification_service, queue_receiver) = + build_handler(); - let statements = HashMap::new(); + let peer_id = *handler.peers.keys().next().unwrap(); - let statement_store = Arc::new(TestStatementStore::new(statements)); - let (submit_queue_sender, submit_queue_receiver) = async_channel::bounded(2); - let (mut handler, network) = build_handler(statement_store.clone(), submit_queue_sender); - - let peer_id = PeerId::random(); - handler.peers.insert( - peer_id, - Peer { - known_statements: LruHashSet::new(NonZeroUsize::new(2).unwrap()), - role: ObservedRole::Full, - }, - ); + let mut statement1 = Statement::new(); + statement1.set_plain_data(b"statement1".to_vec()); handler.on_statements(peer_id, vec![statement1.clone()]); - let (s, _) = submit_queue_receiver.try_recv().unwrap(); - let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s); + { + // Manually process statements submission + let (s, _) = queue_receiver.try_recv().unwrap(); + let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s); + handler.network.report_peer(peer_id, rep::ANY_STATEMENT_REFUND); + } handler.on_statements(peer_id, vec![statement1]); @@ -997,11 +987,126 @@ mod tests { assert_eq!( reports, vec![ - (peer_id, rep::ANY_STATEMENT), // Report for first statement - (peer_id, rep::DUPLICATE_STATEMENT) // Report for duplicate statement + (peer_id, rep::ANY_STATEMENT), // Report for first statement + (peer_id, rep::ANY_STATEMENT_REFUND), // Refund for first statement + (peer_id, rep::DUPLICATE_STATEMENT) // Report for duplicate statement ], - "Expected ANY_STATEMENT and DUPLICATE_STATEMENT reputation change, but got: {:?}", + "Expected ANY_STATEMENT, ANY_STATEMENT_REFUND, DUPLICATE_STATEMENT reputation change, but got: {:?}", reports ); } + + #[test] + fn test_splits_large_batches_into_smaller_chunks() { + let (mut handler, statement_store, _network, notification_service, _queue_receiver) = + build_handler(); + + let num_statements = 30; + let statement_size = 10 * 1024; // 10KB per statement + for i in 0..num_statements { + let mut statement = Statement::new(); + let mut data = vec![0u8; statement_size]; + data[0] = i as u8; + statement.set_plain_data(data); + let hash = statement.hash(); + statement_store.recent_statements.lock().unwrap().insert(hash, statement); + } + + handler.propagate_statements(); + + let sent = notification_service.get_sent_notifications(); + let mut total_statements_sent = 0; + assert!( + sent.len() == 2, + "Expected batch to be split into 2 chunks, but got {} chunks", + sent.len() + ); + for (_peer, notification) in sent.iter() { + assert!( + notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize, + "Notification size {} exceeds limit {}", + notification.len(), + MAX_STATEMENT_NOTIFICATION_SIZE + ); + if let Ok(stmts) = ::decode(&mut notification.as_slice()) { + total_statements_sent += stmts.len(); + } + } + + assert_eq!( + total_statements_sent, num_statements, + "Expected all {} statements to be sent, but only {} were sent", + num_statements, total_statements_sent + ); + } + + #[test] + fn test_skips_only_oversized_statements() { + let (mut handler, statement_store, _network, notification_service, _queue_receiver) = + build_handler(); + + let mut statement1 = Statement::new(); + statement1.set_plain_data(vec![1u8; 100]); + let hash1 = statement1.hash(); + statement_store + .recent_statements + .lock() + .unwrap() + .insert(hash1, statement1.clone()); + + let mut oversized1 = Statement::new(); + oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]); + let hash_oversized1 = oversized1.hash(); + statement_store + .recent_statements + .lock() + .unwrap() + .insert(hash_oversized1, oversized1); + + let mut statement2 = Statement::new(); + statement2.set_plain_data(vec![3u8; 100]); + let hash2 = statement2.hash(); + statement_store + .recent_statements + .lock() + .unwrap() + .insert(hash2, statement2.clone()); + + let mut oversized2 = Statement::new(); + oversized2.set_plain_data(vec![4u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]); + let hash_oversized2 = oversized2.hash(); + statement_store + .recent_statements + .lock() + .unwrap() + .insert(hash_oversized2, oversized2); + + let mut statement3 = Statement::new(); + statement3.set_plain_data(vec![5u8; 100]); + let hash3 = statement3.hash(); + statement_store + .recent_statements + .lock() + .unwrap() + .insert(hash3, statement3.clone()); + + handler.propagate_statements(); + + let sent = notification_service.get_sent_notifications(); + + let sent_hashes = sent + .iter() + .map(|(_peer, notification)| { + let stmts = ::decode(&mut notification.as_slice()).unwrap(); + assert_eq!( + stmts.len(), + 1, + "Each notification should contain exactly one small statement" + ); + stmts.first().unwrap().hash() + }) + .collect::>(); + + assert_eq!(sent_hashes, vec![hash1, hash2, hash3], "Only small statements should be sent"); + } } From ed55be305c91a8b6d9d2382b7f69b9152286c98e Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 7 Oct 2025 17:53:11 +0200 Subject: [PATCH 20/42] Add metrics for ignored statements --- substrate/client/network/statement/src/lib.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index b7bc3837224dd..cc3a4e2e63be5 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -94,6 +94,7 @@ struct Metrics { skipped_oversized_statements: Counter, propagated_statements_chunks: Histogram, pending_statements: Gauge, + ignored_statements: Counter, } impl Metrics { @@ -136,6 +137,13 @@ impl Metrics { )?, r, )?, + ignored_statements: register( + Counter::new( + "substrate_sync_ignored_statements", + "Number of statements ignored due to exceeding MAX_PENDING_STATEMENTS limit", + )?, + r, + )?, }) } } @@ -412,13 +420,18 @@ where fn on_statements(&mut self, who: PeerId, statements: Statements) { log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who); if let Some(ref mut peer) = self.peers.get_mut(&who) { + let mut statements_left = statements.len() as u64; for s in statements { if self.pending_statements.len() > MAX_PENDING_STATEMENTS { log::debug!( target: LOG_TARGET, - "Ignoring any further statements that exceed `MAX_PENDING_STATEMENTS`({}) limit", + "Ignoring {} statements that exceed `MAX_PENDING_STATEMENTS`({}) limit", + statements_left, MAX_PENDING_STATEMENTS, ); + if let Some(ref metrics) = self.metrics { + metrics.ignored_statements.inc_by(statements_left); + } break } @@ -476,6 +489,8 @@ where } }, } + + statements_left -= 1; } } } From 514ad778e05d240f06513fc0a324ff5ab3398651 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 7 Oct 2025 17:57:50 +0200 Subject: [PATCH 21/42] Remove unused tokio --- Cargo.lock | 1 - substrate/client/network/statement/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c99f5885a99ab..c9dfb017e9324 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20479,7 +20479,6 @@ dependencies = [ "sp-runtime", "sp-statement-store", "substrate-prometheus-endpoint", - "tokio", ] [[package]] diff --git a/substrate/client/network/statement/Cargo.toml b/substrate/client/network/statement/Cargo.toml index 9247d6a31843d..41c4e3cd83162 100644 --- a/substrate/client/network/statement/Cargo.toml +++ b/substrate/client/network/statement/Cargo.toml @@ -32,4 +32,3 @@ sp-statement-store = { workspace = true, default-features = true } [dev-dependencies] async-trait = { workspace = true } -tokio = { workspace = true } From 0adf1699c4ba642603f573c24c6ca9b2ff1f3f8e Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 7 Oct 2025 21:02:20 +0200 Subject: [PATCH 22/42] Fix tests --- substrate/client/network/statement/src/lib.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index cc3a4e2e63be5..6b394aae675bb 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -1109,19 +1109,17 @@ mod tests { let sent = notification_service.get_sent_notifications(); - let sent_hashes = sent + let mut sent_hashes = sent .iter() .map(|(_peer, notification)| { - let stmts = ::decode(&mut notification.as_slice()).unwrap(); - assert_eq!( - stmts.len(), - 1, - "Each notification should contain exactly one small statement" - ); - stmts.first().unwrap().hash() + ::decode(&mut notification.as_slice()).unwrap() }) + .flatten() + .map(|s| s.hash()) .collect::>(); - - assert_eq!(sent_hashes, vec![hash1, hash2, hash3], "Only small statements should be sent"); + sent_hashes.sort(); + let mut expected_hashes = vec![hash1, hash2, hash3]; + expected_hashes.sort(); + assert_eq!(sent_hashes, expected_hashes, "Only small statements should be sent"); } } From 6daaee00cc62ab4690021449ba792f697352c4c2 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Wed, 8 Oct 2025 08:57:00 +0200 Subject: [PATCH 23/42] Fix style --- substrate/client/statement-store/src/lib.rs | 26 +++++++++++---------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/substrate/client/statement-store/src/lib.rs b/substrate/client/statement-store/src/lib.rs index 5ad7a23ba9c8a..049b0b44719f1 100644 --- a/substrate/client/statement-store/src/lib.rs +++ b/substrate/client/statement-store/src/lib.rs @@ -775,12 +775,13 @@ impl StatementStore for Store { let index = self.index.read(); let mut result = Vec::with_capacity(index.entries.len()); for hash in index.entries.keys().cloned() { - let encoded = - self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?; - if let Some(encoded) = encoded { - if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) { - result.push((hash, statement)); - } + let Some(encoded) = + self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))? + else { + continue + }; + if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) { + result.push((hash, statement)); } } Ok(result) @@ -791,12 +792,13 @@ impl StatementStore for Store { let recent = index.take_recent(); let mut result = Vec::with_capacity(recent.len()); for hash in recent { - let encoded = - self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?; - if let Some(encoded) = encoded { - if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) { - result.push((hash, statement)); - } + let Some(encoded) = + self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))? + else { + continue + }; + if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) { + result.push((hash, statement)); } } Ok(result) From 0012b9ef399349da76240d7040ba2fbab9152d0f Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Wed, 8 Oct 2025 09:27:50 +0200 Subject: [PATCH 24/42] Fix skipping very large statements --- substrate/client/network/statement/src/lib.rs | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 6b394aae675bb..5babb043e43ab 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -567,20 +567,24 @@ where // Size exceeded - split the chunk let split_factor = (encoded_size / MAX_STATEMENT_NOTIFICATION_SIZE as usize) + 1; - let new_chunk_size = (current_end - offset) / split_factor; + let mut new_chunk_size = (current_end - offset) / split_factor; // Single statement is too large if new_chunk_size == 0 { - log::warn!( - target: LOG_TARGET, - "Statement too large ({} KB), skipping", - encoded_size / 1024 - ); - if let Some(ref metrics) = self.metrics { - metrics.skipped_oversized_statements.inc(); + if chunk.len() == 1 { + log::warn!( + target: LOG_TARGET, + "Statement too large ({} KB), skipping", + encoded_size / 1024 + ); + if let Some(ref metrics) = self.metrics { + metrics.skipped_oversized_statements.inc(); + } + offset = current_end; + break; } - offset = current_end; - break; + // Don't skip more than one statement at once + new_chunk_size = 1; } // Reduce chunk size and try again @@ -1070,7 +1074,7 @@ mod tests { .insert(hash1, statement1.clone()); let mut oversized1 = Statement::new(); - oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]); + oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize * 100]); let hash_oversized1 = oversized1.hash(); statement_store .recent_statements @@ -1111,10 +1115,9 @@ mod tests { let mut sent_hashes = sent .iter() - .map(|(_peer, notification)| { + .flat_map(|(_peer, notification)| { ::decode(&mut notification.as_slice()).unwrap() }) - .flatten() .map(|s| s.hash()) .collect::>(); sent_hashes.sort(); From dad3ecda00c986c1e0e90ebee659abab5692af75 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Wed, 8 Oct 2025 10:34:57 +0200 Subject: [PATCH 25/42] Limit the size of the statement for further gossiping --- Cargo.lock | 1 + substrate/client/network/statement/src/config.rs | 2 +- substrate/client/statement-store/Cargo.toml | 1 + substrate/client/statement-store/src/lib.rs | 7 +++++-- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f400ab177d36e..017e330f6d5de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20914,6 +20914,7 @@ dependencies = [ "parking_lot 0.12.3", "sc-client-api", "sc-keystore", + "sc-network-statement", "sp-api", "sp-blockchain", "sp-core 28.0.0", diff --git a/substrate/client/network/statement/src/config.rs b/substrate/client/network/statement/src/config.rs index f4546b027c17e..3bd19ce96f200 100644 --- a/substrate/client/network/statement/src/config.rs +++ b/substrate/client/network/statement/src/config.rs @@ -27,7 +27,7 @@ pub(crate) const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis pub(crate) const MAX_KNOWN_STATEMENTS: usize = 4 * 1024 * 1024; /// Maximum allowed size for a statement notification. -pub(crate) const MAX_STATEMENT_SIZE: u64 = 256 * 1024; +pub const MAX_STATEMENT_SIZE: u64 = 256 * 1024; /// Maximum number of statement validation request we keep at any moment. pub(crate) const MAX_PENDING_STATEMENTS: usize = 2 * 1024 * 1024; diff --git a/substrate/client/statement-store/Cargo.toml b/substrate/client/statement-store/Cargo.toml index c0219b294cede..f6aa1c2afb853 100644 --- a/substrate/client/statement-store/Cargo.toml +++ b/substrate/client/statement-store/Cargo.toml @@ -22,6 +22,7 @@ parking_lot = { workspace = true, default-features = true } prometheus-endpoint = { workspace = true, default-features = true } sc-client-api = { workspace = true, default-features = true } sc-keystore = { workspace = true, default-features = true } +sc-network-statement = { workspace = true, default-features = true } sp-api = { workspace = true, default-features = true } sp-blockchain = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } diff --git a/substrate/client/statement-store/src/lib.rs b/substrate/client/statement-store/src/lib.rs index d857cbc51c8ad..ea967133e10bd 100644 --- a/substrate/client/statement-store/src/lib.rs +++ b/substrate/client/statement-store/src/lib.rs @@ -83,6 +83,9 @@ pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 * 1024 * 1024; // ~4 million /// The maximum amount of data the statement store can hold, regardless of the number of /// statements from which the data originates. pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024; // 2GiB +/// The maximum size of a single statement in bytes. +/// Accounts for the 1-byte vector length prefix when statements are gossiped as Vec. +pub const MAX_STATEMENT_SIZE: u32 = sc_network_statement::config::MAX_STATEMENT_SIZE as u32 - 1; const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(30); @@ -377,8 +380,8 @@ impl Index { validation: &ValidStatement, current_time: u64, ) -> MaybeInserted { - let statement_len = statement.data_len(); - if statement_len > validation.max_size as usize { + let statement_len = statement.encoded_size(); + if statement_len > validation.max_size.min(MAX_STATEMENT_SIZE) as usize { log::debug!( target: LOG_TARGET, "Ignored oversize message: {:?} ({} bytes)", From 6407d0e722c735e5429fd0cce8a49315138420fb Mon Sep 17 00:00:00 2001 From: "cmd[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 8 Oct 2025 08:38:43 +0000 Subject: [PATCH 26/42] Update from github-actions[bot] running command 'prdoc --audience node_dev --bump minor' --- prdoc/pr_9965.prdoc | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 prdoc/pr_9965.prdoc diff --git a/prdoc/pr_9965.prdoc b/prdoc/pr_9965.prdoc new file mode 100644 index 0000000000000..c94e909b78c18 --- /dev/null +++ b/prdoc/pr_9965.prdoc @@ -0,0 +1,35 @@ +title: Limit the size of the statement for further gossiping +doc: +- audience: Node Dev + description: "\u2704 -----------------------------------------------------------------------------\n\ + \nThank you for your Pull Request! \U0001F64F Please make sure it follows the\ + \ contribution guidelines outlined in [this\ndocument](https://github.com/paritytech/polkadot-sdk/blob/master/docs/contributor/CONTRIBUTING.md)\ + \ and fill out the\nsections below. Once you're ready to submit your PR for review,\ + \ please delete this section and leave only the text under\nthe \"Description\"\ + \ heading.\n\n# Description\n\n*A concise description of what your PR is doing,\ + \ and what potential issue it is solving. Use [Github semantic\nlinking](https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword)\n\ + to link the PR to an issue that must be closed once this is merged.*\n\n## Integration\n\ + \n*In depth notes about how this PR should be integrated by downstream projects.\ + \ This part is\nmandatory, and should be reviewed by reviewers, if the PR does\ + \ NOT have the\n`R0-no-crate-publish-required` label. In case of a `R0-no-crate-publish-required`,\ + \ it can be\nignored.*\n\n## Review Notes\n\n*In depth notes about the **implementation**\ + \ details of your PR. This should be the main guide for reviewers to\nunderstand\ + \ your approach and effectively review it. If too long, use\n[`
`](https://developer.mozilla.org/en-US/docs/Web/HTML/Element/details)*.\n\ + \n*Imagine that someone who is depending on the old code wants to integrate your\ + \ new code and the only information that\nthey get is this section. It helps to\ + \ include example usage and default value here, with a `diff` code-block to show\n\ + possibly integration.*\n\n*Include your leftover TODOs, if any, here.*\n\n# Checklist\n\ + \n* [ ] My PR includes a detailed description as outlined in the \"Description\"\ + \ and its two subsections above.\n* [ ] My PR follows the [labeling requirements](\n\ + https://github.com/paritytech/polkadot-sdk/blob/master/docs/contributor/CONTRIBUTING.md#Process\n\ + ) of this project (at minimum one label for `T` required)\n * External contributors:\ + \ ask maintainers to put the right label on your PR.\n* [ ] I have made corresponding\ + \ changes to the documentation (if applicable)\n* [ ] I have added tests that\ + \ prove my fix is effective or that my feature works (if applicable)\n\nYou can\ + \ remove the \"Checklist\" section once all have been checked. Thank you for your\ + \ contribution!\n\n\u2704 -----------------------------------------------------------------------------" +crates: +- name: sc-network-statement + bump: minor +- name: sc-statement-store + bump: minor From 46f7462424b93e41077a9d47d4b12c3f5287be1f Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Wed, 8 Oct 2025 10:53:47 +0200 Subject: [PATCH 27/42] Update pr_doc --- prdoc/pr_9965.prdoc | 28 +--------------------------- 1 file changed, 1 insertion(+), 27 deletions(-) diff --git a/prdoc/pr_9965.prdoc b/prdoc/pr_9965.prdoc index c94e909b78c18..979c1f3ede14a 100644 --- a/prdoc/pr_9965.prdoc +++ b/prdoc/pr_9965.prdoc @@ -1,33 +1,7 @@ title: Limit the size of the statement for further gossiping doc: - audience: Node Dev - description: "\u2704 -----------------------------------------------------------------------------\n\ - \nThank you for your Pull Request! \U0001F64F Please make sure it follows the\ - \ contribution guidelines outlined in [this\ndocument](https://github.com/paritytech/polkadot-sdk/blob/master/docs/contributor/CONTRIBUTING.md)\ - \ and fill out the\nsections below. Once you're ready to submit your PR for review,\ - \ please delete this section and leave only the text under\nthe \"Description\"\ - \ heading.\n\n# Description\n\n*A concise description of what your PR is doing,\ - \ and what potential issue it is solving. Use [Github semantic\nlinking](https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword)\n\ - to link the PR to an issue that must be closed once this is merged.*\n\n## Integration\n\ - \n*In depth notes about how this PR should be integrated by downstream projects.\ - \ This part is\nmandatory, and should be reviewed by reviewers, if the PR does\ - \ NOT have the\n`R0-no-crate-publish-required` label. In case of a `R0-no-crate-publish-required`,\ - \ it can be\nignored.*\n\n## Review Notes\n\n*In depth notes about the **implementation**\ - \ details of your PR. This should be the main guide for reviewers to\nunderstand\ - \ your approach and effectively review it. If too long, use\n[`
`](https://developer.mozilla.org/en-US/docs/Web/HTML/Element/details)*.\n\ - \n*Imagine that someone who is depending on the old code wants to integrate your\ - \ new code and the only information that\nthey get is this section. It helps to\ - \ include example usage and default value here, with a `diff` code-block to show\n\ - possibly integration.*\n\n*Include your leftover TODOs, if any, here.*\n\n# Checklist\n\ - \n* [ ] My PR includes a detailed description as outlined in the \"Description\"\ - \ and its two subsections above.\n* [ ] My PR follows the [labeling requirements](\n\ - https://github.com/paritytech/polkadot-sdk/blob/master/docs/contributor/CONTRIBUTING.md#Process\n\ - ) of this project (at minimum one label for `T` required)\n * External contributors:\ - \ ask maintainers to put the right label on your PR.\n* [ ] I have made corresponding\ - \ changes to the documentation (if applicable)\n* [ ] I have added tests that\ - \ prove my fix is effective or that my feature works (if applicable)\n\nYou can\ - \ remove the \"Checklist\" section once all have been checked. Thank you for your\ - \ contribution!\n\n\u2704 -----------------------------------------------------------------------------" + description: "Limits the size of statements that are further gossiped over the network to prevent skipping oversized messages. The limit is set to match the network protocol's `MAX_STATEMENT_SIZE` (256 KB), accounting for 1-byte vector length overhead because statements are sent as `Vec`." crates: - name: sc-network-statement bump: minor From c5c15c11c11d244624d6aafab04ea8b862217713 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Wed, 8 Oct 2025 11:01:26 +0200 Subject: [PATCH 28/42] Fix docs --- substrate/client/statement-store/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/statement-store/src/lib.rs b/substrate/client/statement-store/src/lib.rs index ea967133e10bd..a2d73cf862b4a 100644 --- a/substrate/client/statement-store/src/lib.rs +++ b/substrate/client/statement-store/src/lib.rs @@ -84,7 +84,7 @@ pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 * 1024 * 1024; // ~4 million /// statements from which the data originates. pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024; // 2GiB /// The maximum size of a single statement in bytes. -/// Accounts for the 1-byte vector length prefix when statements are gossiped as Vec. +/// Accounts for the 1-byte vector length prefix when statements are gossiped as `Vec`. pub const MAX_STATEMENT_SIZE: u32 = sc_network_statement::config::MAX_STATEMENT_SIZE as u32 - 1; const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(30); From 90ffbe7cbd2bc1dbc61b73551dd80caabce8f52e Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Wed, 8 Oct 2025 12:42:08 +0200 Subject: [PATCH 29/42] Update the logic --- substrate/client/statement-store/src/lib.rs | 31 +++++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/substrate/client/statement-store/src/lib.rs b/substrate/client/statement-store/src/lib.rs index a2d73cf862b4a..66b1ba0cabb43 100644 --- a/substrate/client/statement-store/src/lib.rs +++ b/substrate/client/statement-store/src/lib.rs @@ -85,7 +85,7 @@ pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 * 1024 * 1024; // ~4 million pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024; // 2GiB /// The maximum size of a single statement in bytes. /// Accounts for the 1-byte vector length prefix when statements are gossiped as `Vec`. -pub const MAX_STATEMENT_SIZE: u32 = sc_network_statement::config::MAX_STATEMENT_SIZE as u32 - 1; +pub const MAX_STATEMENT_SIZE: usize = sc_network_statement::config::MAX_STATEMENT_SIZE as usize - 1; const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(30); @@ -380,8 +380,10 @@ impl Index { validation: &ValidStatement, current_time: u64, ) -> MaybeInserted { - let statement_len = statement.encoded_size(); - if statement_len > validation.max_size.min(MAX_STATEMENT_SIZE) as usize { + let statement_len = statement.data_len(); + if statement_len > validation.max_size as usize || + statement.encoded_size() > MAX_STATEMENT_SIZE + { log::debug!( target: LOG_TARGET, "Ignored oversize message: {:?} ({} bytes)", @@ -1052,6 +1054,7 @@ mod tests { Some(a) if a == account(2) => (2, 1000), Some(a) if a == account(3) => (3, 1000), Some(a) if a == account(4) => (4, 1000), + Some(a) if a == account(42) => (42, 42 * crate::MAX_STATEMENT_SIZE as u32), _ => (2, 2000), }; Ok(ValidStatement{ max_count, max_size }) @@ -1324,6 +1327,28 @@ mod tests { assert_eq!(expected_statements, statements); } + #[test] + fn max_statement_size_for_gossiping() { + let (store, _temp) = test_store(); + store.index.write().options.max_total_size = 42 * crate::MAX_STATEMENT_SIZE; + + assert_eq!( + store.submit( + statement(42, 1, Some(1), crate::MAX_STATEMENT_SIZE - 500), + StatementSource::Local + ), + SubmitResult::New(NetworkPriority::High) + ); + + assert_eq!( + store.submit( + statement(42, 2, Some(1), 2 * crate::MAX_STATEMENT_SIZE), + StatementSource::Local + ), + SubmitResult::Ignored + ); + } + #[test] fn expired_statements_are_purged() { use super::DEFAULT_PURGE_AFTER_SEC; From 6cf91c4a8fa968193e4e0b47dda2f4595e30fdbf Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Wed, 8 Oct 2025 16:41:41 +0200 Subject: [PATCH 30/42] Update logging and metrics --- substrate/client/network/statement/src/lib.rs | 35 +++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 5babb043e43ab..d272506b47d2e 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -523,11 +523,13 @@ where } fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) { - let mut propagated_statements = 0; - + log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len()); for (who, peer) in self.peers.iter_mut() { + log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who); + // never send statements to light nodes if matches!(peer.role, ObservedRole::Light) { + log::trace!(target: LOG_TARGET, "{} is a light node, skipping propagation", who); continue } @@ -535,30 +537,26 @@ where .iter() .filter_map(|(hash, stmt)| peer.known_statements.insert(*hash).then(|| stmt)) .collect::>(); - - propagated_statements += to_send.len(); + log::trace!(target: LOG_TARGET, "We have {} statements that the peer doesn't know about", to_send.len()); let mut offset = 0; while offset < to_send.len() { // Try to send as many statements as possible in one notification let mut current_end = to_send.len(); + log::trace!(target: LOG_TARGET, "Looking for better chunk size"); loop { let chunk = &to_send[offset..current_end]; let encoded_size = chunk.encoded_size(); + log::trace!(target: LOG_TARGET, "Chunk: {} statements, {} KB", chunk.len(), encoded_size / 1024); // If chunk fits, send it if encoded_size <= MAX_STATEMENT_NOTIFICATION_SIZE as usize { - log::trace!( - target: LOG_TARGET, - "Sending {} statements ({} KB) to {}", - chunk.len(), - encoded_size / 1024, - who - ); self.notification_service.send_sync_notification(who, chunk.encode()); offset = current_end; + log::trace!(target: LOG_TARGET, "Sent {} statements ({} KB) to {}, {} left", chunk.len(), encoded_size / 1024, who, to_send.len() - offset); if let Some(ref metrics) = self.metrics { + metrics.propagated_statements.inc_by(chunk.len() as u64); metrics.propagated_statements_chunks.observe(chunk.len() as f64); } break; @@ -572,11 +570,7 @@ where // Single statement is too large if new_chunk_size == 0 { if chunk.len() == 1 { - log::warn!( - target: LOG_TARGET, - "Statement too large ({} KB), skipping", - encoded_size / 1024 - ); + log::warn!(target: LOG_TARGET, "Statement too large ({} KB), skipping", encoded_size / 1024); if let Some(ref metrics) = self.metrics { metrics.skipped_oversized_statements.inc(); } @@ -592,10 +586,7 @@ where } } } - - if let Some(ref metrics) = self.metrics { - metrics.propagated_statements.inc_by(propagated_statements as _) - } + log::trace!(target: LOG_TARGET, "Statements propagated to all peers"); } /// Call when we must propagate ready statements to peers. @@ -605,8 +596,8 @@ where return } - log::debug!(target: LOG_TARGET, "Propagating statements"); - if let Ok(statements) = self.statement_store.take_recent_statements() { + let Ok(statements) = self.statement_store.take_recent_statements() else { return }; + if !statements.is_empty() { self.do_propagate_statements(&statements); } } From 6891b17132791c7c17508793b228a8052955722d Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 9 Oct 2025 15:34:05 +0200 Subject: [PATCH 31/42] Increase notification size --- substrate/client/network/statement/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/network/statement/src/config.rs b/substrate/client/network/statement/src/config.rs index 78480a2306172..6613b78debb24 100644 --- a/substrate/client/network/statement/src/config.rs +++ b/substrate/client/network/statement/src/config.rs @@ -27,7 +27,7 @@ pub(crate) const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis pub(crate) const MAX_KNOWN_STATEMENTS: usize = 4 * 1024 * 1024; // * 32 bytes for hash = 128 MB per peer /// Maximum allowed size for a statement notification. -pub(crate) const MAX_STATEMENT_NOTIFICATION_SIZE: u64 = 256 * 1024; +pub(crate) const MAX_STATEMENT_NOTIFICATION_SIZE: u64 = 1024 * 1024; /// Maximum number of statement validation request we keep at any moment. pub(crate) const MAX_PENDING_STATEMENTS: usize = 2 * 1024 * 1024; From f52598fd2abba7b3d63f696de814529e0c7b5a35 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 9 Oct 2025 16:59:36 +0200 Subject: [PATCH 32/42] Send async notifications --- Cargo.lock | 1 + substrate/client/network/statement/Cargo.toml | 1 + substrate/client/network/statement/src/lib.rs | 40 +++++++++++-------- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c9dfb017e9324..c99f5885a99ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20479,6 +20479,7 @@ dependencies = [ "sp-runtime", "sp-statement-store", "substrate-prometheus-endpoint", + "tokio", ] [[package]] diff --git a/substrate/client/network/statement/Cargo.toml b/substrate/client/network/statement/Cargo.toml index 41c4e3cd83162..9247d6a31843d 100644 --- a/substrate/client/network/statement/Cargo.toml +++ b/substrate/client/network/statement/Cargo.toml @@ -32,3 +32,4 @@ sp-statement-store = { workspace = true, default-features = true } [dev-dependencies] async-trait = { workspace = true } +tokio = { workspace = true } diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index d272506b47d2e..3da97b3da6bbe 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -307,7 +307,7 @@ where loop { futures::select! { _ = self.propagate_timeout.next() => { - self.propagate_statements(); + self.propagate_statements().await; if let Some(ref metrics) = self.metrics { metrics.pending_statements.set(self.pending_statements.len() as u64); } @@ -510,7 +510,7 @@ where } /// Propagate one statement. - pub fn propagate_statement(&mut self, hash: &Hash) { + pub async fn propagate_statement(&mut self, hash: &Hash) { // Accept statements only when node is not major syncing if self.sync.is_major_syncing() { return @@ -518,11 +518,11 @@ where log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash); if let Ok(Some(statement)) = self.statement_store.statement(hash) { - self.do_propagate_statements(&[(*hash, statement)]); + self.do_propagate_statements(&[(*hash, statement)]).await; } } - fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) { + async fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) { log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len()); for (who, peer) in self.peers.iter_mut() { log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who); @@ -552,7 +552,14 @@ where // If chunk fits, send it if encoded_size <= MAX_STATEMENT_NOTIFICATION_SIZE as usize { - self.notification_service.send_sync_notification(who, chunk.encode()); + if let Err(e) = self + .notification_service + .send_async_notification(who, chunk.encode()) + .await + { + log::debug!(target: LOG_TARGET, "Failed to send notification to {}, peer disconnected: {:?}", who, e); + break; + } offset = current_end; log::trace!(target: LOG_TARGET, "Sent {} statements ({} KB) to {}, {} left", chunk.len(), encoded_size / 1024, who, to_send.len() - offset); if let Some(ref metrics) = self.metrics { @@ -590,7 +597,7 @@ where } /// Call when we must propagate ready statements to peers. - fn propagate_statements(&mut self) { + async fn propagate_statements(&mut self) { // Send out statements only when node is not major syncing if self.sync.is_major_syncing() { return @@ -598,7 +605,7 @@ where let Ok(statements) = self.statement_store.take_recent_statements() else { return }; if !statements.is_empty() { - self.do_propagate_statements(&statements); + self.do_propagate_statements(&statements).await; } } } @@ -767,10 +774,11 @@ mod tests { async fn send_async_notification( &mut self, - _peer: &PeerId, - _notification: Vec, + peer: &PeerId, + notification: Vec, ) -> Result<(), sc_network::error::Error> { - unimplemented!() + self.sent_notifications.lock().unwrap().push((*peer, notification)); + Ok(()) } async fn set_handshake(&mut self, _handshake: Vec) -> Result<(), ()> { @@ -1006,8 +1014,8 @@ mod tests { ); } - #[test] - fn test_splits_large_batches_into_smaller_chunks() { + #[tokio::test] + async fn test_splits_large_batches_into_smaller_chunks() { let (mut handler, statement_store, _network, notification_service, _queue_receiver) = build_handler(); @@ -1022,7 +1030,7 @@ mod tests { statement_store.recent_statements.lock().unwrap().insert(hash, statement); } - handler.propagate_statements(); + handler.propagate_statements().await; let sent = notification_service.get_sent_notifications(); let mut total_statements_sent = 0; @@ -1050,8 +1058,8 @@ mod tests { ); } - #[test] - fn test_skips_only_oversized_statements() { + #[tokio::test] + async fn test_skips_only_oversized_statements() { let (mut handler, statement_store, _network, notification_service, _queue_receiver) = build_handler(); @@ -1100,7 +1108,7 @@ mod tests { .unwrap() .insert(hash3, statement3.clone()); - handler.propagate_statements(); + handler.propagate_statements().await; let sent = notification_service.get_sent_notifications(); From 5945066ac44eae169ae2a2d33065b04681235d3a Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 9 Oct 2025 17:01:22 +0200 Subject: [PATCH 33/42] Fix tests --- substrate/client/network/statement/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 3da97b3da6bbe..60faf1d2fea0c 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -1020,7 +1020,7 @@ mod tests { build_handler(); let num_statements = 30; - let statement_size = 10 * 1024; // 10KB per statement + let statement_size = 100 * 1024; // 100KB per statement for i in 0..num_statements { let mut statement = Statement::new(); let mut data = vec![0u8; statement_size]; @@ -1035,8 +1035,8 @@ mod tests { let sent = notification_service.get_sent_notifications(); let mut total_statements_sent = 0; assert!( - sent.len() == 2, - "Expected batch to be split into 2 chunks, but got {} chunks", + sent.len() == 3, + "Expected batch to be split into 3 chunks, but got {} chunks", sent.len() ); for (_peer, notification) in sent.iter() { From b5b96ebeee962cd03d764bdb99145eb2f96d7b3a Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 14 Oct 2025 11:35:46 +0300 Subject: [PATCH 34/42] Add more logs --- substrate/client/network/statement/src/lib.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 60faf1d2fea0c..6e6ccc597b00d 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -445,7 +445,11 @@ where if let Some(peers) = self.pending_statements_peers.get(&hash) { if peers.contains(&who) { - // Already received this from the same peer. + log::trace!( + target: LOG_TARGET, + "Already received the statement from the same peer {}.", + who + ); self.network.report_peer(who, rep::DUPLICATE_STATEMENT); } } From 971fe1e8aff3c7ee3a910d25198ec6e40bb1acd2 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 14 Oct 2025 15:18:43 +0300 Subject: [PATCH 35/42] Modify metrics setting --- substrate/client/network/statement/src/lib.rs | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 6e6ccc597b00d..4d9e7d30d79c9 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -308,9 +308,9 @@ where futures::select! { _ = self.propagate_timeout.next() => { self.propagate_statements().await; - if let Some(ref metrics) = self.metrics { + self.metrics.as_ref().map(|metrics| { metrics.pending_statements.set(self.pending_statements.len() as u64); - } + }); }, (hash, result) = self.pending_statements.select_next_some() => { if let Some(peers) = self.pending_statements_peers.remove(&hash) { @@ -429,9 +429,9 @@ where statements_left, MAX_PENDING_STATEMENTS, ); - if let Some(ref metrics) = self.metrics { + self.metrics.as_ref().map(|metrics| { metrics.ignored_statements.inc_by(statements_left); - } + }); break } @@ -439,9 +439,9 @@ where peer.known_statements.insert(hash); if self.statement_store.has_statement(&hash) { - if let Some(ref metrics) = self.metrics { + self.metrics.as_ref().map(|metrics| { metrics.known_statements_received.inc(); - } + }); if let Some(peers) = self.pending_statements_peers.get(&hash) { if peers.contains(&who) { @@ -566,10 +566,10 @@ where } offset = current_end; log::trace!(target: LOG_TARGET, "Sent {} statements ({} KB) to {}, {} left", chunk.len(), encoded_size / 1024, who, to_send.len() - offset); - if let Some(ref metrics) = self.metrics { + self.metrics.as_ref().map(|metrics| { metrics.propagated_statements.inc_by(chunk.len() as u64); metrics.propagated_statements_chunks.observe(chunk.len() as f64); - } + }); break; } @@ -582,9 +582,9 @@ where if new_chunk_size == 0 { if chunk.len() == 1 { log::warn!(target: LOG_TARGET, "Statement too large ({} KB), skipping", encoded_size / 1024); - if let Some(ref metrics) = self.metrics { + self.metrics.as_ref().map(|metrics| { metrics.skipped_oversized_statements.inc(); - } + }); offset = current_end; break; } From f47af223b9c4111d4a8df56fcef82184fd80a095 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 14 Oct 2025 15:51:11 +0300 Subject: [PATCH 36/42] Handle very slow or disconnected peers --- substrate/client/network/statement/src/lib.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 4d9e7d30d79c9..7495f057ad941 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -59,6 +59,7 @@ use std::{ pin::Pin, sync::Arc, }; +use tokio::time::timeout; pub mod config; @@ -87,6 +88,8 @@ mod rep { } const LOG_TARGET: &str = "statement-gossip"; +/// Maximim time we wait for sending a notification to a peer. +const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); struct Metrics { propagated_statements: Counter, @@ -556,12 +559,14 @@ where // If chunk fits, send it if encoded_size <= MAX_STATEMENT_NOTIFICATION_SIZE as usize { - if let Err(e) = self - .notification_service - .send_async_notification(who, chunk.encode()) - .await + if let Err(e) = timeout( + SEND_TIMEOUT, + self.notification_service.send_async_notification(who, chunk.encode()), + ) + .await { - log::debug!(target: LOG_TARGET, "Failed to send notification to {}, peer disconnected: {:?}", who, e); + log::debug!(target: LOG_TARGET, "Failed to send notification to {}, peer disconnected, skipping further batches: {:?}", who, e); + offset = to_send.len(); break; } offset = current_end; From 894d00e09d55ae674380e95dd49768050028379c Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Wed, 15 Oct 2025 13:15:08 +0300 Subject: [PATCH 37/42] Update substrate/client/network/statement/src/lib.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- substrate/client/network/statement/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 7495f057ad941..dbb4499986ecf 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -450,8 +450,7 @@ where if peers.contains(&who) { log::trace!( target: LOG_TARGET, - "Already received the statement from the same peer {}.", - who + "Already received the statement from the same peer {who}.", ); self.network.report_peer(who, rep::DUPLICATE_STATEMENT); } From 10a1fffa0814ddd20f3746b8db0fa4e267c13647 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Wed, 15 Oct 2025 13:53:02 +0300 Subject: [PATCH 38/42] Update --- substrate/client/network/statement/Cargo.toml | 2 +- substrate/client/network/statement/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/substrate/client/network/statement/Cargo.toml b/substrate/client/network/statement/Cargo.toml index 9247d6a31843d..e2c1e4b5f9f19 100644 --- a/substrate/client/network/statement/Cargo.toml +++ b/substrate/client/network/statement/Cargo.toml @@ -29,7 +29,7 @@ sc-network-types = { workspace = true, default-features = true } sp-consensus = { workspace = true, default-features = true } sp-runtime = { workspace = true, default-features = true } sp-statement-store = { workspace = true, default-features = true } +tokio = { workspace = true } [dev-dependencies] async-trait = { workspace = true } -tokio = { workspace = true } diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index dbb4499986ecf..4b39492062b55 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -534,7 +534,7 @@ where log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who); // never send statements to light nodes - if matches!(peer.role, ObservedRole::Light) { + if peer.role.is_light() { log::trace!(target: LOG_TARGET, "{} is a light node, skipping propagation", who); continue } From fbaa07636a6ac38dbc5612efab9fd2cc6ef81bbc Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 16 Oct 2025 11:45:05 +0300 Subject: [PATCH 39/42] Fix name --- substrate/client/statement-store/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/substrate/client/statement-store/src/lib.rs b/substrate/client/statement-store/src/lib.rs index 35921a36069b3..ff017ef7fa914 100644 --- a/substrate/client/statement-store/src/lib.rs +++ b/substrate/client/statement-store/src/lib.rs @@ -85,7 +85,8 @@ pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 * 1024 * 1024; // ~4 million pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024; // 2GiB /// The maximum size of a single statement in bytes. /// Accounts for the 1-byte vector length prefix when statements are gossiped as `Vec`. -pub const MAX_STATEMENT_SIZE: usize = sc_network_statement::config::MAX_STATEMENT_SIZE as usize - 1; +pub const MAX_STATEMENT_SIZE: usize = + sc_network_statement::config::MAX_STATEMENT_NOTIFICATION_SIZE as usize - 1; const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(30); From a3d15d26c5cec28e20899128d00d648a9ecfab20 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 16 Oct 2025 12:04:12 +0300 Subject: [PATCH 40/42] Fix flaky test --- substrate/client/statement-store/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/substrate/client/statement-store/src/lib.rs b/substrate/client/statement-store/src/lib.rs index 1c48dacfd5571..d53efd401971c 100644 --- a/substrate/client/statement-store/src/lib.rs +++ b/substrate/client/statement-store/src/lib.rs @@ -1259,8 +1259,8 @@ mod tests { let recent1 = store.take_recent_statements().unwrap(); let (recent1_hashes, recent1_statements): (Vec<_>, Vec<_>) = recent1.into_iter().unzip(); let expected1 = vec![statement0, statement1, statement2]; - assert_eq!(recent1_hashes, expected1.iter().map(|s| s.hash()).collect::>()); - assert_eq!(recent1_statements, expected1); + assert!(expected1.iter().all(|s| recent1_hashes.contains(&s.hash()))); + assert!(expected1.iter().all(|s| recent1_statements.contains(s))); // Recent statements are cleared. let recent2 = store.take_recent_statements().unwrap(); @@ -1271,8 +1271,8 @@ mod tests { let recent3 = store.take_recent_statements().unwrap(); let (recent3_hashes, recent3_statements): (Vec<_>, Vec<_>) = recent3.into_iter().unzip(); let expected3 = vec![statement3]; - assert_eq!(recent3_hashes, expected3.iter().map(|s| s.hash()).collect::>()); - assert_eq!(recent3_statements, expected3); + assert!(expected3.iter().all(|s| recent3_hashes.contains(&s.hash()))); + assert!(expected3.iter().all(|s| recent3_statements.contains(s))); // Recent statements are cleared, but statements remain in the store. assert_eq!(store.statements().unwrap().len(), 4); From 1ce8246bece9f073c0410d4513ea4d7235ebd4c1 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 16 Oct 2025 12:07:04 +0300 Subject: [PATCH 41/42] Add early return --- substrate/client/statement-store/src/lib.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/substrate/client/statement-store/src/lib.rs b/substrate/client/statement-store/src/lib.rs index caf57d07e399f..d074af1ed315b 100644 --- a/substrate/client/statement-store/src/lib.rs +++ b/substrate/client/statement-store/src/lib.rs @@ -389,9 +389,7 @@ impl Index { current_time: u64, ) -> MaybeInserted { let statement_len = statement.data_len(); - if statement_len > validation.max_size as usize || - statement.encoded_size() > MAX_STATEMENT_SIZE - { + if statement_len > validation.max_size as usize { log::debug!( target: LOG_TARGET, "Ignored oversize message: {:?} ({} bytes)", @@ -896,6 +894,18 @@ impl StatementStore for Store { /// Submit a statement to the store. Validates the statement and returns validation result. fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult { let hash = statement.hash(); + let encoded_size = statement.encoded_size(); + if encoded_size > MAX_STATEMENT_SIZE { + log::debug!( + target: LOG_TARGET, + "Statement is too big for propogation: {:?} ({}/{} bytes)", + HexDisplay::from(&hash), + statement.encoded_size(), + MAX_STATEMENT_SIZE + ); + return SubmitResult::Ignored + } + match self.index.read().query(&hash) { IndexQuery::Expired => if !source.can_be_resubmitted() { From 633619f21ca3951b1b1d2f21386163ad0424f65c Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 16 Oct 2025 12:08:34 +0300 Subject: [PATCH 42/42] Update prdoc --- prdoc/pr_9965.prdoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prdoc/pr_9965.prdoc b/prdoc/pr_9965.prdoc index 979c1f3ede14a..7c40ff0a8399c 100644 --- a/prdoc/pr_9965.prdoc +++ b/prdoc/pr_9965.prdoc @@ -1,7 +1,7 @@ title: Limit the size of the statement for further gossiping doc: - audience: Node Dev - description: "Limits the size of statements that are further gossiped over the network to prevent skipping oversized messages. The limit is set to match the network protocol's `MAX_STATEMENT_SIZE` (256 KB), accounting for 1-byte vector length overhead because statements are sent as `Vec`." + description: "Limits the size of statements that are further gossiped over the network to prevent skipping oversized messages. The limit is set to match the network protocol's `MAX_STATEMENT_NOTIFICATION_SIZE` (1 MB), accounting for 1-byte vector length overhead because statements are sent as `Vec`." crates: - name: sc-network-statement bump: minor