Skip to content

Commit 4e49572

Browse files
authored
chore(cubestore): Allow to configure checksum_type/cache_capacity for RocksDB (#6335)
1 parent 73a7d9b commit 4e49572

File tree

7 files changed

+100
-19
lines changed

7 files changed

+100
-19
lines changed

rust/cubestore/Cargo.lock

Lines changed: 2 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubestore/cubestore-sql-tests/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ tar = "0.4.38"
5353

5454
[dev-dependencies]
5555
criterion = { version = "0.4.0", features = ["async_tokio", "html_reports"] }
56-
rocksdb = { version = "0.20.1", default-features = false, features = ["bzip2"] }
56+
# Awaiting 0.20.2
57+
rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb", rev = "44dc84171adefbbe75a25b72c35f773a643655a0", default-features = false, features = ["bzip2"] }
5758

5859
[[bench]]
5960
name = "in_process"

rust/cubestore/cubestore/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ simple_logger = "1.7.0"
4141
async-trait = "0.1.36"
4242
actix-rt = "2.7.0"
4343
regex = "1.3.9"
44-
rocksdb = { version = "0.20.1", default-features = false, features = ["bzip2"] }
44+
# Awaiting 0.20.2
45+
rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb", rev = "44dc84171adefbbe75a25b72c35f773a643655a0", default-features = false, features = ["bzip2"] }
4546
uuid = { version = "0.8", features = ["serde", "v4"] }
4647
num = "0.3.0"
4748
enum_primitive = "0.1.1"

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::CubeError;
2424
use async_trait::async_trait;
2525

2626
use futures_timer::Delay;
27-
use rocksdb::{BlockBasedOptions, Options, DB};
27+
use rocksdb::{BlockBasedOptions, Cache, Options, DB};
2828

2929
use crate::cachestore::compaction::CompactionPreloadedState;
3030
use crate::cachestore::listener::RocksCacheStoreListener;
@@ -67,7 +67,8 @@ impl RocksCacheStoreDetails {
6767
}
6868

6969
impl RocksStoreDetails for RocksCacheStoreDetails {
70-
fn open_db(&self, path: &Path) -> Result<DB, CubeError> {
70+
fn open_db(&self, path: &Path, config: &Arc<dyn ConfigObj>) -> Result<DB, CubeError> {
71+
let rocksdb_config = config.cachestore_rocksdb_config();
7172
let compaction_state = Arc::new(Mutex::new(Some(
7273
RocksCacheStoreDetails::get_compaction_state(),
7374
)));
@@ -81,9 +82,17 @@ impl RocksStoreDetails for RocksCacheStoreDetails {
8182
// Disable automatic compaction before migration, will be enabled later in after_migration
8283
opts.set_disable_auto_compactions(true);
8384

84-
let mut block_opts = BlockBasedOptions::default();
85-
// https://github.com/facebook/rocksdb/blob/v7.9.2/include/rocksdb/table.h#L524
86-
block_opts.set_format_version(5);
85+
let block_opts = {
86+
let mut block_opts = BlockBasedOptions::default();
87+
// https://github.com/facebook/rocksdb/blob/v7.9.2/include/rocksdb/table.h#L524
88+
block_opts.set_format_version(5);
89+
block_opts.set_checksum_type(rocksdb_config.checksum_type.as_rocksdb_enum());
90+
91+
let cache = Cache::new_lru_cache(rocksdb_config.cache_capacity)?;
92+
block_opts.set_block_cache(&cache);
93+
94+
block_opts
95+
};
8796

8897
opts.set_block_based_table_factory(&block_opts);
8998

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use crate::config::processing_loop::ProcessingLoop;
1414
use crate::http::HttpServer;
1515
use crate::import::limits::ConcurrencyLimits;
1616
use crate::import::{ImportService, ImportServiceImpl};
17-
use crate::metastore::{BaseRocksStoreFs, MetaStore, MetaStoreRpcClient, RocksMetaStore};
17+
use crate::metastore::{
18+
BaseRocksStoreFs, MetaStore, MetaStoreRpcClient, RocksMetaStore, RocksStoreConfig,
19+
};
1820
use crate::mysql::{MySqlServer, SqlAuthDefaultImpl, SqlAuthService};
1921
use crate::queryplanner::query_executor::{QueryExecutor, QueryExecutorImpl};
2022
use crate::queryplanner::{QueryPlanner, QueryPlannerImpl};
@@ -392,8 +394,12 @@ pub trait ConfigObj: DIService {
392394

393395
fn metastore_bind_address(&self) -> &Option<String>;
394396

397+
fn metastore_rocksdb_config(&self) -> &RocksStoreConfig;
398+
395399
fn metastore_remote_address(&self) -> &Option<String>;
396400

401+
fn cachestore_rocksdb_config(&self) -> &RocksStoreConfig;
402+
397403
fn download_concurrency(&self) -> u64;
398404

399405
fn upload_concurrency(&self) -> u64;
@@ -478,6 +484,8 @@ pub struct ConfigObjImpl {
478484
pub worker_bind_address: Option<String>,
479485
pub metastore_bind_address: Option<String>,
480486
pub metastore_remote_address: Option<String>,
487+
pub metastore_rocks_store_config: RocksStoreConfig,
488+
pub cachestore_rocks_store_config: RocksStoreConfig,
481489
pub upload_concurrency: u64,
482490
pub download_concurrency: u64,
483491
pub connection_timeout: u64,
@@ -626,6 +634,14 @@ impl ConfigObj for ConfigObjImpl {
626634
&self.metastore_remote_address
627635
}
628636

637+
fn metastore_rocksdb_config(&self) -> &RocksStoreConfig {
638+
&self.metastore_rocks_store_config
639+
}
640+
641+
fn cachestore_rocksdb_config(&self) -> &RocksStoreConfig {
642+
&self.cachestore_rocks_store_config
643+
}
644+
629645
fn download_concurrency(&self) -> u64 {
630646
self.download_concurrency
631647
}
@@ -881,6 +897,8 @@ impl Config {
881897
env_optparse::<u16>("CUBESTORE_META_PORT").map(|v| format!("0.0.0.0:{}", v))
882898
}),
883899
metastore_remote_address: env::var("CUBESTORE_META_ADDR").ok(),
900+
metastore_rocks_store_config: RocksStoreConfig::metastore_default(),
901+
cachestore_rocks_store_config: RocksStoreConfig::cachestore_default(),
884902
upload_concurrency: env_parse("CUBESTORE_MAX_ACTIVE_UPLOADS", 4),
885903
download_concurrency: env_parse("CUBESTORE_MAX_ACTIVE_DOWNLOADS", 8),
886904
max_ingestion_data_frames: env_parse("CUBESTORE_MAX_DATA_FRAMES", 4),
@@ -981,6 +999,8 @@ impl Config {
981999
worker_bind_address: None,
9821000
metastore_bind_address: None,
9831001
metastore_remote_address: None,
1002+
metastore_rocks_store_config: RocksStoreConfig::metastore_default(),
1003+
cachestore_rocks_store_config: RocksStoreConfig::cachestore_default(),
9841004
upload_concurrency: 4,
9851005
download_concurrency: 8,
9861006
max_ingestion_data_frames: 4,

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub use rocks_table::*;
2121
use crate::cluster::node_name_by_partition;
2222
use async_trait::async_trait;
2323
use log::info;
24-
use rocksdb::{BlockBasedOptions, Env, MergeOperands, Options, DB};
24+
use rocksdb::{BlockBasedOptions, Cache, Env, MergeOperands, Options, DB};
2525
use serde::{Deserialize, Serialize};
2626
use std::hash::Hash;
2727
use std::{env, io::Cursor, sync::Arc};
@@ -1146,15 +1146,24 @@ fn meta_store_merge(
11461146
struct RocksMetaStoreDetails {}
11471147

11481148
impl RocksStoreDetails for RocksMetaStoreDetails {
1149-
fn open_db(&self, path: &Path) -> Result<DB, CubeError> {
1149+
fn open_db(&self, path: &Path, config: &Arc<dyn ConfigObj>) -> Result<DB, CubeError> {
1150+
let rocksdb_config = config.metastore_rocksdb_config();
11501151
let mut opts = Options::default();
11511152
opts.create_if_missing(true);
11521153
opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(13));
11531154
opts.set_merge_operator_associative("meta_store merge", meta_store_merge);
11541155

1155-
let mut block_opts = BlockBasedOptions::default();
1156-
// https://github.com/facebook/rocksdb/blob/v7.9.2/include/rocksdb/table.h#L524
1157-
block_opts.set_format_version(5);
1156+
let block_opts = {
1157+
let mut block_opts = BlockBasedOptions::default();
1158+
// https://github.com/facebook/rocksdb/blob/v7.9.2/include/rocksdb/table.h#L524
1159+
block_opts.set_format_version(5);
1160+
block_opts.set_checksum_type(rocksdb_config.checksum_type.as_rocksdb_enum());
1161+
1162+
let cache = Cache::new_lru_cache(rocksdb_config.cache_capacity)?;
1163+
block_opts.set_block_cache(&cache);
1164+
1165+
block_opts
1166+
};
11581167

11591168
opts.set_block_based_table_factory(&block_opts);
11601169

rust/cubestore/cubestore/src/metastore/rocks_store.rs

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -491,8 +491,51 @@ macro_rules! meta_store_table_impl {
491491
};
492492
}
493493

494+
#[derive(Debug, Clone)]
495+
pub enum RocksStoreChecksumType {
496+
NoChecksum = 0,
497+
CRC32c = 1,
498+
XXHash = 2,
499+
XXHash64 = 3,
500+
XXH3 = 4, // Supported since RocksDB 6.27
501+
}
502+
503+
impl RocksStoreChecksumType {
504+
pub fn as_rocksdb_enum(&self) -> rocksdb::ChecksumType {
505+
match &self {
506+
RocksStoreChecksumType::NoChecksum => rocksdb::ChecksumType::NoChecksum,
507+
RocksStoreChecksumType::CRC32c => rocksdb::ChecksumType::CRC32c,
508+
RocksStoreChecksumType::XXHash => rocksdb::ChecksumType::XXHash,
509+
RocksStoreChecksumType::XXHash64 => rocksdb::ChecksumType::XXHash64,
510+
RocksStoreChecksumType::XXH3 => rocksdb::ChecksumType::XXH3,
511+
}
512+
}
513+
}
514+
515+
#[derive(Debug, Clone)]
516+
pub struct RocksStoreConfig {
517+
pub checksum_type: RocksStoreChecksumType,
518+
pub cache_capacity: usize,
519+
}
520+
521+
impl RocksStoreConfig {
522+
pub fn metastore_default() -> Self {
523+
Self {
524+
checksum_type: RocksStoreChecksumType::XXHash,
525+
cache_capacity: 1024 * 8,
526+
}
527+
}
528+
529+
pub fn cachestore_default() -> Self {
530+
Self {
531+
checksum_type: RocksStoreChecksumType::XXHash,
532+
cache_capacity: 1024 * 8,
533+
}
534+
}
535+
}
536+
494537
pub trait RocksStoreDetails: Send + Sync {
495-
fn open_db(&self, path: &Path) -> Result<DB, CubeError>;
538+
fn open_db(&self, path: &Path, config: &Arc<dyn ConfigObj>) -> Result<DB, CubeError>;
496539

497540
fn migrate(&self, table_ref: DbTableRef) -> Result<(), CubeError>;
498541

@@ -554,7 +597,7 @@ impl RocksStore {
554597
config: Arc<dyn ConfigObj>,
555598
details: Arc<dyn RocksStoreDetails>,
556599
) -> Result<Self, CubeError> {
557-
let db = details.open_db(path)?;
600+
let db = details.open_db(path, &config)?;
558601
let db_arc = Arc::new(db);
559602

560603
let (rw_loop_tx, mut rw_loop_rx) = tokio::sync::mpsc::channel::<

0 commit comments

Comments
 (0)