Skip to content

Commit df73851

Browse files
authored
feat(cubestore): Separate configuration for count of metastore and cachestore snapshots (#6388)
1 parent e9a4458 commit df73851

File tree

8 files changed

+70
-32
lines changed

8 files changed

+70
-32
lines changed

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ impl RocksCacheStore {
230230
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
231231
let store = RocksStore::new(
232232
store_path.clone().join(details.get_name()).as_path(),
233-
BaseRocksStoreFs::new(remote_fs.clone(), "cachestore", config.config_obj()),
233+
BaseRocksStoreFs::new_for_cachestore(remote_fs.clone(), config.config_obj()),
234234
config.config_obj(),
235235
details,
236236
)

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,10 @@ pub trait ConfigObj: DIService {
442442

443443
fn metastore_snapshots_lifetime(&self) -> u64;
444444

445+
fn minimum_cachestore_snapshots_count(&self) -> u64;
446+
447+
fn cachestore_snapshots_lifetime(&self) -> u64;
448+
445449
fn max_disk_space(&self) -> u64;
446450
fn max_disk_space_per_worker(&self) -> u64;
447451

@@ -508,6 +512,8 @@ pub struct ConfigObjImpl {
508512
pub skip_kafka_parsing_errors: bool,
509513
pub minimum_metastore_snapshots_count: u64,
510514
pub metastore_snapshots_lifetime: u64,
515+
pub minimum_cachestore_snapshots_count: u64,
516+
pub cachestore_snapshots_lifetime: u64,
511517
pub max_disk_space: u64,
512518
pub max_disk_space_per_worker: u64,
513519
pub disk_space_cache_duration_secs: u64,
@@ -725,6 +731,14 @@ impl ConfigObj for ConfigObjImpl {
725731
self.metastore_snapshots_lifetime
726732
}
727733

734+
fn minimum_cachestore_snapshots_count(&self) -> u64 {
735+
self.minimum_cachestore_snapshots_count
736+
}
737+
738+
fn cachestore_snapshots_lifetime(&self) -> u64 {
739+
self.cachestore_snapshots_lifetime
740+
}
741+
728742
fn max_disk_space(&self) -> u64 {
729743
self.max_disk_space
730744
}
@@ -999,6 +1013,14 @@ impl Config {
9991013
"CUBESTORE_METASTORE_SNAPSHOTS_LIFETIME",
10001014
24 * 60 * 60,
10011015
),
1016+
minimum_cachestore_snapshots_count: env_parse(
1017+
"CUBESTORE_MINIMUM_CACHESTORE_SNAPSHOTS_COUNT",
1018+
5,
1019+
),
1020+
cachestore_snapshots_lifetime: env_parse(
1021+
"CUBESTORE_CACHESTORE_SNAPSHOTS_LIFETIME",
1022+
60 * 60,
1023+
),
10021024
max_disk_space: env_parse("CUBESTORE_MAX_DISK_SPACE_GB", 0) * 1024 * 1024 * 1024,
10031025
max_disk_space_per_worker: env_parse("CUBESTORE_MAX_DISK_SPACE_PER_WORKER_GB", 0)
10041026
* 1024
@@ -1088,6 +1110,8 @@ impl Config {
10881110
skip_kafka_parsing_errors: false,
10891111
minimum_metastore_snapshots_count: 3,
10901112
metastore_snapshots_lifetime: 24 * 3600,
1113+
minimum_cachestore_snapshots_count: 3,
1114+
cachestore_snapshots_lifetime: 3600,
10911115
max_disk_space: 0,
10921116
max_disk_space_per_worker: 0,
10931117
disk_space_cache_duration_secs: 0,
@@ -1320,9 +1344,8 @@ impl Config {
13201344
.register("cachestore_fs", async move |i| {
13211345
// TODO metastore works with non queue remote fs as it requires loops to be started prior to load_from_remote call
13221346
let original_remote_fs = i.get_service("original_remote_fs").await;
1323-
let arc: Arc<dyn DIService> = BaseRocksStoreFs::new(
1347+
let arc: Arc<dyn DIService> = BaseRocksStoreFs::new_for_cachestore(
13241348
original_remote_fs,
1325-
"cachestore",
13261349
i.get_service_typed().await,
13271350
);
13281351

@@ -1394,9 +1417,8 @@ impl Config {
13941417
.register("metastore_fs", async move |i| {
13951418
// TODO metastore works with non queue remote fs as it requires loops to be started prior to load_from_remote call
13961419
let original_remote_fs = i.get_service("original_remote_fs").await;
1397-
let arc: Arc<dyn DIService> = BaseRocksStoreFs::new(
1420+
let arc: Arc<dyn DIService> = BaseRocksStoreFs::new_for_metastore(
13981421
original_remote_fs,
1399-
"metastore",
14001422
i.get_service_typed().await,
14011423
);
14021424

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,7 +1294,7 @@ impl RocksMetaStore {
12941294
let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
12951295
let store = RocksStore::new(
12961296
store_path.clone().join(details.get_name()).as_path(),
1297-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
1297+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
12981298
config.config_obj(),
12991299
details,
13001300
)
@@ -4564,7 +4564,7 @@ mod tests {
45644564
{
45654565
let meta_store = RocksMetaStore::new(
45664566
store_path.join("metastore").as_path(),
4567-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
4567+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
45684568
config.config_obj(),
45694569
)
45704570
.unwrap();
@@ -4752,7 +4752,7 @@ mod tests {
47524752

47534753
let meta_store = RocksMetaStore::new(
47544754
store_path.join("metastore").as_path(),
4755-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
4755+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
47564756
config.config_obj(),
47574757
)
47584758
.unwrap();
@@ -4831,7 +4831,7 @@ mod tests {
48314831
{
48324832
let meta_store = RocksMetaStore::new(
48334833
store_path.join("metastore").as_path(),
4834-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
4834+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
48354835
config.config_obj(),
48364836
)
48374837
.unwrap();
@@ -4881,7 +4881,7 @@ mod tests {
48814881
{
48824882
let meta_store = RocksMetaStore::new(
48834883
store_path.clone().join("metastore").as_path(),
4884-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
4884+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
48854885
config.config_obj(),
48864886
)
48874887
.unwrap();
@@ -4989,7 +4989,7 @@ mod tests {
49894989
{
49904990
let meta_store = RocksMetaStore::new(
49914991
store_path.clone().join("metastore").as_path(),
4992-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
4992+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
49934993
config.config_obj(),
49944994
)
49954995
.unwrap();
@@ -5077,7 +5077,7 @@ mod tests {
50775077
{
50785078
let meta_store = RocksMetaStore::new(
50795079
store_path.clone().join("metastore").as_path(),
5080-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
5080+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
50815081
config.config_obj(),
50825082
)
50835083
.unwrap();
@@ -5901,7 +5901,7 @@ mod tests {
59015901
{
59025902
let meta_store = RocksMetaStore::new(
59035903
store_path.join("metastore").as_path(),
5904-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
5904+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
59055905
config.config_obj(),
59065906
)
59075907
.unwrap();
@@ -6039,7 +6039,7 @@ mod tests {
60396039
{
60406040
let meta_store = RocksMetaStore::new(
60416041
store_path.join("metastore").as_path(),
6042-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
6042+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
60436043
config.config_obj(),
60446044
)
60456045
.unwrap();

rust/cubestore/cubestore/src/metastore/rocks_fs.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,19 +55,35 @@ pub trait MetaStoreFs: Send + Sync {
5555
pub struct BaseRocksStoreFs {
5656
remote_fs: Arc<dyn RemoteFs>,
5757
name: &'static str,
58-
config: Arc<dyn ConfigObj>,
58+
minimum_snapshots_count: u64,
59+
snapshots_lifetime: u64,
5960
}
6061

6162
impl BaseRocksStoreFs {
62-
pub fn new(
63+
pub fn new_for_metastore(
6364
remote_fs: Arc<dyn RemoteFs>,
64-
name: &'static str,
6565
config: Arc<dyn ConfigObj>,
6666
) -> Arc<Self> {
67+
let minimum_snapshots_count = config.minimum_metastore_snapshots_count();
68+
let snapshots_lifetime = config.metastore_snapshots_lifetime();
6769
Arc::new(Self {
6870
remote_fs,
69-
name,
70-
config,
71+
name: "metastore",
72+
minimum_snapshots_count,
73+
snapshots_lifetime,
74+
})
75+
}
76+
pub fn new_for_cachestore(
77+
remote_fs: Arc<dyn RemoteFs>,
78+
config: Arc<dyn ConfigObj>,
79+
) -> Arc<Self> {
80+
let minimum_snapshots_count = config.minimum_cachestore_snapshots_count();
81+
let snapshots_lifetime = config.cachestore_snapshots_lifetime();
82+
Arc::new(Self {
83+
remote_fs,
84+
name: "cachestore",
85+
minimum_snapshots_count,
86+
snapshots_lifetime,
7187
})
7288
}
7389

@@ -136,8 +152,8 @@ impl BaseRocksStoreFs {
136152
})
137153
.collect::<Vec<_>>();
138154

139-
let lifetime_ms = (self.config.metastore_snapshots_lifetime() as u128) * 1000;
140-
let min_snapshots_count = self.config.minimum_metastore_snapshots_count() as usize;
155+
let lifetime_ms = (self.snapshots_lifetime as u128) * 1000;
156+
let min_snapshots_count = self.minimum_snapshots_count as usize;
141157

142158
let mut snapshots_list = candidates
143159
.iter()

rust/cubestore/cubestore/src/metastore/rocks_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1070,7 +1070,7 @@ mod tests {
10701070
let details = Arc::new(RocksMetaStoreDetails {});
10711071
let rocks_store = RocksStore::new(
10721072
store_path.join("metastore").as_path(),
1073-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
1073+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
10741074
config.config_obj(),
10751075
details,
10761076
)?;

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1834,13 +1834,13 @@ mod tests {
18341834
);
18351835
let meta_store = RocksMetaStore::new(
18361836
&Path::new(path).join("metastore"),
1837-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
1837+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
18381838
config.config_obj(),
18391839
)
18401840
.unwrap();
18411841
let cache_store = RocksCacheStore::new(
18421842
&Path::new(path).join("cachestore"),
1843-
BaseRocksStoreFs::new(remote_fs.clone(), "cachestore", config.config_obj()),
1843+
BaseRocksStoreFs::new_for_cachestore(remote_fs.clone(), config.config_obj()),
18441844
config.config_obj(),
18451845
)
18461846
.unwrap();
@@ -1906,13 +1906,13 @@ mod tests {
19061906
);
19071907
let meta_store = RocksMetaStore::new(
19081908
&Path::new(path).join("metastore"),
1909-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
1909+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
19101910
config.config_obj(),
19111911
)
19121912
.unwrap();
19131913
let cache_store = RocksCacheStore::new(
19141914
&Path::new(path).join("cachestore"),
1915-
BaseRocksStoreFs::new(remote_fs.clone(), "cachestore", config.config_obj()),
1915+
BaseRocksStoreFs::new_for_cachestore(remote_fs.clone(), config.config_obj()),
19161916
config.config_obj(),
19171917
)
19181918
.unwrap();
@@ -2007,13 +2007,13 @@ mod tests {
20072007
);
20082008
let meta_store = RocksMetaStore::new(
20092009
&Path::new(path).join("metastore"),
2010-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
2010+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
20112011
config.config_obj(),
20122012
)
20132013
.unwrap();
20142014
let cache_store = RocksCacheStore::new(
20152015
&Path::new(path).join("cachestore"),
2016-
BaseRocksStoreFs::new(remote_fs.clone(), "cachestore", config.config_obj()),
2016+
BaseRocksStoreFs::new_for_cachestore(remote_fs.clone(), config.config_obj()),
20172017
config.config_obj(),
20182018
)
20192019
.unwrap();

rust/cubestore/cubestore/src/store/compaction.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1706,7 +1706,7 @@ mod tests {
17061706
);
17071707
let metastore = RocksMetaStore::new(
17081708
Path::new(path),
1709-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
1709+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
17101710
config.config_obj(),
17111711
)
17121712
.unwrap();

rust/cubestore/cubestore/src/store/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -777,7 +777,7 @@ mod tests {
777777
let store = WALStore::new(
778778
RocksMetaStore::new(
779779
Path::new(path),
780-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
780+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
781781
config.config_obj(),
782782
)
783783
.unwrap(),
@@ -869,7 +869,7 @@ mod tests {
869869
);
870870
let meta_store = RocksMetaStore::new(
871871
Path::new(path),
872-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
872+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
873873
config.config_obj(),
874874
)
875875
.unwrap();
@@ -968,7 +968,7 @@ mod tests {
968968
);
969969
let meta_store = RocksMetaStore::new(
970970
Path::new(path),
971-
BaseRocksStoreFs::new(remote_fs.clone(), "metastore", config.config_obj()),
971+
BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
972972
config.config_obj(),
973973
)
974974
.unwrap();

0 commit comments

Comments
 (0)