Skip to content

Commit b2011f6

Browse files
committed
perf(trie): parallelize HashedPostStateSorted::from_reverts
Parallelizes the hashing/sorting step using Rayon when account count exceeds a threshold (2500). This alleviates CPU bottlenecks during large state reverts or deep reorgs. Closes #20049
1 parent cc4cdc4 commit b2011f6

File tree

3 files changed

+156
-10
lines changed

3 files changed

+156
-10
lines changed

crates/trie/db/Cargo.toml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ alloy-primitives.workspace = true
2424
# tracing
2525
tracing.workspace = true
2626

27+
# rayon
28+
rayon = { workspace = true, optional = true }
29+
2730
[dev-dependencies]
2831
# reth
2932
reth-chainspec.workspace = true
@@ -48,7 +51,6 @@ serde_json.workspace = true
4851
similar-asserts.workspace = true
4952

5053
criterion.workspace = true
51-
rayon.workspace = true
5254
rand.workspace = true
5355

5456
[features]
@@ -62,6 +64,7 @@ serde = [
6264
"reth-primitives-traits/serde",
6365
"revm-database/serde",
6466
"revm/serde",
67+
"rand/serde",
6568
]
6669
test-utils = [
6770
"reth-trie-common/test-utils",
@@ -73,6 +76,13 @@ test-utils = [
7376
"reth-trie/test-utils",
7477
]
7578

79+
parallel-from-reverts = ["dep:rayon"]
80+
7681
[[bench]]
7782
name = "sorting_par_exp"
78-
harness = false
83+
harness = false
84+
required-features = ["parallel-from-reverts"]
85+
86+
[[bench]]
87+
name = "integration_bench"
88+
harness = false
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
#![allow(missing_docs, unreachable_pub)]
2+
3+
use alloy_primitives::{Address, B256, U256};
4+
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
5+
use rand::{rngs::StdRng, Rng, SeedableRng};
6+
use reth_db::{
7+
test_utils::create_test_rw_db,
8+
transaction::{DbTx, DbTxMut},
9+
Database, DatabaseEnv,
10+
};
11+
use reth_db_api::{
12+
models::{AccountBeforeTx, BlockNumberAddress},
13+
tables,
14+
};
15+
use reth_primitives_traits::{Account, StorageEntry};
16+
use reth_trie::{HashedPostStateSorted, KeccakKeyHasher};
17+
use reth_trie_db::DatabaseHashedPostState;
18+
use std::sync::Arc;
19+
20+
// Populate a temporary database with synthetic revert data.
21+
type TestDb = Arc<reth_db::test_utils::TempDatabase<DatabaseEnv>>;
22+
23+
#[derive(Clone, Copy, Debug)]
24+
enum Distribution {
25+
Uniform(usize),
26+
Skewed { light: usize, heavy: usize },
27+
}
28+
29+
fn seed_db(num_accounts: usize, dist: Distribution) -> TestDb {
30+
let db = create_test_rw_db();
31+
let tx = db.tx_mut().expect("failed to create rw tx");
32+
33+
let mut rng = StdRng::seed_from_u64(12345);
34+
let block = 1;
35+
36+
for idx in 0..num_accounts {
37+
let address = Address::random();
38+
39+
let account =
40+
Account { nonce: idx as u64, balance: U256::from(idx as u64), bytecode_hash: None };
41+
42+
tx.put::<tables::AccountChangeSets>(
43+
block,
44+
AccountBeforeTx { address, info: Some(account) },
45+
)
46+
.expect("failed to insert account changeset");
47+
48+
let slots_count = match dist {
49+
Distribution::Uniform(n) => n,
50+
Distribution::Skewed { light, heavy } => {
51+
if rng.random_bool(0.05) {
52+
heavy
53+
} else {
54+
light
55+
}
56+
}
57+
};
58+
59+
for slot_idx in 0..slots_count {
60+
let slot_key = B256::random();
61+
let value = U256::from(slot_idx);
62+
63+
tx.put::<tables::StorageChangeSets>(
64+
BlockNumberAddress((block, address)),
65+
StorageEntry { key: slot_key, value },
66+
)
67+
.expect("failed to insert storage changeset");
68+
}
69+
}
70+
71+
tx.commit().expect("failed to commit seeded db");
72+
db
73+
}
74+
75+
fn bench_db_execution(c: &mut Criterion) {
76+
let mut group = c.benchmark_group("Integration_DB_Sort");
77+
group.measurement_time(std::time::Duration::from_secs(10));
78+
79+
// Scenarios to test:
80+
// - Below Threshold: Should be identical (Sequential)
81+
// - Above Threshold: Should show some speedup with feature flag
82+
// - Skewed: Should show speedup with feature flag
83+
let scenarios = vec![
84+
("Small_Under_Threshold", 1_000, Distribution::Uniform(50)),
85+
("Large_Uniform", 10_000, Distribution::Uniform(20)),
86+
("Large_Skewed", 10_000, Distribution::Skewed { light: 4, heavy: 2_000 }),
87+
];
88+
89+
for (name, accounts, dist) in scenarios {
90+
let db = seed_db(accounts, dist);
91+
let range = 1..=1;
92+
93+
group.bench_function(BenchmarkId::new("from_reverts", name), |b| {
94+
b.iter(|| {
95+
let tx = db.tx().expect("failed to create ro tx");
96+
97+
let state =
98+
HashedPostStateSorted::from_reverts::<KeccakKeyHasher>(&tx, range.clone())
99+
.expect("failed to calculate state");
100+
101+
black_box(state);
102+
});
103+
});
104+
}
105+
106+
group.finish();
107+
}
108+
109+
criterion_group!(benches, bench_db_execution);
110+
criterion_main!(benches);

crates/trie/db/src/state.rs

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ use std::{
1919
};
2020
use tracing::{debug, instrument};
2121

22+
#[cfg(feature = "parallel-from-reverts")]
23+
use rayon::iter::{IntoParallelIterator, ParallelIterator};
24+
2225
/// Extends [`StateRoot`] with operations specific for working with a database transaction.
2326
pub trait DatabaseStateRoot<'a, TX>: Sized {
2427
/// Create a new [`StateRoot`] instance.
@@ -280,14 +283,37 @@ impl<TX: DbTx> DatabaseHashedPostState<TX> for HashedPostStateSorted {
280283
}
281284
}
282285

