diff --git a/CHANGELOG.md b/CHANGELOG.md
index d439fbe30bd..3425c1fd616 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 ## Perf
 
+### 2025-11-04
+
+- Avoid unnecessary work on bloom by keeping a per-layer bloom [#5176](https://github.com/lambdaclass/ethrex/pull/5176)
+
 ### 2025-11-03
 
 - Avoid unnecessary hash validations [#5167](https://github.com/lambdaclass/ethrex/pull/5167)
diff --git a/crates/storage/trie_db/layering.rs b/crates/storage/trie_db/layering.rs
index ae52f3b7294..54e5a59d0b9 100644
--- a/crates/storage/trie_db/layering.rs
+++ b/crates/storage/trie_db/layering.rs
@@ -1,5 +1,4 @@
 use ethrex_common::H256;
-use rayon::iter::{ParallelBridge, ParallelIterator};
 use rustc_hash::FxHashMap;
 use std::sync::Arc;
 
@@ -10,6 +9,10 @@ struct TrieLayer {
     nodes: Arc<FxHashMap<Vec<u8>, Vec<u8>>>,
     parent: H256,
     id: usize,
+    /// Per-layer bloom filter; `None` if capacity was exceeded (exceedingly rare).
+    /// Having a bloom per layer avoids the cost of rehashing each key every time the
+    /// global bloom is rebuilt, since merge reuses the u64 hashed keys instead of rehashing.
+    bloom: Option<qfilter::Filter>,
 }
 
 #[derive(Clone, Debug)]
@@ -110,17 +113,29 @@ impl TrieLayerCache {
             return;
         }
 
-        // add this new bloom to the global one.
-        if let Some(filter) = &mut self.bloom {
+        let mut bloom = Self::create_filter().ok();
+
+        // create the layer bloom; this is the only place where keys are hashed.
+        if let Some(filter) = &mut bloom {
             for (p, _) in &key_values {
                 if let Err(qfilter::Error::CapacityExceeded) = filter.insert(p.as_ref()) {
-                    tracing::warn!("TrieLayerCache: put_batch capacity exceeded");
-                    self.bloom = None;
+                    tracing::warn!("TrieLayerCache: put_batch per-layer capacity exceeded");
+                    bloom = None;
                     break;
                 }
             }
         }
 
+        // add this new bloom to the global one via merge.
+        if let Some(filter) = &mut self.bloom
+            && let Some(new_filter) = &bloom
+            && let Err(qfilter::Error::CapacityExceeded) = filter.merge(false, new_filter)
+        {
+            tracing::warn!("TrieLayerCache: put_batch merge capacity exceeded");
+            self.bloom = None;
+            bloom = None;
+        }
+
         let nodes: FxHashMap<Vec<u8>, Vec<u8>> = key_values
             .into_iter()
             .map(|(path, value)| (path.into_vec(), value))
@@ -131,37 +146,22 @@ impl TrieLayerCache {
             nodes: Arc::new(nodes),
             parent,
             id: self.last_id,
+            bloom,
         };
         self.layers.insert(state_root, Arc::new(entry));
     }
 
     /// Rebuilds the global bloom filter accruing all current existing layers.
     pub fn rebuild_bloom(&mut self) {
-        let mut blooms: Vec<_> = self
-            .layers
-            .values()
-            .par_bridge()
-            .map(|entry| {
-                let Ok(mut bloom) = Self::create_filter() else {
-                    tracing::warn!("TrieLayerCache: rebuild_bloom could not create filter");
-                    return None;
-                };
-                for (p, _) in entry.nodes.iter() {
-                    if let Err(qfilter::Error::CapacityExceeded) = bloom.insert(p) {
-                        tracing::warn!("TrieLayerCache: rebuild_bloom capacity exceeded");
-                        return None;
-                    }
-                }
-                Some(bloom)
-            })
-            .collect();
+        let mut blooms = self.layers.values().map(|x| x.bloom.as_ref());
 
-        let Some(mut ret) = blooms.pop().flatten() else {
+        let Some(mut ret) = blooms.next().flatten().cloned() else {
             tracing::warn!("TrieLayerCache: rebuild_bloom no valid bloom found");
             self.bloom = None;
             return;
         };
-        for bloom in blooms.iter() {
+
+        for bloom in blooms {
            let Some(bloom) = bloom else {
                 tracing::warn!("TrieLayerCache: rebuild_bloom no valid bloom found");
                 self.bloom = None;
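
For readers skimming the diff, here is a minimal, self-contained sketch of the pattern this PR adopts: hash each key exactly once into a per-layer filter when the layer is built, then rebuild the global filter by merging the per-layer filters instead of rehashing every key. The `Layer` struct, the `create_filter` helper, and the filter capacity/error-rate arguments below are illustrative assumptions, not ethrex's actual code; the `insert` and `merge(false, ...)` calls mirror the ones visible in the diff, and the exact `qfilter::Filter::new` signature is assumed.

```rust
use std::collections::HashMap;

/// A pared-down stand-in for `TrieLayer`: just the node map and its filter.
struct Layer {
    nodes: HashMap<Vec<u8>, Vec<u8>>,
    /// `None` means this layer's keys overflowed the filter's capacity.
    bloom: Option<qfilter::Filter>,
}

/// Hypothetical stand-in for `Self::create_filter()`; the capacity and
/// false-positive rate are illustrative values, not ethrex's parameters.
fn create_filter() -> Option<qfilter::Filter> {
    qfilter::Filter::new(1_000_000, 0.01).ok()
}

/// Build a layer, hashing each key exactly once into the per-layer filter.
fn build_layer(nodes: HashMap<Vec<u8>, Vec<u8>>) -> Layer {
    let mut bloom = create_filter();
    let mut overflowed = false;
    if let Some(filter) = &mut bloom {
        for key in nodes.keys() {
            if filter.insert(key).is_err() {
                overflowed = true; // capacity exceeded (exceedingly rare)
                break;
            }
        }
    }
    if overflowed {
        bloom = None;
    }
    Layer { nodes, bloom }
}

/// Rebuild the global filter by merging per-layer filters. No key is
/// rehashed here: `merge` combines the filters' stored hashes directly.
fn rebuild_global(layers: &[Layer]) -> Option<qfilter::Filter> {
    let mut blooms = layers.iter().map(|layer| layer.bloom.as_ref());
    // Clone the first layer's filter as the starting point.
    let mut global = blooms.next()??.clone();
    for bloom in blooms {
        // A missing or overflowing filter disables the global one,
        // mirroring the fallback behavior in the diff.
        global.merge(false, bloom?).ok()?;
    }
    Some(global)
}
```

Because the filter stores hashed keys rather than the keys themselves, `merge` can combine two filters from their stored u64 hashes, as the new doc comment on `TrieLayer::bloom` notes. That is what makes the rayon-parallelized rehash of every key in every layer, removed by this diff, unnecessary.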