11use ethrex_common:: H256 ;
2+ use rayon:: iter:: { ParallelBridge , ParallelIterator } ;
23use rustc_hash:: FxHashMap ;
34use std:: sync:: Arc ;
45
@@ -11,19 +12,58 @@ struct TrieLayer {
1112 id : usize ,
1213}
1314
14- #[ derive( Clone , Debug , Default ) ]
15+ #[ derive( Clone , Debug ) ]
1516pub struct TrieLayerCache {
1617 /// Monotonically increasing ID for layers, starting at 1.
1718 /// TODO: this implementation panics on overflow
1819 last_id : usize ,
1920 layers : FxHashMap < H256 , Arc < TrieLayer > > ,
21+ /// Global bloom that accrues all layer blooms.
22+ ///
23+ /// The bloom filter is used to avoid looking up all layers when the given path doesn't exist in any
24+ /// layer, thus going directly to the database.
25+ ///
26+ /// In case a bloom filter insert or merge fails, we need to mark the bloom filter as poisoned
27+ /// so we never use it again, because if we don't we may be misled into believing a key is not present
28+ /// on a diff layer when it is (i.e. a false negative), leading to wrong executions.
29+ bloom : Option < qfilter:: Filter > ,
30+ }
31+
32+ impl Default for TrieLayerCache {
33+ fn default ( ) -> Self {
34+ // Try to create the bloom filter, if it fails use poison mode.
35+ let bloom = Self :: create_filter ( ) . ok ( ) ;
36+ Self {
37+ bloom,
38+ last_id : 0 ,
39+ layers : Default :: default ( ) ,
40+ }
41+ }
2042}
2143
2244impl TrieLayerCache {
45+ // TODO: tune this
46+ fn create_filter ( ) -> Result < qfilter:: Filter , qfilter:: Error > {
47+ qfilter:: Filter :: new_resizeable ( 100_000 , 100_000_000 , 0.02 )
48+ . inspect_err ( |e| tracing:: warn!( "could not create trie layering bloom filter {e}" ) )
49+ }
50+
2351 pub fn get ( & self , state_root : H256 , key : Nibbles ) -> Option < Vec < u8 > > {
52+ let key = key. as_ref ( ) ;
53+
54+ // Fast check to know if any layer may contains the given key.
55+ // We can only be certain it doesn't exist, but if it returns true it may or not exist (false positive).
56+ if let Some ( filter) = & self . bloom
57+ && !filter. contains ( key)
58+ {
59+ // TrieWrapper goes to db when returning None.
60+ return None ;
61+ }
62+
2463 let mut current_state_root = state_root;
64+
2565 while let Some ( layer) = self . layers . get ( & current_state_root) {
26- if let Some ( value) = layer. nodes . get ( key. as_ref ( ) ) {
66+ if let Some ( value) = layer. nodes . get ( key) {
2767 return Some ( value. clone ( ) ) ;
2868 }
2969 current_state_root = layer. parent ;
@@ -70,9 +110,20 @@ impl TrieLayerCache {
70110 return ;
71111 }
72112
113+ // add this new bloom to the global one.
114+ if let Some ( filter) = & mut self . bloom {
115+ for ( p, _) in & key_values {
116+ if let Err ( qfilter:: Error :: CapacityExceeded ) = filter. insert ( p. as_ref ( ) ) {
117+ tracing:: warn!( "TrieLayerCache: put_batch capacity exceeded" ) ;
118+ self . bloom = None ;
119+ break ;
120+ }
121+ }
122+ }
123+
73124 let nodes: FxHashMap < Vec < u8 > , Vec < u8 > > = key_values
74125 . into_iter ( )
75- . map ( |( path, node ) | ( path. into_vec ( ) , node ) )
126+ . map ( |( path, value ) | ( path. into_vec ( ) , value ) )
76127 . collect ( ) ;
77128
78129 self . last_id += 1 ;
@@ -84,6 +135,47 @@ impl TrieLayerCache {
84135 self . layers . insert ( state_root, Arc :: new ( entry) ) ;
85136 }
86137
138+ /// Rebuilds the global bloom filter accruing all current existing layers.
139+ pub fn rebuild_bloom ( & mut self ) {
140+ let mut blooms: Vec < _ > = self
141+ . layers
142+ . values ( )
143+ . par_bridge ( )
144+ . map ( |entry| {
145+ let Ok ( mut bloom) = Self :: create_filter ( ) else {
146+ tracing:: warn!( "TrieLayerCache: rebuild_bloom could not create filter" ) ;
147+ return None ;
148+ } ;
149+ for ( p, _) in entry. nodes . iter ( ) {
150+ if let Err ( qfilter:: Error :: CapacityExceeded ) = bloom. insert ( p) {
151+ tracing:: warn!( "TrieLayerCache: rebuild_bloom capacity exceeded" ) ;
152+ return None ;
153+ }
154+ }
155+ Some ( bloom)
156+ } )
157+ . collect ( ) ;
158+
159+ let Some ( mut ret) = blooms. pop ( ) . flatten ( ) else {
160+ tracing:: warn!( "TrieLayerCache: rebuild_bloom no valid bloom found" ) ;
161+ self . bloom = None ;
162+ return ;
163+ } ;
164+ for bloom in blooms. iter ( ) {
165+ let Some ( bloom) = bloom else {
166+ tracing:: warn!( "TrieLayerCache: rebuild_bloom no valid bloom found" ) ;
167+ self . bloom = None ;
168+ return ;
169+ } ;
170+ if let Err ( qfilter:: Error :: CapacityExceeded ) = ret. merge ( false , bloom) {
171+ tracing:: warn!( "TrieLayerCache: rebuild_bloom capacity exceeded" ) ;
172+ self . bloom = None ;
173+ return ;
174+ }
175+ }
176+ self . bloom = Some ( ret) ;
177+ }
178+
87179 pub fn commit ( & mut self , state_root : H256 ) -> Option < Vec < ( Vec < u8 > , Vec < u8 > ) > > {
88180 let layer = match Arc :: try_unwrap ( self . layers . remove ( & state_root) ?) {
89181 Ok ( layer) => layer,
@@ -93,6 +185,7 @@ impl TrieLayerCache {
93185 let parent_nodes = self . commit ( layer. parent ) ;
94186 // older layers are useless
95187 self . layers . retain ( |_, item| item. id > layer. id ) ;
188+ self . rebuild_bloom ( ) ; // layers removed, rebuild global bloom filter.
96189 Some (
97190 parent_nodes
98191 . unwrap_or_default ( )
0 commit comments