Skip to content

Commit 16ec962

Browse files
authored
chore(cubestore): Introduce SYS CACHESTORE|METASTORE HEALTHCHECK (#6205)
1 parent fb896ba commit 16ec962

File tree

8 files changed

+86
-19
lines changed

8 files changed

+86
-19
lines changed

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,8 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
250250
),
251251
t("limit_pushdown_unique_key", limit_pushdown_unique_key),
252252
t("sys_drop_cache", sys_drop_cache),
253+
t("sys_metastore_healthcheck", sys_metastore_healthcheck),
254+
t("sys_cachestore_healthcheck", sys_cachestore_healthcheck),
253255
];
254256

255257
fn t<F>(name: &'static str, f: fn(Box<dyn SqlClient>) -> F) -> (&'static str, TestFn)
@@ -8423,6 +8425,20 @@ async fn sys_drop_cache(service: Box<dyn SqlClient>) {
84238425
service.exec_query(r#"SYS DROP CACHE;"#).await.unwrap();
84248426
}
84258427

8428+
async fn sys_metastore_healthcheck(service: Box<dyn SqlClient>) {
8429+
service
8430+
.exec_query(r#"SYS METASTORE HEALTHCHECK;"#)
8431+
.await
8432+
.unwrap();
8433+
}
8434+
8435+
async fn sys_cachestore_healthcheck(service: Box<dyn SqlClient>) {
8436+
service
8437+
.exec_query(r#"SYS CACHESTORE HEALTHCHECK;"#)
8438+
.await
8439+
.unwrap();
8440+
}
8441+
84268442
pub fn to_rows(d: &DataFrame) -> Vec<Vec<TableValue>> {
84278443
return d
84288444
.get_rows()

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ pub trait CacheStore: DIService + Send + Sync {
398398

399399
// Force compaction for the whole RocksDB
400400
async fn compaction(&self) -> Result<(), CubeError>;
401+
async fn healthcheck(&self) -> Result<(), CubeError>;
401402
}
402403

403404
#[async_trait]
@@ -865,6 +866,17 @@ impl CacheStore for RocksCacheStore {
865866

866867
Ok(())
867868
}
869+
870+
async fn healthcheck(&self) -> Result<(), CubeError> {
871+
self.store
872+
.read_operation(move |_| {
873+
// read_operation will call getSnapshot, which is enough to test that RocksDB works
874+
Ok(())
875+
})
876+
.await?;
877+
878+
Ok(())
879+
}
868880
}
869881

870882
crate::di_service!(RocksCacheStore, [CacheStore]);
@@ -979,6 +991,10 @@ impl CacheStore for ClusterCacheStoreClient {
979991
async fn compaction(&self) -> Result<(), CubeError> {
980992
panic!("CacheStore cannot be used on the worker node! compaction was used.")
981993
}
994+
995+
async fn healthcheck(&self) -> Result<(), CubeError> {
996+
panic!("CacheStore cannot be used on the worker node! healthcheck was used.")
997+
}
982998
}
983999

9841000
crate::di_service!(ClusterCacheStoreClient, [CacheStore]);

rust/cubestore/cubestore/src/cachestore/lazy.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,10 @@ impl CacheStore for LazyRocksCacheStore {
282282
async fn compaction(&self) -> Result<(), CubeError> {
283283
self.init().await?.compaction().await
284284
}
285+
286+
async fn healthcheck(&self) -> Result<(), CubeError> {
287+
self.init().await?.healthcheck().await
288+
}
285289
}
286290

287291
crate::di_service!(LazyRocksCacheStore, [CacheStore]);

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,6 +1065,7 @@ pub trait MetaStore: DIService + Send + Sync {
10651065
async fn debug_dump(&self, out_path: String) -> Result<(), CubeError>;
10661066
// Force compaction for the whole RocksDB
10671067
async fn compaction(&self) -> Result<(), CubeError>;
1068+
async fn healthcheck(&self) -> Result<(), CubeError>;
10681069

10691070
async fn get_snapshots_list(&self) -> Result<Vec<SnapshotInfo>, CubeError>;
10701071
async fn set_current_snapshot(&self, snapshot_id: u128) -> Result<(), CubeError>;
@@ -3867,6 +3868,16 @@ impl MetaStore for RocksMetaStore {
38673868
Ok(())
38683869
}
38693870

3871+
async fn healthcheck(&self) -> Result<(), CubeError> {
3872+
self.read_operation(move |_| {
3873+
// read_operation will call getSnapshot, which is enough to test that RocksDB works
3874+
Ok(())
3875+
})
3876+
.await?;
3877+
3878+
Ok(())
3879+
}
3880+
38703881
#[tracing::instrument(level = "trace", skip(self))]
38713882
async fn get_multi_partition(&self, id: u64) -> Result<IdRow<MultiPartition>, CubeError> {
38723883
self.read_operation(move |db| MultiPartitionRocksTable::new(db).get_row_or_not_found(id))

rust/cubestore/cubestore/src/queryplanner/test_utils.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,10 @@ impl MetaStore for MetaStoreMock {
651651
panic!("MetaStore mock!")
652652
}
653653

654+
async fn healthcheck(&self) -> Result<(), CubeError> {
655+
panic!("MetaStore mock!")
656+
}
657+
654658
async fn get_snapshots_list(&self) -> Result<Vec<SnapshotInfo>, CubeError> {
655659
panic!("MetaStore mock!")
656660
}
@@ -795,6 +799,10 @@ impl CacheStore for CacheStoreMock {
795799
async fn compaction(&self) -> Result<(), CubeError> {
796800
panic!("CacheStore mock!")
797801
}
802+
803+
async fn healthcheck(&self) -> Result<(), CubeError> {
804+
panic!("CacheStore mock!")
805+
}
798806
}
799807

800808
crate::di_service!(CacheStoreMock, [CacheStore]);

rust/cubestore/cubestore/src/sql/cachestore.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ impl CacheStoreSqlService {
4343
self.cachestore.compaction().await?;
4444
Ok(Arc::new(DataFrame::new(vec![], vec![])))
4545
}
46+
CacheStoreCommand::Healthcheck => {
47+
self.cachestore.healthcheck().await?;
48+
Ok(Arc::new(DataFrame::new(vec![], vec![])))
49+
}
4650
}
4751
}
4852

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -895,6 +895,10 @@ impl SqlService for SqlServiceImpl {
895895
self.db.compaction().await?;
896896
Ok(Arc::new(DataFrame::new(vec![], vec![])))
897897
}
898+
MetaStoreCommand::Healthcheck => {
899+
self.db.healthcheck().await?;
900+
Ok(Arc::new(DataFrame::new(vec![], vec![])))
901+
}
898902
},
899903
SystemCommand::CacheStore(command) => {
900904
self.cachestore

rust/cubestore/cubestore/src/sql/parser.rs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,13 @@ pub enum DropCommand {
160160
pub enum MetaStoreCommand {
161161
SetCurrent { id: u128 },
162162
Compaction,
163+
Healthcheck,
163164
}
164165

165166
#[derive(Debug, Clone, PartialEq)]
166167
pub enum CacheStoreCommand {
167168
Compaction,
169+
Healthcheck,
168170
}
169171

170172
pub struct CubeStoreParser<'a> {
@@ -346,33 +348,35 @@ impl<'a> CubeStoreParser<'a> {
346348
}
347349

348350
pub fn parse_cachestore(&mut self) -> Result<Statement, ParserError> {
349-
if self.parse_custom_token("compaction") {
350-
Ok(Statement::System(SystemCommand::CacheStore(
351-
CacheStoreCommand::Compaction,
352-
)))
351+
let command = if self.parse_custom_token("compaction") {
352+
CacheStoreCommand::Compaction
353+
} else if self.parse_custom_token("healthcheck") {
354+
CacheStoreCommand::Healthcheck
353355
} else {
354-
Err(ParserError::ParserError(
356+
return Err(ParserError::ParserError(
355357
"Unknown cachestore command".to_string(),
356-
))
357-
}
358+
));
359+
};
360+
361+
Ok(Statement::System(SystemCommand::CacheStore(command)))
358362
}
359363

360364
pub fn parse_metastore(&mut self) -> Result<Statement, ParserError> {
361-
if self.parse_custom_token("set_current") {
362-
Ok(Statement::System(SystemCommand::MetaStore(
363-
MetaStoreCommand::SetCurrent {
364-
id: self.parse_integer("metastore snapshot id", false)?,
365-
},
366-
)))
365+
let command = if self.parse_custom_token("set_current") {
366+
MetaStoreCommand::SetCurrent {
367+
id: self.parse_integer("metastore snapshot id", false)?,
368+
}
367369
} else if self.parse_custom_token("compaction") {
368-
Ok(Statement::System(SystemCommand::MetaStore(
369-
MetaStoreCommand::Compaction,
370-
)))
370+
MetaStoreCommand::Compaction
371+
} else if self.parse_custom_token("healthcheck") {
372+
MetaStoreCommand::Healthcheck
371373
} else {
372-
Err(ParserError::ParserError(
374+
return Err(ParserError::ParserError(
373375
"Unknown metastore command".to_string(),
374-
))
375-
}
376+
));
377+
};
378+
379+
Ok(Statement::System(SystemCommand::MetaStore(command)))
376380
}
377381

378382
fn parse_queue(&mut self) -> Result<Statement, ParserError> {

0 commit comments

Comments
 (0)