Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 73 additions & 98 deletions crates/storage/store_db/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use ethrex_common::{
};
use ethrex_trie::{Nibbles, Node, Trie};
use rocksdb::{
BlockBasedOptions, BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded,
Options, WriteBatch, checkpoint::Checkpoint,
BlockBasedIndexType, BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamilyDescriptor,
DBCompactionStyle, DBWithThreadMode, DataBlockIndexType, LruCacheOptions, MultiThreaded,
Options, SliceTransform, WriteBatch, checkpoint::Checkpoint,
};
use std::{
collections::HashSet,
Expand Down Expand Up @@ -148,41 +149,79 @@ pub struct Store {
impl Store {
pub fn new(path: &Path) -> Result<Self, StoreError> {
let mut db_options = Options::default();
let mut block_opts = BlockBasedOptions::default();

db_options.create_if_missing(true);
db_options.create_missing_column_families(true);

db_options.set_max_open_files(-1);
db_options.set_max_file_opening_threads(16);

db_options.set_max_background_jobs(8);

db_options.set_level_zero_file_num_compaction_trigger(2);
db_options.set_level_zero_slowdown_writes_trigger(10);
db_options.set_level_zero_stop_writes_trigger(16);
db_options.set_target_file_size_base(512 * 1024 * 1024); // 512MB
db_options.set_max_bytes_for_level_base(2 * 1024 * 1024 * 1024); // 2GB L1
db_options.set_max_bytes_for_level_multiplier(10.0);
db_options.set_level_compaction_dynamic_level_bytes(true);

db_options.set_db_write_buffer_size(1024 * 1024 * 1024); // 1GB
db_options.set_write_buffer_size(128 * 1024 * 1024); // 128MB
db_options.set_max_write_buffer_number(4);
db_options.set_min_write_buffer_number_to_merge(2);

db_options.set_wal_recovery_mode(rocksdb::DBRecoveryMode::PointInTime);
db_options.set_max_total_wal_size(2 * 1024 * 1024 * 1024); // 2GB
db_options.set_wal_bytes_per_sync(32 * 1024 * 1024); // 32MB
db_options.set_bytes_per_sync(32 * 1024 * 1024); // 32MB
db_options.set_use_fsync(false); // fdatasync

db_options.set_enable_pipelined_write(true);
db_options.set_allow_concurrent_memtable_write(true);
db_options.set_enable_write_thread_adaptive_yield(true);
db_options.set_compaction_readahead_size(4 * 1024 * 1024); // 4MB
db_options.set_advise_random_on_open(false);
#[expect(deprecated)]
{
db_options.set_max_background_compactions(4);
db_options.set_max_background_flushes(2);
}

// db_options.enable_statistics();
// db_options.set_stats_dump_period_sec(600);
// The following can only be done in 64-bits arches, for now let's force it
const { assert!(std::mem::size_of::<u64>() == std::mem::size_of::<usize>()) };
let cache = Cache::new_lru_cache(8 << 30);

const CFG: &str = env!("ROCKSDB_CONFIG_VERSION");
match CFG {
"1" => {
db_options.set_max_open_files(-1);
db_options.set_compaction_style(DBCompactionStyle::Level);
db_options.set_level_compaction_dynamic_level_bytes(true);
db_options.set_use_direct_reads(true);
db_options.set_use_direct_io_for_flush_and_compaction(true);
db_options.set_disable_auto_compactions(false);
db_options.optimize_for_point_lookup(256 << 20);

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_size(8 * 1024);
block_opts.set_cache_index_and_filter_blocks(true);
block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
block_opts.set_partition_filters(true);
block_opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
block_opts.set_metadata_block_size(4096);
block_opts.set_pin_top_level_index_and_filter(true);
block_opts.set_block_cache(&cache);

db_options.set_block_based_table_factory(&block_opts);
}
"2" => {
db_options.set_max_open_files(-1);
db_options.set_compaction_style(DBCompactionStyle::Level);
db_options.set_level_compaction_dynamic_level_bytes(true);
db_options.set_use_direct_reads(true);
db_options.set_use_direct_io_for_flush_and_compaction(true);
db_options.set_disable_auto_compactions(false);
db_options.optimize_for_point_lookup(256 << 20);

block_opts.set_block_size(8 * 1024);
block_opts.set_cache_index_and_filter_blocks(true);
block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
block_opts.set_partition_filters(true);
block_opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
block_opts.set_metadata_block_size(4096);
block_opts.set_pin_top_level_index_and_filter(true);
block_opts.set_block_cache(&cache);

db_options.set_prefix_extractor(SliceTransform::create_fixed_prefix(8));
db_options.set_memtable_whole_key_filtering(true);

db_options.set_block_based_table_factory(&block_opts);
}
"3" => {
db_options.optimize_for_point_lookup(256 << 20);
db_options.set_use_direct_reads(true);
db_options.set_use_direct_io_for_flush_and_compaction(true);

block_opts.set_cache_index_and_filter_blocks(true);
block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
block_opts.set_partition_filters(true);
block_opts.set_block_size(4096); // Or 8192
}
_ => {}
}

// Current column families that the code expects
let expected_column_families = vec![
Expand Down Expand Up @@ -234,71 +273,7 @@ impl Store {

let mut cf_descriptors = Vec::new();
for cf_name in &all_cfs_to_open {
let mut cf_opts = Options::default();

cf_opts.set_level_zero_file_num_compaction_trigger(4);
cf_opts.set_level_zero_slowdown_writes_trigger(20);
cf_opts.set_level_zero_stop_writes_trigger(36);

match cf_name.as_str() {
CF_HEADERS | CF_BODIES => {
cf_opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
cf_opts.set_write_buffer_size(128 * 1024 * 1024); // 128MB
cf_opts.set_max_write_buffer_number(4);
cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_size(32 * 1024); // 32KB blocks
cf_opts.set_block_based_table_factory(&block_opts);
}
CF_CANONICAL_BLOCK_HASHES | CF_BLOCK_NUMBERS => {
cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
cf_opts.set_write_buffer_size(64 * 1024 * 1024); // 64MB
cf_opts.set_max_write_buffer_number(3);
cf_opts.set_target_file_size_base(128 * 1024 * 1024); // 128MB

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_size(16 * 1024); // 16KB
block_opts.set_bloom_filter(10.0, false);
cf_opts.set_block_based_table_factory(&block_opts);
}
CF_TRIE_NODES => {
cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
cf_opts.set_write_buffer_size(512 * 1024 * 1024); // 512MB
cf_opts.set_max_write_buffer_number(6);
cf_opts.set_min_write_buffer_number_to_merge(2);
cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB
cf_opts.set_memtable_prefix_bloom_ratio(0.2); // Bloom filter

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_size(16 * 1024); // 16KB
block_opts.set_bloom_filter(10.0, false); // 10 bits per key
cf_opts.set_block_based_table_factory(&block_opts);
}
CF_RECEIPTS | CF_ACCOUNT_CODES => {
cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
cf_opts.set_write_buffer_size(128 * 1024 * 1024); // 128MB
cf_opts.set_max_write_buffer_number(3);
cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_size(32 * 1024); // 32KB
cf_opts.set_block_based_table_factory(&block_opts);
}
_ => {
// Default for other CFs
cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
cf_opts.set_write_buffer_size(64 * 1024 * 1024); // 64MB
cf_opts.set_max_write_buffer_number(3);
cf_opts.set_target_file_size_base(128 * 1024 * 1024); // 128MB

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_size(16 * 1024);
cf_opts.set_block_based_table_factory(&block_opts);
}
}

cf_descriptors.push(ColumnFamilyDescriptor::new(cf_name, cf_opts));
cf_descriptors.push(ColumnFamilyDescriptor::new(cf_name, db_options.clone()));
}

// Note: we are not using transactions on our Rocksdb instance.
Expand Down
Loading