
Commit ed2ca79

feat(cubestore): Max disk space limit per worker (#6085)
1 parent cc6003b commit ed2ca79

5 files changed: +227 −28 lines changed

rust/cubestore/cubestore/src/cluster/mod.rs

Lines changed: 8 additions & 5 deletions
@@ -442,11 +442,7 @@ impl Cluster for ClusterImpl {
     }
 
     fn node_name_by_partition(&self, p: &IdRow<Partition>) -> String {
-        if let Some(id) = p.get_row().multi_partition_id() {
-            pick_worker_by_ids(self.config_obj.as_ref(), [id]).to_string()
-        } else {
-            pick_worker_by_partitions(self.config_obj.as_ref(), [p]).to_string()
-        }
+        node_name_by_partition(self.config_obj.as_ref(), p)
     }
 
     async fn node_name_for_chunk_repartition(
@@ -1749,6 +1745,13 @@ fn is_self_reference(name: &str) -> bool {
     name.starts_with("@loop:")
 }
 
+pub fn node_name_by_partition<'a>(config: &'a dyn ConfigObj, p: &IdRow<Partition>) -> String {
+    if let Some(id) = p.get_row().multi_partition_id() {
+        pick_worker_by_ids(config, [id]).to_string()
+    } else {
+        pick_worker_by_partitions(config, [p]).to_string()
+    }
+}
 /// Picks a worker by opaque id for any distributing work in a cluster.
 /// Ids usually come from multi-partitions of the metastore.
 pub fn pick_worker_by_ids<'a>(
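
Note: this hunk only moves logic. node_name_by_partition is extracted out of ClusterImpl into a free function taking any &dyn ConfigObj, so partition-to-worker routing can be reused outside the cluster service (the metastore uses it below to attribute disk usage to workers). A minimal sketch of such a caller, with hypothetical names (partition_sizes_by_worker is illustrative only, not part of this commit):

    use std::collections::HashMap;

    // Sum partition file sizes per owning worker; assumes `config` and the partition
    // rows are already loaded, as in the metastore change further down.
    fn partition_sizes_by_worker(
        config: &dyn ConfigObj,
        partitions: &[IdRow<Partition>],
    ) -> HashMap<String, u64> {
        let mut sizes: HashMap<String, u64> = HashMap::new();
        for p in partitions {
            let worker = node_name_by_partition(config, p);
            *sizes.entry(worker).or_insert(0) += p.get_row().file_size().unwrap_or(0);
        }
        sizes
    }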

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 20 additions & 0 deletions
@@ -401,6 +401,9 @@ pub trait ConfigObj: DIService {
     fn metastore_snapshots_lifetime(&self) -> u64;
 
     fn max_disk_space(&self) -> u64;
+    fn max_disk_space_per_worker(&self) -> u64;
+
+    fn disk_space_cache_duration_secs(&self) -> u64;
 }
 
 #[derive(Debug, Clone)]
@@ -459,6 +462,8 @@ pub struct ConfigObjImpl {
     pub minimum_metastore_snapshots_count: u64,
     pub metastore_snapshots_lifetime: u64,
     pub max_disk_space: u64,
+    pub max_disk_space_per_worker: u64,
+    pub disk_space_cache_duration_secs: u64,
 }
 
 crate::di_service!(ConfigObjImpl, [ConfigObj]);
@@ -666,6 +671,14 @@ impl ConfigObj for ConfigObjImpl {
     fn max_disk_space(&self) -> u64 {
         self.max_disk_space
     }
+
+    fn max_disk_space_per_worker(&self) -> u64 {
+        self.max_disk_space_per_worker
+    }
+
+    fn disk_space_cache_duration_secs(&self) -> u64 {
+        self.disk_space_cache_duration_secs
+    }
 }
 
 lazy_static! {
@@ -881,6 +894,11 @@ impl Config {
                     24 * 60 * 60,
                 ),
                 max_disk_space: env_parse("CUBESTORE_MAX_DISK_SPACE_GB", 0) * 1024 * 1024 * 1024,
+                max_disk_space_per_worker: env_parse("CUBESTORE_MAX_DISK_SPACE_PER_WORKER_GB", 0)
+                    * 1024
+                    * 1024
+                    * 1024,
+                disk_space_cache_duration_secs: 300,
             }),
         }
     }
@@ -951,6 +969,8 @@ impl Config {
                 minimum_metastore_snapshots_count: 3,
                 metastore_snapshots_lifetime: 24 * 3600,
                 max_disk_space: 0,
+                max_disk_space_per_worker: 0,
+                disk_space_cache_duration_secs: 0,
             }),
         }
     }
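
Two knobs are introduced here: max_disk_space_per_worker, read from CUBESTORE_MAX_DISK_SPACE_PER_WORKER_GB and converted from GB to bytes (0 is the default and, as with max_disk_space, presumably leaves the check disabled), and disk_space_cache_duration_secs, which controls how long the per-worker usage map computed by the metastore is cached (300 s by default, 0 in tests so every call recomputes). A hedged example of setting the limit programmatically in a test config, mirroring the tests further down (the name and value are illustrative):

    // Cap each select worker at roughly 2 GB; comparable to setting
    // CUBESTORE_MAX_DISK_SPACE_PER_WORKER_GB=2 for a non-test deployment.
    Config::test("per_worker_limit_example")
        .update_config(|mut c| {
            c.max_disk_space_per_worker = 2 * 1024 * 1024 * 1024;
            c
        });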

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 76 additions & 13 deletions
@@ -18,6 +18,7 @@ pub use rocks_fs::*;
 pub use rocks_store::*;
 pub use rocks_table::*;
 
+use crate::cluster::node_name_by_partition;
 use async_trait::async_trait;
 use log::info;
 use rocksdb::{MergeOperands, Options, DB};
@@ -81,10 +82,11 @@ use std::str::FromStr;
 use crate::cachestore::{CacheItem, QueueItem, QueueItemStatus, QueueResult, QueueResultAckEvent};
 use crate::remotefs::LocalDirRemoteFs;
 use snapshot_info::SnapshotInfo;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
 use table::Table;
 use table::{TableRocksIndex, TableRocksTable};
 use tokio::sync::broadcast::Sender;
+use tokio::sync::RwLock;
 use wal::WALRocksTable;
 
 #[macro_export]
@@ -908,7 +910,10 @@ pub trait MetaStore: DIService + Send + Sync {
         partition_id: u64,
         include_inactive: bool,
     ) -> Result<Vec<IdRow<Chunk>>, CubeError>;
-    async fn get_used_disk_space_out_of_queue(&self) -> Result<u64, CubeError>;
+    async fn get_used_disk_space_out_of_queue(
+        &self,
+        node: Option<String>,
+    ) -> Result<u64, CubeError>;
     async fn get_all_partitions_and_chunks_out_of_queue(
         &self,
     ) -> Result<(Vec<IdRow<Partition>>, Vec<IdRow<Chunk>>), CubeError>;
@@ -1134,6 +1139,7 @@ impl RocksStoreDetails for RocksMetaStoreDetails {
 
 pub struct RocksMetaStore {
     store: Arc<RocksStore>,
+    disk_space_cache: Arc<RwLock<Option<(HashMap<String, u64>, SystemTime)>>>,
     upload_loop: Arc<WorkerLoop>,
 }
 
@@ -1155,6 +1161,7 @@ impl RocksMetaStore {
     fn new_from_store(store: Arc<RocksStore>) -> Arc<Self> {
         Arc::new(Self {
             store,
+            disk_space_cache: Arc::new(RwLock::new(None)),
             upload_loop: Arc::new(WorkerLoop::new("Metastore upload")),
         })
     }
@@ -2236,17 +2243,73 @@ impl MetaStore for RocksMetaStore {
             .await
     }
 
-    async fn get_used_disk_space_out_of_queue(&self) -> Result<u64, CubeError> {
-        let (partitions, chunks) = self.get_all_partitions_and_chunks_out_of_queue().await?;
-        let partitions_size: u64 = partitions
-            .into_iter()
-            .map(|p| p.get_row().file_size().unwrap_or(0))
-            .sum();
-        let chunks_size: u64 = chunks
-            .into_iter()
-            .map(|c| c.get_row().file_size().unwrap_or(0))
-            .sum();
-        Ok(partitions_size + chunks_size)
+    async fn get_used_disk_space_out_of_queue(
+        &self,
+        node: Option<String>,
+    ) -> Result<u64, CubeError> {
+        let cached = if let Some((sizes, time)) = self.disk_space_cache.read().await.as_ref() {
+            let cache_duration =
+                Duration::from_secs(self.store.config.disk_space_cache_duration_secs());
+            if time.elapsed()? < cache_duration {
+                Some(sizes.clone())
+            } else {
+                None
+            }
+        } else {
+            None
+        };
+        let sizes_map = if let Some(sizes) = cached {
+            sizes
+        } else {
+            let (partitions, chunks) = self.get_all_partitions_and_chunks_out_of_queue().await?;
+            let mut partitions_map = partitions
+                .into_iter()
+                .map(|p| {
+                    (
+                        p.get_id(),
+                        (
+                            p.get_row().file_size().unwrap_or(0),
+                            node_name_by_partition(self.store.config.as_ref(), &p),
+                        ),
+                    )
+                })
+                .collect::<HashMap<u64, (u64, String)>>();
+            for c in chunks.into_iter() {
+                if let Some((ref mut size, _)) =
+                    partitions_map.get_mut(&c.get_row().get_partition_id())
+                {
+                    *size += c.get_row().file_size().unwrap_or(0);
+                }
+            }
+
+            let workers = if self.store.config.select_workers().is_empty() {
+                vec![self.store.config.server_name().clone()]
+            } else {
+                self.store.config.select_workers().clone()
+            };
+
+            let mut map = workers
+                .into_iter()
+                .map(|n| (n, 0))
+                .collect::<HashMap<String, u64>>();
+
+            for (_, (size, node)) in partitions_map.into_iter() {
+                map.entry(node).and_modify(|s| *s += size).or_insert(0);
+            }
+
+            let mut cache = self.disk_space_cache.write().await;
+            *cache = Some((map.clone(), SystemTime::now()));
+
+            map
+        };
+
+        let res = if let Some(node_name) = node {
+            sizes_map.get(&node_name).unwrap_or(&0).clone()
+        } else {
+            sizes_map.values().sum::<u64>()
+        };
+
+        Ok(res)
     }
 
     async fn get_all_partitions_and_chunks_out_of_queue(
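
get_used_disk_space_out_of_queue now takes an optional node name: Some(worker) returns that worker's usage, None sums across all workers. To avoid walking every partition and chunk on each call, the per-node map is cached behind an RwLock together with a SystemTime stamp and reused while it is younger than disk_space_cache_duration_secs. A standalone sketch of that caching pattern (illustrative types only, not the metastore API):

    use std::collections::HashMap;
    use std::time::{Duration, SystemTime};
    use tokio::sync::RwLock;

    struct SizesCache {
        inner: RwLock<Option<(HashMap<String, u64>, SystemTime)>>,
        ttl: Duration,
    }

    impl SizesCache {
        async fn get_or_recompute<F>(&self, recompute: F) -> HashMap<String, u64>
        where
            F: FnOnce() -> HashMap<String, u64>,
        {
            // Serve the cached map while it is younger than the TTL.
            if let Some((sizes, at)) = self.inner.read().await.as_ref() {
                if at.elapsed().unwrap_or(self.ttl) < self.ttl {
                    return sizes.clone();
                }
            }
            // Otherwise recompute, refresh the timestamp, and return the fresh map.
            let fresh = recompute();
            *self.inner.write().await = Some((fresh.clone(), SystemTime::now()));
            fresh
        }
    }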

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 94 additions & 7 deletions
@@ -320,7 +320,7 @@ impl SqlServiceImpl {
 
         let max_disk_space = self.config_obj.max_disk_space();
         if max_disk_space > 0 {
-            let used_space = self.db.get_used_disk_space_out_of_queue().await?;
+            let used_space = self.db.get_used_disk_space_out_of_queue(None).await?;
             if max_disk_space < used_space {
                 return Err(CubeError::user(format!(
                     "Exceeded available storage space: {:.3} GB out of {} GB allowed. Please consider changing pre-aggregations build range, reducing index count or pre-aggregations granularity.",
@@ -3602,18 +3602,105 @@ mod tests {
                 c.partition_split_threshold = 1000000;
                 c.compaction_chunks_count_threshold = 100;
                 c.max_disk_space = 300_000;
-                c.select_workers = vec!["127.0.0.1:24306".to_string()];
-                c.metastore_bind_address = Some("127.0.0.1:25312".to_string());
+                c.select_workers = vec!["127.0.0.1:24308".to_string()];
+                c.metastore_bind_address = Some("127.0.0.1:25314".to_string());
                 c
             })
             .start_test(async move |services| {
                 let service = services.sql_service;
 
                 Config::test("disk_space_limit_worker_1")
                     .update_config(|mut c| {
-                        c.worker_bind_address = Some("127.0.0.1:24306".to_string());
-                        c.server_name = "127.0.0.1:24306".to_string();
-                        c.metastore_remote_address = Some("127.0.0.1:25312".to_string());
+                        c.worker_bind_address = Some("127.0.0.1:24308".to_string());
+                        c.server_name = "127.0.0.1:24308".to_string();
+                        c.max_disk_space = 300_000;
+                        c.metastore_remote_address = Some("127.0.0.1:25314".to_string());
+                        c.store_provider = FileStoreProvider::Filesystem {
+                            remote_dir: Some(env::current_dir()
+                                .unwrap()
+                                .join("disk_space_limit-upstream")),
+                        };
+                        c
+                    })
+                    .start_test_worker(async move |_| {
+                        let paths = {
+                            let dir = env::temp_dir();
+
+                            let path_1 = dir.clone().join("foo-cluster-1.csv");
+                            let path_2 = dir.clone().join("foo-cluster-2.csv.gz");
+                            let mut file = File::create(path_1.clone()).unwrap();
+
+                            file.write_all("id,city,arr,t\n".as_bytes()).unwrap();
+                            for i in 0..100000
+                            {
+                                file.write_all(format!("{},\"New York\",\"[\"\"\"\"]\",2021-01-24 19:12:23.123 UTC\n", i).as_bytes()).unwrap();
+                            }
+
+
+                            let mut file = GzipEncoder::new(BufWriter::new(tokio::fs::File::create(path_2.clone()).await.unwrap()));
+
+                            file.write_all("id,city,arr,t\n".as_bytes()).await.unwrap();
+                            for i in 0..100000
+                            {
+                                file.write_all(format!("{},San Francisco,\"[\"\"Foo\"\",\"\"Bar\"\",\"\"FooBar\"\"]\",\"2021-01-24 12:12:23 UTC\"\n", i).as_bytes()).await.unwrap();
+                            }
+
+                            file.shutdown().await.unwrap();
+
+                            vec![path_1, path_2]
+                        };
+
+                        let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await.unwrap();
+                        let _ = service.exec_query(
+                            &format!(
+                                "CREATE TABLE Foo.Persons (id int, city text, t timestamp, arr text) INDEX persons_city (`city`, `id`) LOCATION {}",
+                                paths.iter().map(|p| format!("'{}'", p.to_string_lossy())).join(",")
+                            )
+                        ).await.unwrap();
+
+                        let res = service.exec_query(
+                            &format!(
+                                "CREATE TABLE Foo.Persons2 (id int, city text, t timestamp, arr text) INDEX persons_city (`city`, `id`) LOCATION {}",
+                                paths.iter().map(|p| format!("'{}'", p.to_string_lossy())).join(",")
+                            )
+                        ).await;
+                        if let Err(err) = res {
+                            assert!(err.message.starts_with("Exceeded available storage space:"));
+                        } else {
+                            assert!(false);
+                        }
+
+                    })
+                    .await;
+            })
+            .await;
+    }
+
+    #[tokio::test]
+    async fn disk_space_limit_per_worker() {
+        Config::test("disk_space_limit_per_worker")
+            .update_config(|mut c| {
+                c.partition_split_threshold = 1000000;
+                c.compaction_chunks_count_threshold = 100;
+                c.max_disk_space_per_worker = 6_000_000;
+                c.select_workers = vec!["127.0.0.1:24309".to_string()];
+                c.metastore_bind_address = Some("127.0.0.1:25315".to_string());
+                c
+            })
+            .start_test(async move |services| {
+                let service = services.sql_service;
+
+                Config::test("disk_space_limit_per_worker_worker_1")
+                    .update_config(|mut c| {
+                        c.worker_bind_address = Some("127.0.0.1:24309".to_string());
+                        c.server_name = "127.0.0.1:24309".to_string();
+                        c.max_disk_space_per_worker = 6_000_000;
+                        c.metastore_remote_address = Some("127.0.0.1:25315".to_string());
+                        c.store_provider = FileStoreProvider::Filesystem {
+                            remote_dir: Some(env::current_dir()
+                                .unwrap()
+                                .join("disk_space_limit_per_worker-upstream")),
+                        };
                         c
                     })
                     .start_test_worker(async move |_| {
@@ -3659,7 +3746,7 @@ mod tests {
                            )
                        ).await;
                        if let Err(err) = res {
-                            assert!(err.message.starts_with("Exceeded available storage "));
+                            assert!(err.message.contains("Exceeded available storage space on worker"));
                        } else {
                            assert!(false);
                        }
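
The updated assertion expects the per-worker error message ("Exceeded available storage space on worker ..."). The enforcement itself is not part of the hunks shown here; assuming it mirrors the existing global check at the top of this file, it would look roughly like the following sketch (the `workers` list and message wording are assumptions, not the committed code):

    let max_disk_space_per_worker = self.config_obj.max_disk_space_per_worker();
    if max_disk_space_per_worker > 0 {
        // `workers` is assumed to be the configured select worker names.
        for node in workers.iter() {
            let used = self
                .db
                .get_used_disk_space_out_of_queue(Some(node.clone()))
                .await?;
            if max_disk_space_per_worker < used {
                return Err(CubeError::user(format!(
                    "Exceeded available storage space on worker {}: {:.3} GB out of {} GB allowed.",
                    node,
                    used as f64 / 1024. / 1024. / 1024.,
                    max_disk_space_per_worker / 1024 / 1024 / 1024
                )));
            }
        }
    }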
