5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,11 @@
## Perf

### 2025-11-03

- Switch to binary fuse filter for added performance on trie layers [#5159](https://github.com/lambdaclass/ethrex/pull/5159)

### 2025-10-31

- Merge execution with some post-execution validations [#5170](https://github.com/lambdaclass/ethrex/pull/5170)

### 2025-10-31
17 changes: 4 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default.

31 changes: 12 additions & 19 deletions crates/l2/tee/quote-gen/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion crates/storage/Cargo.toml
@@ -26,7 +26,7 @@ rocksdb = { workspace = true, optional = true }
rustc-hash.workspace = true
tokio = { workspace = true, optional = true, features = ["rt"] }
bincode = "1.3.3"
qfilter = "0.2.5"
xorfilter-rs = "0.5.1"
rayon.workspace = true

[features]
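For context on the dependency swap above, here is a minimal, self-contained sketch of how the replacement crate is used (assuming the xorfilter-rs 0.5 `Fuse8` API that appears later in this diff; the key set is hypothetical). Unlike qfilter's resizable quotient filter, a binary fuse filter is static: it is built once over a fixed set of 64-bit digests and afterwards only answers membership queries.

```rust
use std::hash::BuildHasher;

use rustc_hash::FxBuildHasher;
use xorfilter::Fuse8;

fn main() {
    // Hypothetical key set: pre-hash every key to a 64-bit digest with the
    // same hasher type the filter is parameterized over.
    let keys: Vec<Vec<u8>> = (0u32..1_000).map(|i| i.to_be_bytes().to_vec()).collect();
    let mut digests: Vec<u64> = keys.iter().map(|k| FxBuildHasher.hash_one(k)).collect();

    // Construction can fail on duplicate digests, so sort and dedup first.
    // Sorting is required because Vec::dedup only drops consecutive duplicates.
    digests.sort_unstable();
    digests.dedup();

    // Build the static binary fuse filter over the digest set.
    let mut filter: Fuse8<FxBuildHasher> = Fuse8::new(digests.len() as u32);
    filter.build_keys(&digests).expect("fuse filter construction failed");

    // Queries can return false positives, but never false negatives.
    assert!(filter.contains_key(FxBuildHasher.hash_one(&keys[0])));
}
```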
96 changes: 40 additions & 56 deletions crates/storage/trie_db/layering.rs
@@ -1,6 +1,8 @@
use ethrex_common::H256;
use rayon::iter::{ParallelBridge, ParallelIterator};
use rustc_hash::FxHashMap;
use rayon::slice::ParallelSliceMut;
use rustc_hash::{FxBuildHasher, FxHashMap};
use std::hash::BuildHasher;
use std::sync::Arc;

use ethrex_trie::{Nibbles, TrieDB, TrieError};
@@ -10,9 +12,11 @@ struct TrieLayer {
nodes: Arc<FxHashMap<Vec<u8>, Vec<u8>>>,
parent: H256,
id: usize,
// pre-computed 64-bit digests to recreate the global bloom filter
bloom_digests: Arc<Vec<u64>>,
}

#[derive(Clone, Debug)]
#[derive(Clone, Default)]
pub struct TrieLayerCache {
/// Monotonically increasing ID for layers, starting at 1.
/// TODO: this implementation panics on overflow
@@ -26,26 +30,23 @@ pub struct TrieLayerCache {
/// In case a bloom filter operation fails, we need to mark the filter as poisoned so we never
/// use it again: otherwise we may be misled into believing a key is not present on a diff
/// layer when it actually is (i.e. a false negative), leading to wrong executions.
bloom: Option<qfilter::Filter>,
bloom: Option<Arc<xorfilter::Fuse8<FxBuildHasher>>>,
}

impl Default for TrieLayerCache {
fn default() -> Self {
// Try to create the bloom filter, if it fails use poison mode.
let bloom = Self::create_filter().ok();
Self {
bloom,
last_id: 0,
layers: Default::default(),
}
impl std::fmt::Debug for TrieLayerCache {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TrieLayerCache")
.field("last_id", &self.last_id)
.field("layers", &self.layers)
// bloom doesn't implement Debug
.finish_non_exhaustive()
}
}

impl TrieLayerCache {
// TODO: tune this
fn create_filter() -> Result<qfilter::Filter, qfilter::Error> {
qfilter::Filter::new_resizeable(1_000_000, 100_000_000, 0.02)
.inspect_err(|e| tracing::warn!("could not create trie layering bloom filter {e}"))
fn create_filter() -> xorfilter::Fuse8<FxBuildHasher> {
xorfilter::Fuse8::new(1_000_000)
}

pub fn get(&self, state_root: H256, key: Nibbles) -> Option<Vec<u8>> {
@@ -110,70 +111,53 @@ impl TrieLayerCache {
return;
}

// add this new bloom to the global one.
if let Some(filter) = &mut self.bloom {
for (p, _) in &key_values {
if let Err(qfilter::Error::CapacityExceeded) = filter.insert(p.as_ref()) {
tracing::warn!("TrieLayerCache: put_batch capacity exceeded");
self.bloom = None;
break;
}
}
}

let nodes: FxHashMap<Vec<u8>, Vec<u8>> = key_values
.into_iter()
.map(|(path, value)| (path.into_vec(), value))
.collect();

self.last_id += 1;

let key_hashes: Vec<u64> = nodes
.keys()
.par_bridge()
.map(|key| FxBuildHasher.hash_one(key))
.collect();

let entry = TrieLayer {
nodes: Arc::new(nodes),
parent,
id: self.last_id,
bloom_digests: Arc::new(key_hashes),
};

self.layers.insert(state_root, Arc::new(entry));
// We need to rebuild the filter: with xorfilter we can't simply add the new layer's keys, since the filter is static.
self.rebuild_bloom();
}

/// Rebuilds the global bloom filter over all currently existing layers.
pub fn rebuild_bloom(&mut self) {
let mut blooms: Vec<_> = self
let mut bloom = Self::create_filter();

// Parallelize key hashing ourselves, because xorfilter's populate doesn't.
let mut bloom_digests: Vec<u64> = self
.layers
.values()
.par_bridge()
.map(|entry| {
let Ok(mut bloom) = Self::create_filter() else {
tracing::warn!("TrieLayerCache: rebuild_bloom could not create filter");
return None;
};
for (p, _) in entry.nodes.iter() {
if let Err(qfilter::Error::CapacityExceeded) = bloom.insert(p) {
tracing::warn!("TrieLayerCache: rebuild_bloom capacity exceeded");
return None;
}
}
Some(bloom)
})
.flat_map(|x| x.bloom_digests.iter().copied())
.collect();

let Some(mut ret) = blooms.pop().flatten() else {
tracing::warn!("TrieLayerCache: rebuild_bloom no valid bloom found");
// xorfilter can only handle "few" or no duplicate keys, so we sort and dedup before building.
bloom_digests.par_sort_unstable();
bloom_digests.dedup();

if let Err(e) = bloom.build_keys(&bloom_digests) {
tracing::warn!("TrieLayerCache: rebuild_bloom error: {e}");
self.bloom = None;
return;
};
for bloom in blooms.iter() {
let Some(bloom) = bloom else {
tracing::warn!("TrieLayerCache: rebuild_bloom no valid bloom found");
self.bloom = None;
return;
};
if let Err(qfilter::Error::CapacityExceeded) = ret.merge(false, bloom) {
tracing::warn!("TrieLayerCache: rebuild_bloom capacity exceeded");
self.bloom = None;
return;
}
}
self.bloom = Some(ret);

self.bloom = Some(Arc::new(bloom));
}

pub fn commit(&mut self, state_root: H256) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
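The lookup side of the poisoning invariant described in the struct's doc comment lives in `get`, whose body is collapsed in this diff. The contract it implies can be sketched as follows (a hypothetical helper, not the PR's actual `get` implementation):

```rust
use std::sync::Arc;

use rustc_hash::FxBuildHasher;
use xorfilter::Fuse8;

/// Hypothetical sketch: how a lookup must treat a poisoned (`None`) filter.
/// A fuse filter has false positives but no false negatives, so a definite
/// "absent" answer can safely skip the layer scan, while a poisoned filter
/// must be treated as "maybe present" and fall through to scanning layers.
fn may_contain(bloom: &Option<Arc<Fuse8<FxBuildHasher>>>, digest: u64) -> bool {
    match bloom {
        Some(filter) => filter.contains_key(digest),
        None => true, // poisoned: never report a false negative
    }
}
```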