@@ -14,10 +14,13 @@ use std::{
1414 collections:: HashMap ,
1515 error, io,
1616 path:: { Path , PathBuf } ,
17+ time:: { Duration , Instant } ,
1718} ;
1819
20+ use parking_lot:: Mutex ;
1921use rocksdb:: {
20- BlockBasedOptions , ColumnFamily , ColumnFamilyDescriptor , Options , ReadOptions , WriteBatch , WriteOptions , DB ,
22+ BlockBasedOptions , ColumnFamily , ColumnFamilyDescriptor , CompactOptions , Options , ReadOptions , WriteBatch ,
23+ WriteOptions , DB ,
2124} ;
2225
2326use kvdb:: { DBKeyValue , DBOp , DBTransaction , DBValue , KeyValueDB } ;
@@ -268,6 +271,7 @@ pub struct Database {
268271 read_opts : ReadOptions ,
269272 block_opts : BlockBasedOptions ,
270273 stats : stats:: RunningDbStats ,
274+ last_compaction : Mutex < Instant > ,
271275}
272276
273277/// Generate the options for RocksDB, based on the given `DatabaseConfig`.
@@ -350,15 +354,23 @@ impl Database {
350354 Self :: open_primary ( & opts, path. as_ref ( ) , config, column_names. as_slice ( ) , & block_opts) ?
351355 } ;
352356
353- Ok ( Database {
357+ let db = Database {
354358 inner : DBAndColumns { db, column_names } ,
355359 config : config. clone ( ) ,
356360 opts,
357361 read_opts,
358362 write_opts,
359363 block_opts,
360364 stats : stats:: RunningDbStats :: new ( ) ,
361- } )
365+ last_compaction : Mutex :: new ( Instant :: now ( ) ) ,
366+ } ;
367+
368+ // After opening the DB, we want to compact it.
369+ //
370+ // This is done in case the node crashed previously, to ensure the db stays fast.
371+ db. force_compaction ( ) ?;
372+
373+ Ok ( db)
362374 }
363375
364376 /// Internal api to open a database in primary mode.
@@ -460,7 +472,21 @@ impl Database {
460472 }
461473 self . stats . tally_bytes_written ( stats_total_bytes as u64 ) ;
462474
463- cfs. db . write_opt ( batch, & self . write_opts ) . map_err ( other_io_err)
475+ let res = cfs. db . write_opt ( batch, & self . write_opts ) . map_err ( other_io_err) ?;
476+
477+ // If we have written more data than we want stored in a single `sst` file, we force compaction.
478+ // We also ensure that we only compact once per minute.
479+ //
480+ // Otherwise, rocksdb read performance degrades, e.g. after a warp sync.
481+ if stats_total_bytes > self . config . compaction . initial_file_size as usize &&
482+ self . last_compaction . lock ( ) . elapsed ( ) > Duration :: from_secs ( 60 )
483+ {
484+ self . force_compaction ( ) ?;
485+
486+ * self . last_compaction . lock ( ) = Instant :: now ( ) ;
487+ }
488+
489+ Ok ( res)
464490 }
465491
466492 /// Get value by key.
@@ -579,6 +605,23 @@ impl Database {
579605 pub fn try_catch_up_with_primary ( & self ) -> io:: Result < ( ) > {
580606 self . inner . db . try_catch_up_with_primary ( ) . map_err ( other_io_err)
581607 }
608+
609+ /// Force compacting the entire db.
610+ fn force_compaction ( & self ) -> io:: Result < ( ) > {
611+ let mut compact_options = CompactOptions :: default ( ) ;
612+ compact_options. set_bottommost_level_compaction ( rocksdb:: BottommostLevelCompaction :: Force ) ;
613+
614+ // It is unclear why `compact_range_opt` alone is not sufficient here; instead we are
615+ // forced to trigger compaction on every column. Actually we only need this for the `STATE` column,
616+ // but we don't know which one this is here. So, we just iterate over all of them.
617+ for col in 0 ..self . inner . column_names . len ( ) {
618+ self . inner
619+ . db
620+ . compact_range_cf_opt ( self . inner . cf ( col) ?, None :: < Vec < u8 > > , None :: < Vec < u8 > > , & compact_options) ;
621+ }
622+
623+ Ok ( ( ) )
624+ }
582625}
583626
584627// duplicate declaration of methods here to avoid trait import in certain existing cases
0 commit comments