Skip to content

Commit c60b756

Browse files
committed
chore(cubestore): Set metastore deletion batch size using CUBESTORE_SNAPSHOTS_DELETION_BATCH_SIZE
1 parent 3bc4093 commit c60b756

File tree

2 files changed

+22
-8
lines changed

2 files changed

+22
-8
lines changed

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,8 @@ pub trait ConfigObj: DIService {
518518

519519
fn dump_dir(&self) -> &Option<PathBuf>;
520520

521+
fn snapshots_deletion_batch_size(&self) -> u64;
522+
521523
fn minimum_metastore_snapshots_count(&self) -> u64;
522524

523525
fn metastore_snapshots_lifetime(&self) -> u64;
@@ -630,6 +632,7 @@ pub struct ConfigObjImpl {
630632
pub drop_ws_processing_messages_after_secs: u64,
631633
pub drop_ws_complete_messages_after_secs: u64,
632634
pub skip_kafka_parsing_errors: bool,
635+
pub snapshots_deletion_batch_size: u64,
633636
pub minimum_metastore_snapshots_count: u64,
634637
pub metastore_snapshots_lifetime: u64,
635638
pub minimum_cachestore_snapshots_count: u64,
@@ -953,6 +956,10 @@ impl ConfigObj for ConfigObjImpl {
953956
&self.dump_dir
954957
}
955958

959+
fn snapshots_deletion_batch_size(&self) -> u64 {
960+
self.snapshots_deletion_batch_size
961+
}
962+
956963
fn minimum_metastore_snapshots_count(&self) -> u64 {
957964
self.minimum_metastore_snapshots_count
958965
}
@@ -1486,6 +1493,11 @@ impl Config {
14861493
10 * 60,
14871494
),
14881495
skip_kafka_parsing_errors: env_parse("CUBESTORE_SKIP_KAFKA_PARSING_ERRORS", false),
1496+
// Presently, not useful to make more than upload_concurrency times constant
1497+
snapshots_deletion_batch_size: env_parse(
1498+
"CUBESTORE_SNAPSHOTS_DELETION_BATCH_SIZE",
1499+
80,
1500+
),
14891501
minimum_metastore_snapshots_count: env_parse(
14901502
"CUBESTORE_MINIMUM_METASTORE_SNAPSHOTS_COUNT",
14911503
5,
@@ -1652,6 +1664,7 @@ impl Config {
16521664
drop_ws_processing_messages_after_secs: 60,
16531665
drop_ws_complete_messages_after_secs: 10,
16541666
skip_kafka_parsing_errors: false,
1667+
snapshots_deletion_batch_size: 80,
16551668
minimum_metastore_snapshots_count: 3,
16561669
metastore_snapshots_lifetime: 24 * 3600,
16571670
minimum_cachestore_snapshots_count: 3,

rust/cubestore/cubestore/src/metastore/rocks_fs.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ pub struct BaseRocksStoreFs {
5555
name: &'static str,
5656
minimum_snapshots_count: u64,
5757
snapshots_lifetime: u64,
58-
remote_files_cleanup_batch_size: u64,
58+
// A copy of the upload-concurrency config -- we multiply this for our deletes.
59+
snapshots_deletion_batch_size: u64,
5960
}
6061

6162
impl BaseRocksStoreFs {
@@ -65,13 +66,13 @@ impl BaseRocksStoreFs {
6566
) -> Arc<Self> {
6667
let minimum_snapshots_count = config.minimum_metastore_snapshots_count();
6768
let snapshots_lifetime = config.metastore_snapshots_lifetime();
68-
let remote_files_cleanup_batch_size = config.remote_files_cleanup_batch_size();
69+
let snapshots_deletion_batch_size = config.snapshots_deletion_batch_size();
6970
Arc::new(Self {
7071
remote_fs,
7172
name: "metastore",
7273
minimum_snapshots_count,
7374
snapshots_lifetime,
74-
remote_files_cleanup_batch_size,
75+
snapshots_deletion_batch_size,
7576
})
7677
}
7778
pub fn new_for_cachestore(
@@ -80,13 +81,13 @@ impl BaseRocksStoreFs {
8081
) -> Arc<Self> {
8182
let minimum_snapshots_count = config.minimum_cachestore_snapshots_count();
8283
let snapshots_lifetime = config.cachestore_snapshots_lifetime();
83-
let remote_files_cleanup_batch_size = config.remote_files_cleanup_batch_size();
84+
let snapshots_deletion_batch_size = config.snapshots_deletion_batch_size();
8485
Arc::new(Self {
8586
remote_fs,
8687
name: "cachestore",
8788
minimum_snapshots_count,
8889
snapshots_lifetime,
89-
remote_files_cleanup_batch_size,
90+
snapshots_deletion_batch_size,
9091
})
9192
}
9293

@@ -145,8 +146,8 @@ impl BaseRocksStoreFs {
145146
name: &str,
146147
) -> Result<HashMap<u128, Vec<String>>, CubeError> {
147148
let existing_metastore_files = remote_fs.list(format!("{}-", name)).await?;
148-
// Log a debug statement so that we can rule out the filename list itself being too large for memory.
149-
log::debug!(
149+
// Log an info statement so that we can rule out the filename list itself being too large for memory.
150+
log::info!(
150151
"Listed existing {} files, count = {}",
151152
name,
152153
existing_metastore_files.len()
@@ -215,7 +216,7 @@ impl BaseRocksStoreFs {
215216
}
216217

217218
for batch in to_delete.chunks(
218-
self.remote_files_cleanup_batch_size
219+
self.snapshots_deletion_batch_size
219220
.try_into()
220221
.unwrap_or(usize::MAX),
221222
) {

0 commit comments

Comments
 (0)