Commit cc6003b

feat(cubestore): Max disk space limit (#6084)
1 parent 28ea47c commit cc6003b

File tree: 3 files changed, +125 −0

rust/cubestore/cubestore/src/config/mod.rs
rust/cubestore/cubestore/src/metastore/mod.rs
rust/cubestore/cubestore/src/sql/mod.rs
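This commit adds an optional disk space limit to Cube Store: a max_disk_space config option (set in gigabytes via CUBESTORE_MAX_DISK_SPACE_GB; the default 0 means unlimited), metastore helpers that report the space already used by partitions and chunks, and a pre-flight check in CREATE TABLE that rejects new tables once usage exceeds the limit.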

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 9 additions & 0 deletions

@@ -399,6 +399,8 @@ pub trait ConfigObj: DIService {
     fn minimum_metastore_snapshots_count(&self) -> u64;

     fn metastore_snapshots_lifetime(&self) -> u64;
+
+    fn max_disk_space(&self) -> u64;
 }

 #[derive(Debug, Clone)]
@@ -456,6 +458,7 @@ pub struct ConfigObjImpl {
     pub skip_kafka_parsing_errors: bool,
     pub minimum_metastore_snapshots_count: u64,
     pub metastore_snapshots_lifetime: u64,
+    pub max_disk_space: u64,
 }

 crate::di_service!(ConfigObjImpl, [ConfigObj]);
@@ -659,6 +662,10 @@ impl ConfigObj for ConfigObjImpl {
     fn metastore_snapshots_lifetime(&self) -> u64 {
         self.metastore_snapshots_lifetime
     }
+
+    fn max_disk_space(&self) -> u64 {
+        self.max_disk_space
+    }
 }

 lazy_static! {
@@ -873,6 +880,7 @@ impl Config {
                 "CUBESTORE_METASTORE_SNAPSHOTS_LIFETIME",
                 24 * 60 * 60,
             ),
+            max_disk_space: env_parse("CUBESTORE_MAX_DISK_SPACE_GB", 0) * 1024 * 1024 * 1024,
         }),
     }
 }
@@ -942,6 +950,7 @@ impl Config {
             skip_kafka_parsing_errors: false,
             minimum_metastore_snapshots_count: 3,
             metastore_snapshots_lifetime: 24 * 3600,
+            max_disk_space: 0,
         }),
     }
 }
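The limit is configured in gigabytes through the CUBESTORE_MAX_DISK_SPACE_GB environment variable and converted to bytes when the config is built; the default of 0 leaves the limit disabled. A minimal sketch of that conversion, assuming an env_parse helper along these lines (this standalone version is an illustration, not Cubestore's actual implementation):

use std::env;
use std::str::FromStr;

// Hypothetical stand-in for the config module's `env_parse` helper:
// read an environment variable and parse it, falling back to a default.
fn env_parse<T: FromStr>(name: &str, default: T) -> T {
    env::var(name)
        .ok()
        .and_then(|v| v.parse::<T>().ok())
        .unwrap_or(default)
}

fn main() {
    // CUBESTORE_MAX_DISK_SPACE_GB=10 would yield 10 GiB in bytes;
    // left unset, the value stays 0, which disables the limit.
    let max_disk_space: u64 = env_parse("CUBESTORE_MAX_DISK_SPACE_GB", 0u64) * 1024 * 1024 * 1024;
    println!("max_disk_space = {} bytes", max_disk_space);
}

Storing the limit in bytes up front keeps the later comparison in the SQL path to a single integer check.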

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 28 additions & 0 deletions

@@ -908,6 +908,10 @@ pub trait MetaStore: DIService + Send + Sync {
         partition_id: u64,
         include_inactive: bool,
     ) -> Result<Vec<IdRow<Chunk>>, CubeError>;
+    async fn get_used_disk_space_out_of_queue(&self) -> Result<u64, CubeError>;
+    async fn get_all_partitions_and_chunks_out_of_queue(
+        &self,
+    ) -> Result<(Vec<IdRow<Partition>>, Vec<IdRow<Chunk>>), CubeError>;
     async fn get_chunks_by_partition_out_of_queue(
         &self,
         partition_id: u64,
@@ -2232,6 +2236,30 @@ impl MetaStore for RocksMetaStore {
         .await
     }

+    async fn get_used_disk_space_out_of_queue(&self) -> Result<u64, CubeError> {
+        let (partitions, chunks) = self.get_all_partitions_and_chunks_out_of_queue().await?;
+        let partitions_size: u64 = partitions
+            .into_iter()
+            .map(|p| p.get_row().file_size().unwrap_or(0))
+            .sum();
+        let chunks_size: u64 = chunks
+            .into_iter()
+            .map(|c| c.get_row().file_size().unwrap_or(0))
+            .sum();
+        Ok(partitions_size + chunks_size)
+    }
+
+    async fn get_all_partitions_and_chunks_out_of_queue(
+        &self,
+    ) -> Result<(Vec<IdRow<Partition>>, Vec<IdRow<Chunk>>), CubeError> {
+        self.read_operation_out_of_queue(move |db| {
+            let partitions = PartitionRocksTable::new(db.clone()).all_rows()?;
+            let chunks = ChunkRocksTable::new(db).all_rows()?;
+            Ok((partitions, chunks))
+        })
+        .await
+    }
+
     #[tracing::instrument(level = "trace", skip(self))]
     async fn get_partition_chunk_sizes(&self, partition_id: u64) -> Result<u64, CubeError> {
         let chunks = self
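Both new trait methods read the metastore outside the write queue: get_all_partitions_and_chunks_out_of_queue fetches every partition and chunk row in one read operation, and get_used_disk_space_out_of_queue sums their recorded file sizes, treating rows without a size as zero. A self-contained sketch of that accounting with simplified stand-in types (the Partition and Chunk structs below are illustrative, not the real metastore rows):

// Simplified stand-in row types; the real metastore `Partition` and
// `Chunk` rows carry many more fields and are wrapped in `IdRow`.
struct Partition { file_size: Option<u64> }
struct Chunk { file_size: Option<u64> }

// Sum the recorded file sizes of all partitions and chunks; rows with
// no recorded size contribute 0, mirroring `file_size().unwrap_or(0)`.
fn used_disk_space(partitions: &[Partition], chunks: &[Chunk]) -> u64 {
    let partitions_size: u64 = partitions.iter().map(|p| p.file_size.unwrap_or(0)).sum();
    let chunks_size: u64 = chunks.iter().map(|c| c.file_size.unwrap_or(0)).sum();
    partitions_size + chunks_size
}

fn main() {
    let partitions = vec![
        Partition { file_size: Some(200_000) },
        Partition { file_size: None }, // e.g. a partition not yet written
    ];
    let chunks = vec![Chunk { file_size: Some(150_000) }];
    assert_eq!(used_disk_space(&partitions, &chunks), 350_000);
}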

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 88 additions & 0 deletions

@@ -317,6 +317,19 @@ impl SqlServiceImpl {
         } else {
             None
         };
+
+        let max_disk_space = self.config_obj.max_disk_space();
+        if max_disk_space > 0 {
+            let used_space = self.db.get_used_disk_space_out_of_queue().await?;
+            if max_disk_space < used_space {
+                return Err(CubeError::user(format!(
+                    "Exceeded available storage space: {:.3} GB out of {} GB allowed. Please consider changing pre-aggregations build range, reducing index count or pre-aggregations granularity.",
+                    used_space as f64 / 1024. / 1024. / 1024.,
+                    max_disk_space as f64 / 1024. / 1024. / 1024.
+                )));
+            }
+        }
+
         if !external {
             return self
                 .db
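The guard sits at the top of the CREATE TABLE path: when a limit is configured, current usage is fetched out of queue and the statement is rejected with a user-facing error if usage already exceeds the limit. Note that usage is checked before the new table is loaded, so a table that pushes usage over the limit still succeeds and only subsequent creates fail. A standalone sketch of the check and its GB formatting (check_disk_space is an illustrative name, not the actual method):

const GIB: f64 = 1024.0 * 1024.0 * 1024.0;

// Standalone version of the guard: a limit of 0 means "no limit";
// otherwise reject once current usage exceeds the configured maximum.
fn check_disk_space(used_space: u64, max_disk_space: u64) -> Result<(), String> {
    if max_disk_space > 0 && max_disk_space < used_space {
        return Err(format!(
            "Exceeded available storage space: {:.3} GB out of {} GB allowed.",
            used_space as f64 / GIB,
            max_disk_space as f64 / GIB
        ));
    }
    Ok(())
}

fn main() {
    // 2 GiB used against a 1 GiB limit fails the check.
    assert!(check_disk_space(2 * 1024 * 1024 * 1024, 1024 * 1024 * 1024).is_err());
    // A limit of 0 disables the check entirely.
    assert!(check_disk_space(u64::MAX, 0).is_ok());
}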
@@ -3582,6 +3595,81 @@ mod tests {
             .await;
     }

+    #[tokio::test]
+    async fn disk_space_limit() {
+        Config::test("disk_space_limit")
+            .update_config(|mut c| {
+                c.partition_split_threshold = 1000000;
+                c.compaction_chunks_count_threshold = 100;
+                c.max_disk_space = 300_000;
+                c.select_workers = vec!["127.0.0.1:24306".to_string()];
+                c.metastore_bind_address = Some("127.0.0.1:25312".to_string());
+                c
+            })
+            .start_test(async move |services| {
+                let service = services.sql_service;
+
+                Config::test("disk_space_limit_worker_1")
+                    .update_config(|mut c| {
+                        c.worker_bind_address = Some("127.0.0.1:24306".to_string());
+                        c.server_name = "127.0.0.1:24306".to_string();
+                        c.metastore_remote_address = Some("127.0.0.1:25312".to_string());
+                        c
+                    })
+                    .start_test_worker(async move |_| {
+                        let paths = {
+                            let dir = env::temp_dir();
+
+                            let path_1 = dir.clone().join("foo-cluster-1.csv");
+                            let path_2 = dir.clone().join("foo-cluster-2.csv.gz");
+                            let mut file = File::create(path_1.clone()).unwrap();
+
+                            file.write_all("id,city,arr,t\n".as_bytes()).unwrap();
+                            for i in 0..100000 {
+                                file.write_all(format!("{},\"New York\",\"[\"\"\"\"]\",2021-01-24 19:12:23.123 UTC\n", i).as_bytes()).unwrap();
+                            }
+
+                            let mut file = GzipEncoder::new(BufWriter::new(tokio::fs::File::create(path_2.clone()).await.unwrap()));
+
+                            file.write_all("id,city,arr,t\n".as_bytes()).await.unwrap();
+                            for i in 0..100000 {
+                                file.write_all(format!("{},San Francisco,\"[\"\"Foo\"\",\"\"Bar\"\",\"\"FooBar\"\"]\",\"2021-01-24 12:12:23 UTC\"\n", i).as_bytes()).await.unwrap();
+                            }
+
+                            file.shutdown().await.unwrap();
+
+                            vec![path_1, path_2]
+                        };
+
+                        let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await.unwrap();
+                        let _ = service.exec_query(
+                            &format!(
+                                "CREATE TABLE Foo.Persons (id int, city text, t timestamp, arr text) INDEX persons_city (`city`, `id`) LOCATION {}",
+                                paths.iter().map(|p| format!("'{}'", p.to_string_lossy())).join(",")
+                            )
+                        ).await.unwrap();
+
+                        let res = service.exec_query(
+                            &format!(
+                                "CREATE TABLE Foo.Persons2 (id int, city text, t timestamp, arr text) INDEX persons_city (`city`, `id`) LOCATION {}",
+                                paths.iter().map(|p| format!("'{}'", p.to_string_lossy())).join(",")
+                            )
+                        ).await;
+                        if let Err(err) = res {
+                            assert!(err.message.starts_with("Exceeded available storage "));
+                        } else {
+                            assert!(false);
+                        }
+                    })
+                    .await;
+            })
+            .await;
+    }
+
     #[tokio::test]
     async fn compaction() {
         Config::test("compaction").update_config(|mut config| {
