@@ -41,7 +41,8 @@ pub struct PartitionDb {
4141 archived_lsn : watch:: Sender < Option < Lsn > > ,
4242 // Note: Rust will drop the fields in the order they are declared in the struct.
4343 // It's crucial to keep the column family and the database in this exact order.
44- cf : PartitionBoundCfHandle ,
44+ data_cf : PartitionBoundCfHandle ,
45+ idx_cf : Option < PartitionBoundCfHandle > ,
4546 rocksdb : Arc < RocksDb > ,
4647}
4748
@@ -50,15 +51,19 @@ impl PartitionDb {
5051 meta : Arc < Partition > ,
5152 archived_lsn : watch:: Sender < Option < Lsn > > ,
5253 rocksdb : Arc < RocksDb > ,
53- cf : Arc < BoundColumnFamily < ' _ > > ,
54+ data_cf : Arc < BoundColumnFamily < ' _ > > ,
55+ idx_cf : Option < Arc < BoundColumnFamily < ' _ > > > ,
5456 ) -> Self {
5557 Self {
5658 meta,
5759 durable_lsn : watch:: Sender :: new ( None ) ,
5860 archived_lsn,
5961 // SAFETY: the new BoundColumnFamily here just expanding lifetime to static,
6062 // it's safe to use here as long as rocksdb is dropped last.
61- cf : unsafe { PartitionBoundCfHandle :: new ( cf) } ,
63+ data_cf : unsafe { PartitionBoundCfHandle :: new ( data_cf) } ,
64+ // SAFETY: the new BoundColumnFamily here just expanding lifetime to static,
65+ // it's safe to use here as long as rocksdb is dropped last.
66+ idx_cf : idx_cf. map ( |cf| unsafe { PartitionBoundCfHandle :: new ( cf) } ) ,
6267 rocksdb,
6368 }
6469 }
@@ -76,22 +81,48 @@ impl PartitionDb {
7681 self . rocksdb
7782 }
7883
79- pub fn cf_handle ( & self ) -> & Arc < BoundColumnFamily < ' _ > > {
80- & self . cf . 0
84+ pub fn data_cf_handle ( & self ) -> & Arc < BoundColumnFamily < ' _ > > {
85+ & self . data_cf . 0
86+ }
87+
88+ pub fn idx_cf_handle ( & self ) -> Option < & Arc < BoundColumnFamily < ' _ > > > {
89+ self . idx_cf . as_ref ( ) . map ( |cf| & cf. 0 )
8190 }
8291
8392 pub fn cf_names ( & self ) -> Vec < SmartString > {
84- vec ! [ self . meta. cf_name( ) . into_inner( ) ]
93+ if Configuration :: pinned ( ) . worker . storage . enable_index_cf {
94+ vec ! [
95+ self . meta. data_cf_name( ) . into_inner( ) ,
96+ CfName :: new_idx( self . meta. partition_id) . into( ) ,
97+ ]
98+ } else {
99+ vec ! [ self . meta. data_cf_name( ) . into_inner( ) ]
100+ }
85101 }
86102
87103 pub async fn flush_memtables ( & self , wait : bool ) -> Result < ( ) , RocksError > {
88- self . rocksdb
89- . clone ( )
90- . flush_memtables (
91- std:: slice:: from_ref ( & restate_rocksdb:: CfName :: from ( self . partition ( ) . cf_name ( ) ) ) ,
92- wait,
93- )
94- . await
104+ if Configuration :: pinned ( ) . worker . storage . enable_index_cf {
105+ self . rocksdb
106+ . clone ( )
107+ . flush_memtables (
108+ & [
109+ restate_rocksdb:: CfName :: from ( self . partition ( ) . data_cf_name ( ) ) ,
110+ restate_rocksdb:: CfName :: from ( self . partition ( ) . idx_cf_name ( ) ) ,
111+ ] ,
112+ wait,
113+ )
114+ . await
115+ } else {
116+ self . rocksdb
117+ . clone ( )
118+ . flush_memtables (
119+ & [ restate_rocksdb:: CfName :: from (
120+ self . partition ( ) . data_cf_name ( ) ,
121+ ) ] ,
122+ wait,
123+ )
124+ . await
125+ }
95126 }
96127
97128 pub ( crate ) fn note_archived_lsn ( & self , lsn : Lsn ) -> bool {
@@ -119,16 +150,38 @@ impl PartitionDb {
119150 }
120151
121152 pub ( crate ) fn update_memory_budget ( & self , memory_budget : usize ) {
122- let max_bytes_for_level_base = memory_budget;
123- let single_memtable_budget = memory_budget / 4 ;
124- let target_file_size_base = memory_budget / 8 ;
125-
126- let max_bytes_for_level_base_str = max_bytes_for_level_base. to_string ( ) ;
127- let single_memtable_budget_str = single_memtable_budget. to_string ( ) ;
128- let target_file_size_base_str = target_file_size_base. to_string ( ) ;
153+ // This is a very rudimentary approach to balancing the memory budget between the data-cf
154+ // and the index-cf. In the future, we might want to revisit this and come up with a more
155+ // sophisticated approach.
156+ //
157+ // We give 75% of the memory budget to the data-cf but ensure a reasonable minimum
158+ // is still applied to the index-cf.
159+ //
160+ // NOTE: Changes to this code should also be reflected in CfConfigurator::get_cf_options() method.
161+ let ( data_memory_budget, index_memory_budget) =
162+ if Configuration :: pinned ( ) . worker . storage . enable_index_cf {
163+ let idx = memory_budget. div_ceil ( 4 ) ;
164+ let data = memory_budget - idx;
165+ ( data, idx)
166+ } else {
167+ ( memory_budget, 0 )
168+ } ;
129169
130170 // impacts only this partition's column-families
131171 for cf in self . cf_names ( ) {
172+ let memory_budget = if cf. starts_with ( crate :: IDX_CF_PREFIX ) {
173+ index_memory_budget
174+ } else {
175+ data_memory_budget
176+ } ;
177+
178+ let max_bytes_for_level_base = memory_budget;
179+ let single_memtable_budget = memory_budget / 4 ;
180+ let target_file_size_base = memory_budget / 8 ;
181+
182+ let max_bytes_for_level_base_str = max_bytes_for_level_base. to_string ( ) ;
183+ let single_memtable_budget_str = single_memtable_budget. to_string ( ) ;
184+ let target_file_size_base_str = target_file_size_base. to_string ( ) ;
132185 debug ! (
133186 "Updating memory budget for {}/{} to {}" ,
134187 self . rocksdb. name( ) ,
@@ -187,7 +240,7 @@ impl PartitionCell {
187240 }
188241
189242 pub fn cf_name ( & self ) -> CfName {
190- self . meta . cf_name ( )
243+ self . meta . data_cf_name ( )
191244 }
192245
193246 fn open_local_cf ( & self , guard : & mut tokio:: sync:: RwLockWriteGuard < ' _ , State > , db : PartitionDb ) {
@@ -204,49 +257,55 @@ impl PartitionCell {
204257 }
205258 }
206259
207- pub fn open_cf (
260+ pub async fn open_cf (
208261 & self ,
209262 guard : & mut tokio:: sync:: RwLockWriteGuard < ' _ , State > ,
210263 rocksdb : & Arc < RocksDb > ,
211- ) {
264+ ) -> Result < ( ) , RocksError > {
212265 let cf_name = self . cf_name ( ) ;
213266 match rocksdb. inner ( ) . cf_handle ( cf_name. as_ref ( ) ) {
214- Some ( handle) => {
267+ Some ( data_handle) => {
268+ let idx_handle = open_or_create_index_cf ( rocksdb, & self . meta ) . await ?;
215269 let db = PartitionDb :: new (
216270 self . meta . clone ( ) ,
217271 self . archived_lsn . clone ( ) ,
218272 rocksdb. clone ( ) ,
219- handle,
273+ data_handle,
274+ idx_handle,
220275 ) ;
221276 self . open_local_cf ( guard, db) ;
222277 }
223278 None => {
224279 self . set_cf_missing ( guard) ;
225280 }
226281 }
282+
283+ Ok ( ( ) )
227284 }
228285
229286 // low-level opening of a column famili(es) for the partition.
230287 //
231288 // Note: This doesn't check whether the column family exists or not
232- #[ instrument( level = "error" , skip_all, fields( partition_id = %self . meta. partition_id, cf_name = %self . meta. cf_name ( ) ) ) ]
289+ #[ instrument( level = "error" , skip_all, fields( partition_id = %self . meta. partition_id, cf_name = %self . meta. data_cf_name ( ) ) ) ]
233290 pub async fn provision (
234291 & self ,
235292 guard : & mut tokio:: sync:: RwLockWriteGuard < ' _ , State > ,
236293 rocksdb : Arc < RocksDb > ,
237294 ) -> Result < PartitionDb , RocksError > {
238- let cf_name = self . meta . cf_name ( ) ;
239- debug ! ( "Creating new column family {}" , cf_name) ;
240- rocksdb. clone ( ) . open_cf ( self . meta . cf_name ( ) . into ( ) ) . await ?;
241- let handle = rocksdb
242- . inner ( )
243- . cf_handle ( cf_name. as_ref ( ) )
244- . expect ( "cf must be open" ) ;
295+ let data_cf_name = self . meta . data_cf_name ( ) ;
296+ debug ! ( "Creating new column family {}" , data_cf_name) ;
297+ rocksdb
298+ . clone ( )
299+ . open_cf ( self . meta . data_cf_name ( ) . into ( ) )
300+ . await ?;
301+ let data_handle = rocksdb. inner ( ) . cf_handle ( data_cf_name. as_ref ( ) ) . unwrap ( ) ;
302+ let idx_handle = maybe_open_index_cf ( & rocksdb, & self . meta ) . await ?;
245303 let db = PartitionDb :: new (
246304 self . meta . clone ( ) ,
247305 self . archived_lsn . clone ( ) ,
248306 rocksdb. clone ( ) ,
249- handle,
307+ data_handle,
308+ idx_handle,
250309 ) ;
251310 self . open_local_cf ( guard, db. clone ( ) ) ;
252311 Ok ( db)
@@ -255,7 +314,7 @@ impl PartitionCell {
255314 // low-level importing a column family from a locally downloaded a snapshot
256315 //
257316 // Note: This doesn't check whether the column family exists or not
258- #[ instrument( level = "error" , skip_all, fields( partition_id = %self . meta. partition_id, cf_name = %self . meta. cf_name ( ) , path = %snapshot. base_dir. display( ) ) ) ]
317+ #[ instrument( level = "error" , skip_all, fields( partition_id = %self . meta. partition_id, cf_name = %self . meta. data_cf_name ( ) , path = %snapshot. base_dir. display( ) ) ) ]
259318 pub async fn import_cf (
260319 & self ,
261320 guard : & mut tokio:: sync:: RwLockWriteGuard < ' _ , State > ,
@@ -286,7 +345,7 @@ impl PartitionCell {
286345
287346 rocksdb
288347 . clone ( )
289- . import_cf ( self . meta . cf_name ( ) . into ( ) , import_metadata)
348+ . import_cf ( self . meta . data_cf_name ( ) . into ( ) , import_metadata)
290349 . await ?;
291350
292351 if let Err ( err) = tokio:: fs:: remove_dir_all ( & snapshot. base_dir ) . await {
@@ -298,14 +357,16 @@ impl PartitionCell {
298357 ) ;
299358 } ;
300359
360+ let idx_handle = maybe_open_index_cf ( & rocksdb, & self . meta ) . await ?;
301361 let db = PartitionDb :: new (
302362 self . meta . clone ( ) ,
303363 self . archived_lsn . clone ( ) ,
304364 rocksdb. clone ( ) ,
305365 rocksdb
306366 . inner ( )
307- . cf_handle ( self . meta . cf_name ( ) . as_ref ( ) )
308- . expect ( "cf must exist after import" ) ,
367+ . cf_handle ( self . meta . data_cf_name ( ) . as_ref ( ) )
368+ . unwrap ( ) ,
369+ idx_handle,
309370 ) ;
310371
311372 self . open_local_cf ( guard, db. clone ( ) ) ;
@@ -324,15 +385,18 @@ impl PartitionCell {
324385 State :: CfMissing => { /* nothing to do.*/ }
325386 State :: Open { db } | State :: Closed { db, .. } => {
326387 let db = Arc :: clone ( & db. rocksdb ) ;
327- let cf_name = self . meta . cf_name ( ) . clone ( ) ;
388+ let data_cf_name = self . meta . data_cf_name ( ) . clone ( ) ;
389+ let idx_cf_name = self . meta . idx_cf_name ( ) . clone ( ) ;
328390
329391 // if dropping failed. We leave the column family closed marked as "unknown"
330392 tokio:: task:: spawn_blocking ( move || {
331- db. inner ( ) . as_raw_db ( ) . drop_cf ( cf_name. as_ref ( ) )
393+ // ignore the case where the index cf doesn't exist.
394+ let _ = db. inner ( ) . as_raw_db ( ) . drop_cf ( idx_cf_name. as_ref ( ) ) ;
395+ db. inner ( ) . as_raw_db ( ) . drop_cf ( data_cf_name. as_ref ( ) )
332396 } )
333397 . await
334398 . map_err ( |_| RocksError :: Shutdown ( ShutdownError ) ) ??;
335- debug ! ( "Column family {} dropped" , self . meta. cf_name ( ) ) ;
399+ debug ! ( "Column family {} dropped" , self . meta. data_cf_name ( ) ) ;
336400 }
337401 }
338402 self . set_cf_missing ( guard) ;
@@ -486,10 +550,14 @@ impl DbConfigurator for RocksConfigurator<AllDataCf> {
486550 write_buffer_manager,
487551 ) ;
488552
489- self . apply_db_opts_from_config (
490- & mut db_options,
491- & Configuration :: pinned ( ) . worker . storage . rocksdb ,
492- ) ;
553+ let storage_config = & Configuration :: pinned ( ) . worker . storage ;
554+ self . apply_db_opts_from_config ( & mut db_options, & storage_config. rocksdb ) ;
555+
556+ if storage_config. enable_index_cf {
557+ // Atomic flush is required to ensure that the index cf is in-sync with
558+ // the data cf.
559+ db_options. set_atomic_flush ( true ) ;
560+ }
493561
494562 let event_listener = DurableLsnEventListener :: new ( & self . shared_state ) ;
495563 db_options. add_event_listener ( event_listener) ;
@@ -541,7 +609,24 @@ impl CfConfigurator for RocksConfigurator<AllDataCf> {
541609 cf_options. add_table_properties_collector_factory ( AppliedLsnCollectorFactory ) ;
542610
543611 // -- Initial Memory Configuration --
612+ //
544613 let memtables_budget = self . memory_budget . current_per_partition_budget ( ) ;
614+ // NOTE: Changes to this code should also be reflected in update_memory_budget() method
615+ let ( data_memory_budget, index_memory_budget) =
616+ if Configuration :: pinned ( ) . worker . storage . enable_index_cf {
617+ let idx = memtables_budget. div_ceil ( 4 ) ;
618+ let data = memtables_budget - idx;
619+ ( data, idx)
620+ } else {
621+ ( memtables_budget, 0 )
622+ } ;
623+
624+ let memtables_budget = if cf_name. starts_with ( crate :: IDX_CF_PREFIX ) {
625+ index_memory_budget
626+ } else {
627+ data_memory_budget
628+ } ;
629+
545630 tracing:: debug!(
546631 "Configured {db_name}/{cf_name} with memtable budget={}" ,
547632 ByteCount :: from( memtables_budget)
@@ -564,3 +649,53 @@ impl CfConfigurator for RocksConfigurator<AllDataCf> {
564649 cf_options
565650 }
566651}
652+
653+ async fn maybe_open_index_cf < ' a > (
654+ rocksdb : & ' a Arc < RocksDb > ,
655+ meta : & Arc < Partition > ,
656+ ) -> Result < Option < Arc < BoundColumnFamily < ' a > > > , RocksError > {
657+ if Configuration :: pinned ( ) . worker . storage . enable_index_cf {
658+ let idx_cf_name = meta. idx_cf_name ( ) ;
659+ debug ! ( "Creating idx column family {}" , idx_cf_name) ;
660+ rocksdb. clone ( ) . open_cf ( idx_cf_name. clone ( ) . into ( ) ) . await ?;
661+ Ok ( Some (
662+ rocksdb. inner ( ) . cf_handle ( idx_cf_name. as_ref ( ) ) . unwrap ( ) ,
663+ ) )
664+ } else {
665+ Ok ( None )
666+ }
667+ }
668+
669+ async fn open_or_create_index_cf < ' a > (
670+ rocksdb : & ' a Arc < RocksDb > ,
671+ meta : & Arc < Partition > ,
672+ ) -> Result < Option < Arc < BoundColumnFamily < ' a > > > , RocksError > {
673+ let idx_cf_name = meta. idx_cf_name ( ) ;
674+ if Configuration :: pinned ( ) . worker . storage . enable_index_cf {
675+ if let Some ( handle) = rocksdb. inner ( ) . cf_handle ( idx_cf_name. as_ref ( ) ) {
676+ Ok ( Some ( handle) )
677+ } else {
678+ debug ! ( "Creating idx column family {}" , idx_cf_name) ;
679+ rocksdb. clone ( ) . open_cf ( idx_cf_name. clone ( ) . into ( ) ) . await ?;
680+ Ok ( Some (
681+ rocksdb. inner ( ) . cf_handle ( idx_cf_name. as_ref ( ) ) . unwrap ( ) ,
682+ ) )
683+ }
684+ } else {
685+ // If the feature is disabled and the column family exists, we need to drop it.
686+ // this is to ensure that the column family's contents are always consistent
687+ // with the state of the partition. Therefore, the presence of the column family
688+ // would be sufficient to indicate that the index can be used.
689+ if rocksdb. inner ( ) . cf_handle ( idx_cf_name. as_ref ( ) ) . is_some ( ) {
690+ let cf = idx_cf_name. clone ( ) ;
691+ let rocksdb = rocksdb. clone ( ) ;
692+ // if dropping failed. We leave the column family closed marked as "unknown"
693+ tokio:: task:: spawn_blocking ( move || rocksdb. inner ( ) . as_raw_db ( ) . drop_cf ( cf. as_ref ( ) ) )
694+ . await
695+ . map_err ( |_| RocksError :: Shutdown ( ShutdownError ) ) ??;
696+ info ! ( "Column family {} dropped" , idx_cf_name) ;
697+ }
698+
699+ Ok ( None )
700+ }
701+ }
0 commit comments