11use crate :: errors:: BeaconChainError ;
22use crate :: { BeaconChainTypes , BeaconStore } ;
33use bls:: PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN ;
4+ use rayon:: prelude:: * ;
45use smallvec:: SmallVec ;
56use ssz:: { Decode , Encode } ;
67use ssz_derive:: { Decode , Encode } ;
78use std:: collections:: HashMap ;
89use std:: marker:: PhantomData ;
910use store:: { DBColumn , Error as StoreError , StoreItem , StoreOp } ;
11+ use tracing:: instrument;
1012use types:: { BeaconState , FixedBytesExtended , Hash256 , PublicKey , PublicKeyBytes } ;
1113
1214/// Provides a mapping of `validator_index -> validator_publickey`.
@@ -28,6 +30,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
2830 /// Create a new public key cache using the keys in `state.validators`.
2931 ///
3032 /// The new cache will be updated with the keys from `state` and immediately written to disk.
33+ #[ instrument( name = "validator_pubkey_cache_new" , skip_all) ]
3134 pub fn new (
3235 state : & BeaconState < T :: EthSpec > ,
3336 store : BeaconStore < T > ,
@@ -46,6 +49,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
4649 }
4750
4851 /// Load the pubkey cache from the given on-disk database.
52+ #[ instrument( name = "validator_pubkey_cache_load_from_store" , skip_all) ]
4953 pub fn load_from_store ( store : BeaconStore < T > ) -> Result < Self , BeaconChainError > {
5054 let mut pubkeys = vec ! [ ] ;
5155 let mut indices = HashMap :: new ( ) ;
@@ -77,6 +81,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
7781 /// Does not delete any keys from `self` if they don't appear in `state`.
7882 ///
7983 /// NOTE: The caller *must* commit the returned I/O batch as part of the block import process.
84+ #[ instrument( skip_all) ]
8085 pub fn import_new_pubkeys (
8186 & mut self ,
8287 state : & BeaconState < T :: EthSpec > ,
@@ -106,29 +111,58 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
106111 self . indices . reserve ( validator_keys. len ( ) ) ;
107112
108113 let mut store_ops = Vec :: with_capacity ( validator_keys. len ( ) ) ;
109- for pubkey_bytes in validator_keys {
110- let i = self . pubkeys . len ( ) ;
111114
112- if self . indices . contains_key ( & pubkey_bytes) {
113- return Err ( BeaconChainError :: DuplicateValidatorPublicKey ) ;
115+ let is_initial_import = self . pubkeys . is_empty ( ) ;
116+
117+ // Helper to insert a decompressed key
118+ let mut insert_key =
119+ |pubkey_bytes : PublicKeyBytes , pubkey : PublicKey | -> Result < ( ) , BeaconChainError > {
120+ let i = self . pubkeys . len ( ) ;
121+
122+ if self . indices . contains_key ( & pubkey_bytes) {
123+ return Err ( BeaconChainError :: DuplicateValidatorPublicKey ) ;
124+ }
125+
126+ // Stage the new validator key for writing to disk.
127+ // It will be committed atomically when the block that introduced it is written to disk.
128+ // Notably it is NOT written while the write lock on the cache is held.
129+ // See: https://github.com/sigp/lighthouse/issues/2327
130+ store_ops. push ( StoreOp :: KeyValueOp (
131+ DatabasePubkey :: from_pubkey ( & pubkey)
132+ . as_kv_store_op ( DatabasePubkey :: key_for_index ( i) ) ,
133+ ) ) ;
134+
135+ self . pubkeys . push ( pubkey) ;
136+ self . pubkey_bytes . push ( pubkey_bytes) ;
137+ self . indices . insert ( pubkey_bytes, i) ;
138+ Ok ( ( ) )
139+ } ;
140+
141+ if is_initial_import {
142+ // On first startup, decompress keys in parallel for better performance
143+ let validator_keys_vec: Vec < PublicKeyBytes > = validator_keys. collect ( ) ;
144+
145+ let decompressed: Vec < ( PublicKeyBytes , PublicKey ) > = validator_keys_vec
146+ . into_par_iter ( )
147+ . map ( |pubkey_bytes| {
148+ let pubkey = ( & pubkey_bytes)
149+ . try_into ( )
150+ . map_err ( BeaconChainError :: InvalidValidatorPubkeyBytes ) ?;
151+ Ok ( ( pubkey_bytes, pubkey) )
152+ } )
153+ . collect :: < Result < Vec < _ > , BeaconChainError > > ( ) ?;
154+
155+ for ( pubkey_bytes, pubkey) in decompressed {
156+ insert_key ( pubkey_bytes, pubkey) ?;
157+ }
158+ } else {
159+ // Sequential path for incremental updates
160+ for pubkey_bytes in validator_keys {
161+ let pubkey = ( & pubkey_bytes)
162+ . try_into ( )
163+ . map_err ( BeaconChainError :: InvalidValidatorPubkeyBytes ) ?;
164+ insert_key ( pubkey_bytes, pubkey) ?;
114165 }
115-
116- let pubkey = ( & pubkey_bytes)
117- . try_into ( )
118- . map_err ( BeaconChainError :: InvalidValidatorPubkeyBytes ) ?;
119-
120- // Stage the new validator key for writing to disk.
121- // It will be committed atomically when the block that introduced it is written to disk.
122- // Notably it is NOT written while the write lock on the cache is held.
123- // See: https://github.com/sigp/lighthouse/issues/2327
124- store_ops. push ( StoreOp :: KeyValueOp (
125- DatabasePubkey :: from_pubkey ( & pubkey)
126- . as_kv_store_op ( DatabasePubkey :: key_for_index ( i) ) ,
127- ) ) ;
128-
129- self . pubkeys . push ( pubkey) ;
130- self . pubkey_bytes . push ( pubkey_bytes) ;
131- self . indices . insert ( pubkey_bytes, i) ;
132166 }
133167
134168 Ok ( store_ops)
@@ -324,4 +358,39 @@ mod test {
324358 let cache = ValidatorPubkeyCache :: load_from_store ( store) . expect ( "should open cache" ) ;
325359 check_cache_get ( & cache, & keypairs[ ..] ) ;
326360 }
361+
#[test]
fn parallel_import_maintains_order() {
    // Building a brand-new cache from a populated state exercises the
    // parallel decompression path taken on first startup; verify that
    // validator ordering and index assignment survive it.
    let store = get_store();
    let (state, keypairs) = get_state(100);

    let cache: ValidatorPubkeyCache<T> =
        ValidatorPubkeyCache::new(&state, store).expect("should create cache");

    check_cache_get(&cache, &keypairs[..]);
}
374+
#[test]
fn incremental_import_maintains_order() {
    // Growing an already-populated cache exercises the sequential
    // (non-parallel) import path; verify ordering is preserved.
    let store = get_store();

    // Seed the cache with the first 50 validators.
    let (initial_state, initial_keypairs) = get_state(50);
    let mut cache = ValidatorPubkeyCache::new(&initial_state, store.clone())
        .expect("should create cache");
    check_cache_get(&cache, &initial_keypairs[..]);

    // Import the next 50 validators and commit the staged store ops.
    let (grown_state, all_keypairs) = get_state(100);
    let ops = cache
        .import_new_pubkeys(&grown_state)
        .expect("should import pubkeys");
    store.do_atomically_with_block_and_blobs_cache(ops).unwrap();

    // All 100 validators must now resolve correctly through the cache.
    check_cache_get(&cache, &all_keypairs[..]);
}
327396}
0 commit comments