Merged

Changes from 10 commits (37 commits total)
f06dab8
Gossip only recent statements
AndreiEres Oct 2, 2025
eca57b4
Check if the store already has a statement before processing
AndreiEres Oct 2, 2025
7b8aeb1
Reduce notification size if too large
AndreiEres Oct 2, 2025
3e69999
Update metrics
AndreiEres Oct 2, 2025
9a5decc
Update from github-actions[bot] running command 'prdoc --audience nod…
github-actions[bot] Oct 2, 2025
baeba9f
Update PRDOC
AndreiEres Oct 2, 2025
b0607bc
Update semver
AndreiEres Oct 2, 2025
52fb0aa
Address review comments
AndreiEres Oct 3, 2025
f08c2a8
Fix types
AndreiEres Oct 3, 2025
1e3e83b
Fix up
AndreiEres Oct 3, 2025
8cdb31a
Fix up
AndreiEres Oct 6, 2025
a317299
Fix typo
AndreiEres Oct 7, 2025
50ecef4
Remove unused var
AndreiEres Oct 7, 2025
6b89e1c
Update validation queue size
AndreiEres Oct 7, 2025
a705c00
Add mocked handler for tests
AndreiEres Oct 7, 2025
8703601
Add dirty test
AndreiEres Oct 7, 2025
90c0799
Add more tests
AndreiEres Oct 7, 2025
bd3645e
Fix duplicate processing
AndreiEres Oct 7, 2025
310e05b
Add more tests
AndreiEres Oct 7, 2025
ed55be3
Add metrics for ignored statements
AndreiEres Oct 7, 2025
514ad77
Remove unused tokio
AndreiEres Oct 7, 2025
0adf169
Fix tests
AndreiEres Oct 7, 2025
6daaee0
Fix style
AndreiEres Oct 8, 2025
0012b9e
Fix skipping very large statements
AndreiEres Oct 8, 2025
6cf91c4
Update logging and metrics
AndreiEres Oct 8, 2025
6891b17
Increase notification size
AndreiEres Oct 9, 2025
f52598f
Send async notifications
AndreiEres Oct 9, 2025
5945066
Fix tests
AndreiEres Oct 9, 2025
b5b96eb
Add more logs
AndreiEres Oct 14, 2025
971fe1e
Modify metrics setting
AndreiEres Oct 14, 2025
f47af22
Handle very slow or disconnected peers
AndreiEres Oct 14, 2025
dec3b53
Merge branch 'master' into AndreiEres/fix-statement-store-gossiping
AndreiEres Oct 15, 2025
894d00e
Update substrate/client/network/statement/src/lib.rs
AndreiEres Oct 15, 2025
10a1fff
Update
AndreiEres Oct 15, 2025
b8a1c2f
Merge branch 'master' into AndreiEres/fix-statement-store-gossiping
AndreiEres Oct 16, 2025
a3d15d2
Fix flaky test
AndreiEres Oct 16, 2025
3e17ffb
Merge branch 'master' into AndreiEres/fix-statement-store-gossiping
AndreiEres Oct 16, 2025
14 changes: 14 additions & 0 deletions prdoc/pr_9912.prdoc
@@ -0,0 +1,14 @@
title: '[WIP] Fix statement-store gossiping'
doc:
- audience: Node Dev
description: |-
Fixes gossiping and scalability issues in the statement-store networking:
reduces traffic by propagating only recent statements, skips duplicate processing,
and splits large batches to stay under MAX_STATEMENT_NOTIFICATION_SIZE.
crates:
- name: sc-network-statement
bump: minor
- name: sc-statement-store
bump: minor
- name: sp-statement-store
bump: minor
4 changes: 2 additions & 2 deletions substrate/client/network/statement/src/config.rs
@@ -24,10 +24,10 @@ use std::time;
pub(crate) const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(1000);

/// Maximum number of known statement hashes to keep for a peer.
pub(crate) const MAX_KNOWN_STATEMENTS: usize = 4 * 1024 * 1024;
pub(crate) const MAX_KNOWN_STATEMENTS: usize = 4 * 1024 * 1024; // * 32 bytes for hash = 128 MB per peer

/// Maximum allowed size for a statement notification.
pub(crate) const MAX_STATEMENT_SIZE: u64 = 256 * 1024;
pub(crate) const MAX_STATEMENT_NOTIFICATION_SIZE: u64 = 256 * 1024;

/// Maximum number of statement validation request we keep at any moment.
pub(crate) const MAX_PENDING_STATEMENTS: usize = 2 * 1024 * 1024;
104 changes: 98 additions & 6 deletions substrate/client/network/statement/src/lib.rs
@@ -30,7 +30,9 @@ use crate::config::*;

use codec::{Decode, Encode};
use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered, FutureExt};
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use prometheus_endpoint::{
prometheus, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError, Registry, U64,
};
use sc_network::{
config::{NonReservedPeerMode, SetConfig},
error, multiaddr,
@@ -88,6 +90,10 @@ const LOG_TARGET: &str = "statement-gossip";

struct Metrics {
propagated_statements: Counter<U64>,
known_statements_received: Counter<U64>,
skipped_oversized_statements: Counter<U64>,
propagated_statements_chunks: Histogram,
pending_statements: Gauge<U64>,
}

impl Metrics {
@@ -100,6 +106,36 @@ impl Metrics {
)?,
r,
)?,
known_statements_received: register(
Counter::new(
"substrate_sync_known_statement_received",
"Number of statements received via gossiping that were already in the statement store",
)?,
r,
)?,
skipped_oversized_statements: register(
Counter::new(
"substrate_sync_skipped_oversized_statements",
"Number of oversized statements that were skipped to be gossiped",
)?,
r,
)?,
propagated_statements_chunks: register(
Histogram::with_opts(
HistogramOpts::new(
"substrate_sync_propagated_statements_chunks",
"Distribution of chunk sizes when propagating statements",
).buckets(prometheus::exponential_buckets(1.0, 2.0, 14)?),
)?,
r,
)?,
pending_statements: register(
Gauge::new(
"substrate_sync_pending_statement_validations",
"Number of pending statement validations",
)?,
r,
)?,
})
}
}
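For reference on the new chunk-size histogram above: prometheus's exponential_buckets(1.0, 2.0, 14) yields bucket upper bounds 1, 2, 4, …, 8192, so substrate_sync_propagated_statements_chunks separates single-statement notifications from batches of several thousand statements.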
@@ -131,7 +167,7 @@ impl StatementHandlerPrototype {
let (config, notification_service) = Net::notification_config(
protocol_name.clone().into(),
Vec::new(),
MAX_STATEMENT_SIZE,
MAX_STATEMENT_NOTIFICATION_SIZE,
None,
SetConfig {
in_peers: 0,
@@ -264,6 +300,9 @@
futures::select! {
_ = self.propagate_timeout.next() => {
self.propagate_statements();
if let Some(ref metrics) = self.metrics {
metrics.pending_statements.set(self.pending_statements.len() as u64);
}
},
(hash, result) = self.pending_statements.select_next_some() => {
if let Some(peers) = self.pending_statements_peers.remove(&hash) {
@@ -388,6 +427,13 @@

self.network.report_peer(who, rep::ANY_STATEMENT);

if self.statement_store.has_statement(&hash) {
if let Some(ref metrics) = self.metrics {
metrics.known_statements_received.inc();
}
continue;
}

match self.pending_statements_peers.entry(hash) {
Entry::Vacant(entry) => {
let (completion_sender, completion_receiver) = oneshot::channel();
@@ -470,9 +516,55 @@

propagated_statements += to_send.len();

if !to_send.is_empty() {
log::trace!(target: LOG_TARGET, "Sending {} statements to {}", to_send.len(), who);
self.notification_service.send_sync_notification(who, to_send.encode());
let mut offset = 0;
while offset < to_send.len() {
Review comment from @lexnv (Contributor), Oct 15, 2025:
I believe we still need to tackle this with a FuturesUnordered here. If we have multiple statements, we'll fill the 1 MiB (MAX_STATEMENT_NOTIFICATION_SIZE) limit quickly. Then, each peer will become a bottleneck for the other peers that need statements.

Reply from the author (Contributor):
We can put into a FuturesUnordered only the send cycles for one peer, not every batch, because if the peer disconnects we don't go further. For the current use case we have about three peers, so we can postpone this improvement.

(A hedged sketch of this direction follows the lib.rs diff below.)

// Try to send as many statements as possible in one notification
let chunk_size = to_send.len() - offset;
let mut current_end = to_send.len();

loop {
let chunk = &to_send[offset..current_end];
let encoded_size = chunk.encoded_size();

// If chunk fits, send it
if encoded_size <= MAX_STATEMENT_NOTIFICATION_SIZE as usize {
log::trace!(
target: LOG_TARGET,
"Sending {} statements ({} KB) to {}",
chunk.len(),
encoded_size / 1024,
who
);
self.notification_service.send_sync_notification(who, chunk.encode());
offset = current_end;
if let Some(ref metrics) = self.metrics {
metrics.propagated_statements_chunks.observe(chunk.len() as f64);
}
break;
}

// Size exceeded - split the chunk
let split_factor =
(encoded_size / MAX_STATEMENT_NOTIFICATION_SIZE as usize) + 1;
let new_chunk_size = (current_end - offset) / split_factor;

// Single statement is too large
if new_chunk_size == 0 {
log::warn!(
target: LOG_TARGET,
"Statement too large ({} KB), skipping",
encoded_size / 1024
);
if let Some(ref metrics) = self.metrics {
metrics.skipped_oversized_statements.inc();
}
offset = current_end;
break;
}

// Reduce chunk size and try again
current_end = offset + new_chunk_size;
}
}
}
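To make the split heuristic above concrete (using the 256 KiB MAX_STATEMENT_NOTIFICATION_SIZE from config.rs as shown in this revision): a chunk of 90 statements encoding to roughly 700 KiB gives split_factor = 700 / 256 + 1 = 3 (integer division), so new_chunk_size becomes 30 and the loop retries with the first 30 statements; a chunk reduced to a single statement that still exceeds the limit drives new_chunk_size to 0, and that statement is skipped with a warning and counted by skipped_oversized_statements.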

Expand All @@ -489,7 +581,7 @@ where
}

log::debug!(target: LOG_TARGET, "Propagating statements");
if let Ok(statements) = self.statement_store.statements() {
if let Ok(statements) = self.statement_store.take_recent_statements() {
self.do_propagate_statements(&statements);
}
}
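The review thread above leaves per-peer concurrency as a follow-up. A minimal, hedged sketch of that direction — not part of this PR — could drive each peer's send loop as its own future in a FuturesUnordered, so a slow or disconnected peer only stalls its own stream. The generic peer type and the send_to_peer closure here are illustrative placeholders, not the sc-network API:

```rust
use futures::{stream::FuturesUnordered, StreamExt};

/// Drive one send loop per peer concurrently; futures complete in whatever
/// order their peers finish, so no single peer blocks the others.
async fn propagate_to_all_peers<P, F, Fut>(peers: Vec<P>, send_to_peer: F)
where
    F: Fn(P) -> Fut,
    Fut: std::future::Future<Output = ()>,
{
    let mut sends: FuturesUnordered<Fut> = peers.into_iter().map(send_to_peer).collect();
    // Poll all per-peer sends together; each future would internally run the
    // chunking loop shown above for its own peer.
    while sends.next().await.is_some() {}
}
```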
67 changes: 64 additions & 3 deletions substrate/client/statement-store/src/lib.rs
@@ -155,6 +155,7 @@ impl Default for Options {

#[derive(Default)]
struct Index {
recent: HashSet<Hash>,
by_topic: HashMap<Topic, HashSet<Hash>>,
by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
@@ -243,6 +244,7 @@ impl Index {
}
let priority = Priority(statement.priority().unwrap_or(0));
self.entries.insert(hash, (account, priority, statement.data_len()));
self.recent.insert(hash);
self.total_size += statement.data_len();
let account_info = self.accounts.entry(account).or_default();
account_info.data_size += statement.data_len();
@@ -324,6 +326,10 @@
purged
}

fn take_recent(&mut self) -> HashSet<Hash> {
std::mem::take(&mut self.recent)
}

fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
if let Some((account, priority, len)) = self.entries.remove(hash) {
self.total_size -= len;
@@ -347,6 +353,7 @@
}
}
}
let _ = self.recent.remove(hash);
self.expired.insert(*hash, current_time);
if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
self.accounts.entry(account)
@@ -767,11 +774,27 @@ impl StatementStore for Store {
fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
let index = self.index.read();
let mut result = Vec::with_capacity(index.entries.len());
for h in index.entries.keys() {
let encoded = self.db.get(col::STATEMENTS, h).map_err(|e| Error::Db(e.to_string()))?;
for hash in index.entries.keys().cloned() {
let encoded =
self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?;
if let Some(encoded) = encoded {
if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
result.push((hash, statement));
}
}
}
Ok(result)
}

fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>> {
let mut index = self.index.write();
let recent = index.take_recent();
let mut result = Vec::with_capacity(recent.len());
for hash in recent {
let encoded =
self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?;
if let Some(encoded) = encoded {
if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
let hash = statement.hash();
result.push((hash, statement));
}
}
@@ -810,6 +833,10 @@
)
}

fn has_statement(&self, hash: &Hash) -> bool {
self.index.read().entries.contains_key(hash)
}

/// Return the data of all known statements which include all topics and have no `DecryptionKey`
/// field.
fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
@@ -1214,6 +1241,40 @@ mod tests {
assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1));
}

#[test]
fn take_recent_statements_clears_index() {
let (store, _temp) = test_store();
let statement0 = signed_statement(0);
let statement1 = signed_statement(1);
let statement2 = signed_statement(2);
let statement3 = signed_statement(3);

let _ = store.submit(statement0.clone(), StatementSource::Local);
let _ = store.submit(statement1.clone(), StatementSource::Local);
let _ = store.submit(statement2.clone(), StatementSource::Local);

let recent1 = store.take_recent_statements().unwrap();
let (recent1_hashes, recent1_statements): (Vec<_>, Vec<_>) = recent1.into_iter().unzip();
let expected1 = vec![statement0, statement1, statement2];
assert_eq!(recent1_hashes, expected1.iter().map(|s| s.hash()).collect::<Vec<_>>());
assert_eq!(recent1_statements, expected1);

// Recent statements are cleared.
let recent2 = store.take_recent_statements().unwrap();
assert_eq!(recent2.len(), 0);

store.submit(statement3.clone(), StatementSource::Network);

let recent3 = store.take_recent_statements().unwrap();
let (recent3_hashes, recent3_statements): (Vec<_>, Vec<_>) = recent3.into_iter().unzip();
let expected3 = vec![statement3];
assert_eq!(recent3_hashes, expected3.iter().map(|s| s.hash()).collect::<Vec<_>>());
assert_eq!(recent3_statements, expected3);

// Recent statements are cleared, but statements remain in the store.
assert_eq!(store.statements().unwrap().len(), 4);
}

#[test]
fn search_by_topic_and_key() {
let (store, _temp) = test_store();
11 changes: 11 additions & 0 deletions substrate/primitives/statement-store/src/store_api.rs
@@ -66,9 +66,20 @@ pub trait StatementStore: Send + Sync {
/// Return all statements.
fn statements(&self) -> Result<Vec<(Hash, Statement)>>;

/// Return recent statements and clear the internal index.
///
/// This consumes and clears the recently received statements,
/// allowing new statements to be collected from this point forward.
fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>>;

/// Get statement by hash.
fn statement(&self, hash: &Hash) -> Result<Option<Statement>>;

/// Check if the statement exists in the store.
///
/// Fast index check without accessing the DB.
fn has_statement(&self, hash: &Hash) -> bool;

/// Return the data of all known statements which include all topics and have no `DecryptionKey`
/// field.
fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>>;
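A short usage sketch of the two new trait methods (the free functions and import paths below are assumptions for illustration, not code from this PR): the gossip handler drains the recent set once per propagation cycle, and uses the index-only lookup to avoid revalidating statements it already holds.

```rust
use std::sync::Arc;

use sp_statement_store::{Hash, Statement, StatementStore};

/// Statements added since the last call; draining the recent index means each
/// statement is gossiped once instead of re-sending the whole store every cycle.
fn recent_for_gossip(store: &Arc<dyn StatementStore>) -> Vec<(Hash, Statement)> {
    store.take_recent_statements().unwrap_or_default()
}

/// Cheap in-memory check: only schedule validation for unknown statements.
fn needs_validation(store: &Arc<dyn StatementStore>, hash: &Hash) -> bool {
    !store.has_statement(hash)
}
```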