@@ -20,14 +20,12 @@ use crate::{
2020 Clock ,
2121} ;
2222
23- #[ derive( Default , Debug , Eq , PartialEq , Serialize , Deserialize ) ]
24- #[ repr( u8 ) ]
23+ #[ derive( Debug ) ]
2524enum SchemaVersion {
26- /// Version 0, all the blobs, certificates, confirmed blocks, events and network description on the same partition.
27- /// This is marked as default since it does not exist in the old scheme and would be obtained from `unwrap_or_default`.
28- #[ default]
25+ /// Version 0. All the blobs, certificates, confirmed blocks, events and network
26+ /// description are on the same partition.
2927 Version0 ,
30- /// Version 1, spreading by chain ID, crypto hash, and blob ID.
28+ /// Version 1. New partitions are assigned by chain ID, crypto hash, and blob ID.
3129 Version1 ,
3230}
3331
@@ -41,11 +39,11 @@ const UNUSED_EMPTY_KEY: &[u8] = &[];
4139const MOVABLE_KEYS_0_1 : & [ u8 ] = & [ 1 , 2 , 3 , 4 , 5 , 7 ] ;
4240
4341/// The total number of keys being migrated in a block.
44- /// we use chunks to avoid OOM
42+ /// We use chunks to avoid OOM.
4543const BLOCK_KEY_SIZE : usize = 90 ;
4644
4745#[ derive( Debug , Serialize , Deserialize ) ]
48- pub ( crate ) enum BaseKey {
46+ enum BaseKey {
4947 ChainState ( ChainId ) ,
5048 Certificate ( CryptoHash ) ,
5149 ConfirmedBlock ( CryptoHash ) ,
@@ -110,12 +108,12 @@ where
110108 first_byte : & u8 ,
111109 keys : Vec < Vec < u8 > > ,
112110 ) -> Result < ( ) , ViewError > {
113- tracing:: debug !(
114- "migrate_shared_partition for {} keys starting with {first_byte}" ,
111+ tracing:: info !(
112+ "Migrating {} keys of shared DB partition starting with {first_byte}" ,
115113 keys. len( )
116114 ) ;
117115 for ( index, chunk_keys) in keys. chunks ( BLOCK_KEY_SIZE ) . enumerate ( ) {
118- tracing:: info!( "processing chunk {index} of size {}" , chunk_keys. len( ) ) ;
116+ tracing:: info!( "Processing chunk {index} of size {}" , chunk_keys. len( ) ) ;
119117 let chunk_base_keys = chunk_keys
120118 . iter ( )
121119 . map ( |key| {
@@ -140,6 +138,7 @@ where
140138 for key in chunk_base_keys {
141139 batch. delete_key ( key. to_vec ( ) ) ;
142140 }
141+ // Migrate chunk.
143142 store. write_batch ( batch) . await ?;
144143 }
145144 Ok ( ( ) )
@@ -155,40 +154,35 @@ where
155154 }
156155
157156 pub async fn migrate_if_needed ( & self ) -> Result < ( ) , ViewError > {
158- let ( is_initialized, schema) = self . get_storage_state ( ) . await ?;
159- if is_initialized && schema == SchemaVersion :: Version0 {
160- self . migrate_v0_to_v1 ( ) . await ?;
157+ match self . get_storage_state ( ) . await ? {
158+ Some ( SchemaVersion :: Version0 ) => self . migrate_v0_to_v1 ( ) . await ,
159+ Some ( SchemaVersion :: Version1 ) => Ok ( ( ) ) ,
160+ None => {
161+ // Need to initialize the DB, not migrate it.
162+ Ok ( ( ) )
163+ }
161164 }
162- Ok ( ( ) )
163165 }
164166
165- async fn get_storage_state ( & self ) -> Result < ( bool , SchemaVersion ) , ViewError > {
166- let test_old_schema = {
167- let store = self . database . open_shared ( & [ ] ) ?;
168- let key = bcs:: to_bytes ( & BaseKey :: NetworkDescription ) . unwrap ( ) ;
169- store. contains_key ( & key) . await ?
170- } ;
171- let test_new_schema = {
172- let root_key = RootKey :: NetworkDescription . bytes ( ) ;
173- let store = self . database . open_shared ( & root_key) ?;
174- store. contains_key ( NETWORK_DESCRIPTION_KEY ) . await ?
175- } ;
176- let is_initialized = test_old_schema || test_new_schema;
177- let schema = if test_old_schema {
178- SchemaVersion :: Version0
179- } else {
180- // Whether the state is initialized or not it is going
181- // to be in version 1.
182- SchemaVersion :: Version1
183- } ;
184- Ok ( ( is_initialized, schema) )
167+ async fn get_storage_state ( & self ) -> Result < Option < SchemaVersion > , ViewError > {
168+ let store = self . database . open_shared ( & [ ] ) ?;
169+ let key = bcs:: to_bytes ( & BaseKey :: NetworkDescription ) . unwrap ( ) ;
170+ if store. contains_key ( & key) . await ? {
171+ return Ok ( Some ( SchemaVersion :: Version0 ) ) ;
172+ }
173+
174+ let root_key = RootKey :: NetworkDescription . bytes ( ) ;
175+ let store = self . database . open_shared ( & root_key) ?;
176+ if store. contains_key ( NETWORK_DESCRIPTION_KEY ) . await ? {
177+ return Ok ( Some ( SchemaVersion :: Version1 ) ) ;
178+ }
179+
180+ Ok ( None )
185181 }
186182
187183 pub async fn assert_is_migrated_storage ( & self ) -> Result < ( ) , ViewError > {
188- let ( is_initialized, schema) = self . get_storage_state ( ) . await ?;
189- if is_initialized {
190- assert_eq ! ( schema, SchemaVersion :: Version1 ) ;
191- }
184+ let state = self . get_storage_state ( ) . await ?;
185+ assert ! ( matches!( state, Some ( SchemaVersion :: Version1 ) ) ) ;
192186 Ok ( ( ) )
193187 }
194188}
0 commit comments