283-
// Sort storage slots and convert to HashedStorageSorted
284-
let hashed_storages = storages
285-
.into_iter()
286-
.map(|(address, mut slots)| {
287-
slots.sort_unstable_by_key(|(slot, _)| *slot);
288-
(address, HashedStorageSorted { storage_slots: slots, wiped: false })
289-
})
290-
.collect();
286+
// Threshold based on benchmark
287+
const PARALLEL_THRESHOLD: usize = 2_500;
288+
289+
// Check Feature Flag AND Threshold
290+
let use_parallel =
291+
cfg!(feature = "parallel-from-reverts") && storages.len() >= PARALLEL_THRESHOLD;
292+
293+
let hashed_storages = if use_parallel {
294+
#[cfg(feature = "parallel-from-reverts")]
295+
{
296+
storages
297+
.into_par_iter()
298+
.map(|(address, mut slots)| {
299+
slots.sort_unstable_by_key(|(slot, _)| *slot);
300+
(address, HashedStorageSorted { storage_slots: slots, wiped: false })
301+
})
302+
.collect()
303+
}
304+
#[cfg(not(feature = "parallel-from-reverts"))]
305+
{
306+
unreachable!()
307+
}
308+
} else {
309+
storages
310+
.into_iter()
311+
.map(|(address, mut slots)| {
312+
slots.sort_unstable_by_key(|(slot, _)| *slot);
313+
(address, HashedStorageSorted { storage_slots: slots, wiped: false })
314+
})
315+
.collect()
316+
};
291317

292318
Ok(Self::new(accounts, hashed_storages))
293319
}

0 commit comments

Comments
 (0)