Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions crates/trie/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ alloy-primitives.workspace = true
# tracing
tracing.workspace = true

# rayon
rayon = { workspace = true, optional = true }

[dev-dependencies]
# reth
reth-chainspec.workspace = true
Expand All @@ -47,6 +50,9 @@ proptest-arbitrary-interop.workspace = true
serde_json.workspace = true
similar-asserts.workspace = true

criterion.workspace = true
rand.workspace = true

[features]
metrics = ["reth-trie/metrics"]
serde = [
Expand All @@ -58,6 +64,7 @@ serde = [
"reth-primitives-traits/serde",
"revm-database/serde",
"revm/serde",
"rand/serde",
]
test-utils = [
"reth-trie-common/test-utils",
Expand All @@ -68,3 +75,14 @@ test-utils = [
"reth-provider/test-utils",
"reth-trie/test-utils",
]

parallel-from-reverts = ["dep:rayon"]

[[bench]]
name = "sorting_par_exp"
harness = false
required-features = ["parallel-from-reverts"]

[[bench]]
name = "integration_bench"
harness = false
110 changes: 110 additions & 0 deletions crates/trie/db/benches/integration_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#![allow(missing_docs, unreachable_pub)]

use alloy_primitives::{Address, B256, U256};
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use rand::{rngs::StdRng, Rng, SeedableRng};
use reth_db::{
test_utils::create_test_rw_db,
transaction::{DbTx, DbTxMut},
Database, DatabaseEnv,
};
use reth_db_api::{
models::{AccountBeforeTx, BlockNumberAddress},
tables,
};
use reth_primitives_traits::{Account, StorageEntry};
use reth_trie::{HashedPostStateSorted, KeccakKeyHasher};
use reth_trie_db::DatabaseHashedPostState;
use std::sync::Arc;

/// Handle to a temporary, disposable test database backed by `DatabaseEnv`.
/// See `seed_db`, which populates it with synthetic revert data.
type TestDb = Arc<reth_db::test_utils::TempDatabase<DatabaseEnv>>;

/// Shape of the per-account storage-slot distribution used when seeding the database.
#[derive(Clone, Copy, Debug)]
enum Distribution {
    /// Every account gets exactly this many storage slots.
    Uniform(usize),
    /// Most accounts get `light` slots; ~5% get `heavy` (see the 0.05 coin flip in `seed_db`).
    Skewed { light: usize, heavy: usize },
}

/// Populate a temporary database with synthetic revert data: `num_accounts`
/// account changesets (and their storage changesets) all at block 1.
///
/// All randomness is drawn from a single `StdRng` with a fixed seed so that
/// repeated benchmark runs operate on identical data. Note that
/// `Address::random()` / `B256::random()` would pull from OS entropy and
/// silently break that reproducibility, so raw bytes come from `rng` instead.
fn seed_db(num_accounts: usize, dist: Distribution) -> TestDb {
    let db = create_test_rw_db();
    let tx = db.tx_mut().expect("failed to create rw tx");

    let mut rng = StdRng::seed_from_u64(12345);
    let block = 1;

    for idx in 0..num_accounts {
        // Derive the address from the seeded RNG (not `Address::random()`)
        // so the dataset is deterministic across runs.
        let address = Address::from(rng.random::<[u8; 20]>());

        let account =
            Account { nonce: idx as u64, balance: U256::from(idx as u64), bytecode_hash: None };

        tx.put::<tables::AccountChangeSets>(
            block,
            AccountBeforeTx { address, info: Some(account) },
        )
        .expect("failed to insert account changeset");

        // Number of storage slots for this account, per the requested distribution.
        let slots_count = match dist {
            Distribution::Uniform(n) => n,
            Distribution::Skewed { light, heavy } => {
                // ~5% of accounts are "heavy" with many slots.
                if rng.random_bool(0.05) {
                    heavy
                } else {
                    light
                }
            }
        };

        for slot_idx in 0..slots_count {
            // Seeded RNG for slot keys as well, for the same reproducibility reason.
            let slot_key = B256::from(rng.random::<[u8; 32]>());
            let value = U256::from(slot_idx);

            tx.put::<tables::StorageChangeSets>(
                BlockNumberAddress((block, address)),
                StorageEntry { key: slot_key, value },
            )
            .expect("failed to insert storage changeset");
        }
    }

    tx.commit().expect("failed to commit seeded db");
    db
}

/// Benchmark `HashedPostStateSorted::from_reverts` against databases seeded
/// with different account/slot distributions.
///
/// Scenarios to test:
/// - Below Threshold: Should be identical (Sequential)
/// - Above Threshold: Should show some speedup with feature flag
/// - Skewed: Should show speedup with feature flag
fn bench_db_execution(c: &mut Criterion) {
    let mut group = c.benchmark_group("Integration_DB_Sort");
    group.measurement_time(std::time::Duration::from_secs(10));

    let scenarios = vec![
        ("Small_Under_Threshold", 1_000, Distribution::Uniform(50)),
        ("Large_Uniform", 10_000, Distribution::Uniform(20)),
        ("Large_Skewed", 10_000, Distribution::Skewed { light: 4, heavy: 2_000 }),
    ];

    for (name, accounts, dist) in scenarios {
        let db = seed_db(accounts, dist);
        let range = 1..=1;

        group.bench_function(BenchmarkId::new("from_reverts", name), |b| {
            // Open the read-only transaction once, OUTSIDE the timed closure,
            // so transaction-open cost is not attributed to `from_reverts`.
            let tx = db.tx().expect("failed to create ro tx");

            b.iter(|| {
                let state =
                    HashedPostStateSorted::from_reverts::<KeccakKeyHasher>(&tx, range.clone())
                        .expect("failed to calculate state");

                // `std::hint::black_box` (rather than the deprecated criterion
                // re-export) keeps the optimizer from discarding the result.
                std::hint::black_box(state);
            });
        });
    }

    group.finish();
}

// Register the benchmark with Criterion's harness (`harness = false` in Cargo.toml).
criterion_group!(benches, bench_db_execution);
criterion_main!(benches);
113 changes: 113 additions & 0 deletions crates/trie/db/benches/sorting_par_exp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#![allow(missing_docs, unreachable_pub)]

use alloy_primitives::{Address, B256, U256};
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
use rand::{rngs::StdRng, Rng, SeedableRng};
use rayon::prelude::*;

/// Synthetic storage data: a list of accounts, each with a list of slots.
type MockStorage = Vec<(Address, Vec<(B256, U256)>)>;

/// Shape of the per-account slot-count distribution for the synthetic data.
#[derive(Clone, Copy, Debug)]
enum Distribution {
    /// Every account gets exactly this many slots.
    Uniform(usize),
    /// Most accounts get `light` slots; ~5% get `heavy` (see `generate_data`).
    Skewed { light: usize, heavy: usize },
}

/// Generate random synthetic data with a fixed seed for reproducibility.
///
/// All randomness — addresses, slot keys, and values — is drawn from the
/// seeded `StdRng`. `Address::random()` / `B256::random()` would pull from OS
/// entropy and produce a different dataset on every run, defeating the seed.
fn generate_data(num_accounts: usize, dist: Distribution) -> MockStorage {
    let mut rng = StdRng::seed_from_u64(42);

    (0..num_accounts)
        .map(|_| {
            let address = Address::from(rng.random::<[u8; 20]>());
            let slot_count = match dist {
                Distribution::Uniform(n) => n,
                Distribution::Skewed { light, heavy } => {
                    // 5% chance of being a heavy account
                    if rng.random_bool(0.05) {
                        heavy
                    } else {
                        light
                    }
                }
            };

            let slots = (0..slot_count)
                .map(|_| (B256::from(rng.random::<[u8; 32]>()), U256::from(rng.random::<u64>())))
                .collect();
            (address, slots)
        })
        .collect()
}

/// Loop sequentially, sort slots sequentially.
fn process_sequential(mut data: MockStorage) {
for (_addr, slots) in &mut data {
slots.sort_unstable_by_key(|(k, _)| *k);
}
}

/// Accounts in parallel, sort slots sequentially.
fn process_par_iter_accounts(mut data: MockStorage) {
data.par_iter_mut().for_each(|(_addr, slots)| {
slots.sort_unstable_by_key(|(k, _)| *k);
});
}

/// Accounts sequentially, sort slots parallel.
fn process_par_sort_slots(mut data: MockStorage) {
for (_addr, slots) in &mut data {
slots.par_sort_unstable_by_key(|(k, _)| *k);
}
}

/// Benchmark the three sorting strategies across a range of account/slot
/// shapes to locate the thresholds where parallelism starts to pay off.
fn bench_storage_sort(c: &mut Criterion) {
    let mut group = c.benchmark_group("sorting_par_exp");

    let scenarios = vec![
        // Sweep account counts at a tiny fixed slot count.
        ("Acc_Low", 100, Distribution::Uniform(4)),
        ("Acc_Med", 1_000, Distribution::Uniform(4)),
        ("Acc_Med_High", 2_500, Distribution::Uniform(4)),
        ("Acc_High", 10_000, Distribution::Uniform(4)),
        // Sweep slot counts at a tiny fixed account count.
        ("Slots_Med", 10, Distribution::Uniform(100)),
        ("Slots_High", 10, Distribution::Uniform(5_000)),
        ("Slots_Massive", 10, Distribution::Uniform(50_000)),
        // Skewed mixes: mostly 4-slot accounts, ~5% with 2k slots.
        ("Skewed_2.5k", 2_500, Distribution::Skewed { light: 4, heavy: 2_000 }),
        ("Skewed_10k", 10_000, Distribution::Skewed { light: 4, heavy: 2_000 }),
    ];

    for (name, num_accounts, dist) in scenarios {
        let data = generate_data(num_accounts, dist);

        // Report throughput in slots/second rather than iterations/second.
        let slot_total: usize = data.iter().map(|(_, slots)| slots.len()).sum();
        group.throughput(Throughput::Elements(slot_total as u64));

        // Sequential baseline.
        group.bench_with_input(BenchmarkId::new("Sequential", name), &data, |b, input| {
            b.iter_batched(|| input.clone(), process_sequential, BatchSize::LargeInput)
        });

        // One rayon task per account.
        group.bench_with_input(BenchmarkId::new("Par_Accounts", name), &data, |b, input| {
            b.iter_batched(|| input.clone(), process_par_iter_accounts, BatchSize::LargeInput)
        });

        // Parallel sort inside each account: only meaningful when every
        // account has a non-trivial slot count (uniform with >= 100 slots).
        if matches!(dist, Distribution::Uniform(s) if s >= 100) {
            group.bench_with_input(BenchmarkId::new("Par_Inner_Slots", name), &data, |b, input| {
                b.iter_batched(|| input.clone(), process_par_sort_slots, BatchSize::LargeInput)
            });
        }
    }

    group.finish();
}

// Register the benchmark with Criterion's harness (`harness = false` in Cargo.toml).
criterion_group!(sorting_par_exp, bench_storage_sort);
criterion_main!(sorting_par_exp);
42 changes: 34 additions & 8 deletions crates/trie/db/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ use std::{
};
use tracing::{debug, instrument};

#[cfg(feature = "parallel-from-reverts")]
use rayon::iter::{IntoParallelIterator, ParallelIterator};

/// Extends [`StateRoot`] with operations specific for working with a database transaction.
pub trait DatabaseStateRoot<'a, TX>: Sized {
/// Create a new [`StateRoot`] instance.
Expand Down Expand Up @@ -280,14 +283,37 @@ impl<TX: DbTx> DatabaseHashedPostState<TX> for HashedPostStateSorted {
}
}

// Sort storage slots and convert to HashedStorageSorted
let hashed_storages = storages
.into_iter()
.map(|(address, mut slots)| {
slots.sort_unstable_by_key(|(slot, _)| *slot);
(address, HashedStorageSorted { storage_slots: slots, wiped: false })
})
.collect();
// Threshold based on benchmark
const PARALLEL_THRESHOLD: usize = 2_500;

// Check Feature Flag AND Threshold
let use_parallel =
cfg!(feature = "parallel-from-reverts") && storages.len() >= PARALLEL_THRESHOLD;

let hashed_storages = if use_parallel {
#[cfg(feature = "parallel-from-reverts")]
{
storages
.into_par_iter()
.map(|(address, mut slots)| {
slots.sort_unstable_by_key(|(slot, _)| *slot);
(address, HashedStorageSorted { storage_slots: slots, wiped: false })
})
.collect()
}
#[cfg(not(feature = "parallel-from-reverts"))]
{
unreachable!()
}
} else {
storages
.into_iter()
.map(|(address, mut slots)| {
slots.sort_unstable_by_key(|(slot, _)| *slot);
(address, HashedStorageSorted { storage_slots: slots, wiped: false })
})
.collect()
};

Ok(Self::new(accounts, hashed_storages))
}
Expand Down
Loading