Skip to content

Commit 4616d57

Browse files
authored
chore(cubestore): Introduce cachestore/metastore.rocksdb_properties tables (#7055)
1 parent 7714ca9 commit 4616d57

File tree

9 files changed

+189
-9
lines changed

9 files changed

+189
-9
lines changed

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
6565
t("column_escaping", column_escaping),
6666
t("information_schema", information_schema),
6767
t("system_query_cache", system_query_cache),
68+
t("metastore_rocksdb_tables", metastore_rocksdb_tables),
69+
t("cachestore_rocksdb_tables", cachestore_rocksdb_tables),
6870
t("case_column_escaping", case_column_escaping),
6971
t("inner_column_escaping", inner_column_escaping),
7072
t("convert_tz", convert_tz),
@@ -1395,9 +1397,18 @@ async fn system_query_cache(service: Box<dyn SqlClient>) {
13951397
.exec_query("SELECT * FROM system.query_cache")
13961398
.await
13971399
.unwrap();
1400+
}
1401+
1402+
async fn metastore_rocksdb_tables(service: Box<dyn SqlClient>) {
1403+
service
1404+
.exec_query("SELECT * FROM metastore.rocksdb_properties")
1405+
.await
1406+
.unwrap();
1407+
}
13981408

1409+
async fn cachestore_rocksdb_tables(service: Box<dyn SqlClient>) {
13991410
service
1400-
.exec_query("SELECT sql FROM system.query_cache;")
1411+
.exec_query("SELECT * FROM cachestore.rocksdb_properties")
14011412
.await
14021413
.unwrap();
14031414
}

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ use std::collections::HashMap;
1313
use std::env;
1414

1515
use crate::metastore::{
16-
BaseRocksStoreFs, BatchPipe, DbTableRef, IdRow, MetaStoreEvent, MetaStoreFs, RocksStore,
17-
RocksStoreDetails, RocksTable,
16+
BaseRocksStoreFs, BatchPipe, DbTableRef, IdRow, MetaStoreEvent, MetaStoreFs, RocksPropertyRow,
17+
RocksStore, RocksStoreDetails, RocksTable,
1818
};
1919
use crate::remotefs::LocalDirRemoteFs;
2020
use crate::util::WorkerLoop;
@@ -536,6 +536,7 @@ pub trait CacheStore: DIService + Send + Sync {
536536
// Force compaction for the whole RocksDB
537537
async fn compaction(&self) -> Result<(), CubeError>;
538538
async fn healthcheck(&self) -> Result<(), CubeError>;
539+
async fn rocksdb_properties(&self) -> Result<Vec<RocksPropertyRow>, CubeError>;
539540
}
540541

541542
#[async_trait]
@@ -1038,6 +1039,10 @@ impl CacheStore for RocksCacheStore {
10381039

10391040
Ok(())
10401041
}
1042+
1043+
async fn rocksdb_properties(&self) -> Result<Vec<RocksPropertyRow>, CubeError> {
1044+
self.store.rocksdb_properties()
1045+
}
10411046
}
10421047

10431048
crate::di_service!(RocksCacheStore, [CacheStore]);
@@ -1170,6 +1175,10 @@ impl CacheStore for ClusterCacheStoreClient {
11701175
async fn healthcheck(&self) -> Result<(), CubeError> {
11711176
panic!("CacheStore cannot be used on the worker node! healthcheck was used.")
11721177
}
1178+
1179+
async fn rocksdb_properties(&self) -> Result<Vec<RocksPropertyRow>, CubeError> {
1180+
panic!("CacheStore cannot be used on the worker node! rocksdb_properties was used.")
1181+
}
11731182
}
11741183

11751184
crate::di_service!(ClusterCacheStoreClient, [CacheStore]);

rust/cubestore/cubestore/src/cachestore/lazy.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::cachestore::{
55
RocksCacheStore,
66
};
77
use crate::config::ConfigObj;
8-
use crate::metastore::{IdRow, MetaStoreEvent, MetaStoreFs};
8+
use crate::metastore::{IdRow, MetaStoreEvent, MetaStoreFs, RocksPropertyRow};
99
use crate::CubeError;
1010
use async_trait::async_trait;
1111
use log::trace;
@@ -302,6 +302,10 @@ impl CacheStore for LazyRocksCacheStore {
302302
async fn healthcheck(&self) -> Result<(), CubeError> {
303303
self.init().await?.healthcheck().await
304304
}
305+
306+
async fn rocksdb_properties(&self) -> Result<Vec<RocksPropertyRow>, CubeError> {
307+
self.init().await?.rocksdb_properties().await
308+
}
305309
}
306310

307311
crate::di_service!(LazyRocksCacheStore, [CacheStore]);

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,6 +1126,7 @@ pub trait MetaStore: DIService + Send + Sync {
11261126
// Force compaction for the whole RocksDB
11271127
async fn compaction(&self) -> Result<(), CubeError>;
11281128
async fn healthcheck(&self) -> Result<(), CubeError>;
1129+
async fn rocksdb_properties(&self) -> Result<Vec<RocksPropertyRow>, CubeError>;
11291130

11301131
async fn get_snapshots_list(&self) -> Result<Vec<SnapshotInfo>, CubeError>;
11311132
async fn set_current_snapshot(&self, snapshot_id: u128) -> Result<(), CubeError>;
@@ -4203,6 +4204,10 @@ impl MetaStore for RocksMetaStore {
42034204
Ok(())
42044205
}
42054206

4207+
async fn rocksdb_properties(&self) -> Result<Vec<RocksPropertyRow>, CubeError> {
4208+
self.store.rocksdb_properties()
4209+
}
4210+
42064211
async fn healthcheck(&self) -> Result<(), CubeError> {
42074212
self.store.healthcheck().await?;
42084213

rust/cubestore/cubestore/src/metastore/rocks_store.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,12 @@ pub struct RocksSecondaryIndexValueTTLExtended {
139139
pub raw_size: u32,
140140
}
141141

142+
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
143+
pub struct RocksPropertyRow {
144+
pub key: String,
145+
pub value: Option<String>,
146+
}
147+
142148
#[derive(Debug)]
143149
pub enum RocksSecondaryIndexValue<'a> {
144150
Hash(&'a [u8]),
@@ -991,6 +997,55 @@ impl RocksStore {
991997
Ok(())
992998
}
993999

1000+
pub fn rocksdb_properties(&self) -> Result<Vec<RocksPropertyRow>, CubeError> {
1001+
let to_collect = [
1002+
rocksdb::properties::BLOCK_CACHE_CAPACITY,
1003+
rocksdb::properties::BLOCK_CACHE_USAGE,
1004+
rocksdb::properties::BLOCK_CACHE_PINNED_USAGE,
1005+
rocksdb::properties::LEVELSTATS,
1006+
&rocksdb::properties::compression_ratio_at_level(0),
1007+
&rocksdb::properties::compression_ratio_at_level(1),
1008+
&rocksdb::properties::compression_ratio_at_level(2),
1009+
&rocksdb::properties::compression_ratio_at_level(3),
1010+
&rocksdb::properties::compression_ratio_at_level(4),
1011+
&rocksdb::properties::compression_ratio_at_level(6),
1012+
rocksdb::properties::DBSTATS,
1013+
// rocksdb::properties::SSTABLES,
1014+
rocksdb::properties::NUM_RUNNING_FLUSHES,
1015+
rocksdb::properties::COMPACTION_PENDING,
1016+
rocksdb::properties::NUM_RUNNING_COMPACTIONS,
1017+
rocksdb::properties::BACKGROUND_ERRORS,
1018+
rocksdb::properties::CUR_SIZE_ACTIVE_MEM_TABLE,
1019+
rocksdb::properties::CUR_SIZE_ALL_MEM_TABLES,
1020+
rocksdb::properties::SIZE_ALL_MEM_TABLES,
1021+
rocksdb::properties::NUM_ENTRIES_ACTIVE_MEM_TABLE,
1022+
rocksdb::properties::NUM_ENTRIES_IMM_MEM_TABLES,
1023+
rocksdb::properties::NUM_DELETES_ACTIVE_MEM_TABLE,
1024+
rocksdb::properties::NUM_DELETES_IMM_MEM_TABLES,
1025+
rocksdb::properties::ESTIMATE_NUM_KEYS,
1026+
rocksdb::properties::NUM_SNAPSHOTS,
1027+
rocksdb::properties::OLDEST_SNAPSHOT_TIME,
1028+
rocksdb::properties::NUM_LIVE_VERSIONS,
1029+
rocksdb::properties::ESTIMATE_LIVE_DATA_SIZE,
1030+
rocksdb::properties::LIVE_SST_FILES_SIZE,
1031+
rocksdb::properties::ESTIMATE_PENDING_COMPACTION_BYTES,
1032+
rocksdb::properties::ESTIMATE_TABLE_READERS_MEM,
1033+
rocksdb::properties::BASE_LEVEL,
1034+
rocksdb::properties::AGGREGATED_TABLE_PROPERTIES,
1035+
];
1036+
1037+
let mut result = Vec::with_capacity(to_collect.len());
1038+
1039+
for property_name in to_collect {
1040+
result.push(RocksPropertyRow {
1041+
key: property_name.to_string_lossy().to_string(),
1042+
value: self.db.property_value(property_name)?,
1043+
})
1044+
}
1045+
1046+
Ok(result)
1047+
}
1048+
9941049
pub async fn healthcheck(&self) -> Result<(), CubeError> {
9951050
self.read_operation(move |_| {
9961051
// read_operation will call getSnapshot, which is enough to test that RocksDB works

rust/cubestore/cubestore/src/queryplanner/info_schema/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod info_schema_columns;
22
mod info_schema_schemata;
33
mod info_schema_tables;
4+
mod rocksdb_properties;
45
mod system_cache;
56
mod system_chunks;
67
mod system_indexes;
@@ -15,6 +16,7 @@ mod system_tables;
1516
pub use info_schema_columns::*;
1617
pub use info_schema_schemata::*;
1718
pub use info_schema_tables::*;
19+
pub use rocksdb_properties::*;
1820
pub use system_cache::*;
1921
pub use system_chunks::*;
2022
pub use system_indexes::*;
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use crate::metastore::RocksPropertyRow;
2+
use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
3+
use crate::CubeError;
4+
use arrow::array::{ArrayRef, StringArray};
5+
use arrow::datatypes::{DataType, Field};
6+
use async_trait::async_trait;
7+
use std::sync::Arc;
8+
9+
pub struct RocksDBPropertiesTableDef {
10+
for_cachestore: bool,
11+
}
12+
13+
impl RocksDBPropertiesTableDef {
14+
pub fn new_cachestore() -> Self {
15+
Self {
16+
for_cachestore: true,
17+
}
18+
}
19+
20+
pub fn new_metastore() -> Self {
21+
Self {
22+
for_cachestore: false,
23+
}
24+
}
25+
}
26+
27+
#[async_trait]
28+
impl InfoSchemaTableDef for RocksDBPropertiesTableDef {
29+
type T = RocksPropertyRow;
30+
31+
async fn rows(
32+
&self,
33+
ctx: InfoSchemaTableDefContext,
34+
_limit: Option<usize>,
35+
) -> Result<Arc<Vec<Self::T>>, CubeError> {
36+
Ok(Arc::new(if self.for_cachestore {
37+
ctx.cache_store.rocksdb_properties().await?
38+
} else {
39+
ctx.meta_store.rocksdb_properties().await?
40+
}))
41+
}
42+
43+
fn schema(&self) -> Vec<Field> {
44+
vec![
45+
Field::new("id", DataType::Utf8, false),
46+
Field::new("value", DataType::Utf8, true),
47+
]
48+
}
49+
50+
fn columns(&self) -> Vec<Box<dyn Fn(Arc<Vec<Self::T>>) -> ArrayRef>> {
51+
vec![
52+
Box::new(|items| {
53+
Arc::new(StringArray::from_iter_values(
54+
items.iter().map(|row| row.key.clone()),
55+
))
56+
}),
57+
Box::new(|items| {
58+
Arc::new(StringArray::from_iter(
59+
items.iter().map(|row| row.value.as_ref()),
60+
))
61+
}),
62+
]
63+
}
64+
}
65+
66+
crate::base_info_schema_table_def!(RocksDBPropertiesTableDef);

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ use crate::metastore::table::{Table, TablePath};
3030
use crate::metastore::{IdRow, MetaStore};
3131
use crate::queryplanner::flatten_union::FlattenUnion;
3232
use crate::queryplanner::info_schema::{
33-
ColumnsInfoSchemaTableDef, SchemataInfoSchemaTableDef, SystemCacheTableDef,
34-
SystemChunksTableDef, SystemIndexesTableDef, SystemJobsTableDef, SystemPartitionsTableDef,
35-
SystemQueueResultsTableDef, SystemQueueTableDef, SystemReplayHandlesTableDef,
36-
SystemSnapshotsTableDef, SystemTablesTableDef, TablesInfoSchemaTableDef,
33+
ColumnsInfoSchemaTableDef, RocksDBPropertiesTableDef, SchemataInfoSchemaTableDef,
34+
SystemCacheTableDef, SystemChunksTableDef, SystemIndexesTableDef, SystemJobsTableDef,
35+
SystemPartitionsTableDef, SystemQueueResultsTableDef, SystemQueueTableDef,
36+
SystemReplayHandlesTableDef, SystemSnapshotsTableDef, SystemTablesTableDef,
37+
TablesInfoSchemaTableDef,
3738
};
3839
use crate::queryplanner::now::MaterializeNow;
3940
use crate::queryplanner::planning::{choose_index_ext, ClusterSendNode};
@@ -385,6 +386,16 @@ impl ContextProvider for MetaStoreSchemaProvider {
385386
self.cache_store.clone(),
386387
InfoSchemaTable::SystemSnapshots,
387388
))),
389+
("metastore", "rocksdb_properties") => Some(Arc::new(InfoSchemaTableProvider::new(
390+
self.meta_store.clone(),
391+
self.cache_store.clone(),
392+
InfoSchemaTable::MetastoreRocksDBProperties,
393+
))),
394+
("cachestore", "rocksdb_properties") => Some(Arc::new(InfoSchemaTableProvider::new(
395+
self.meta_store.clone(),
396+
self.cache_store.clone(),
397+
InfoSchemaTable::CachestoreRocksDBProperties,
398+
))),
388399
_ => None,
389400
})
390401
}
@@ -428,6 +439,8 @@ pub enum InfoSchemaTable {
428439
SystemReplayHandles,
429440
SystemCache,
430441
SystemSnapshots,
442+
CachestoreRocksDBProperties,
443+
MetastoreRocksDBProperties,
431444
}
432445

433446
pub struct InfoSchemaTableDefContext {
@@ -504,6 +517,12 @@ impl InfoSchemaTable {
504517
InfoSchemaTable::SystemJobs => Box::new(SystemJobsTableDef),
505518
InfoSchemaTable::SystemCache => Box::new(SystemCacheTableDef),
506519
InfoSchemaTable::SystemSnapshots => Box::new(SystemSnapshotsTableDef),
520+
InfoSchemaTable::CachestoreRocksDBProperties => {
521+
Box::new(RocksDBPropertiesTableDef::new_cachestore())
522+
}
523+
InfoSchemaTable::MetastoreRocksDBProperties => {
524+
Box::new(RocksDBPropertiesTableDef::new_metastore())
525+
}
507526
}
508527
}
509528

rust/cubestore/cubestore/src/queryplanner/test_utils.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::metastore::source::{Source, SourceCredentials};
1010
use crate::metastore::table::{StreamOffset, Table, TablePath};
1111
use crate::metastore::{
1212
Chunk, ChunkMetaStoreTable, Column, IdRow, ImportFormat, Index, IndexDef, IndexMetaStoreTable,
13-
MetaStore, Partition, PartitionData, PartitionMetaStoreTable, RowKey, Schema,
13+
MetaStore, Partition, PartitionData, PartitionMetaStoreTable, RocksPropertyRow, RowKey, Schema,
1414
SchemaMetaStoreTable, TableMetaStoreTable, WAL,
1515
};
1616
use crate::table::Row;
@@ -732,6 +732,11 @@ impl MetaStore for MetaStoreMock {
732732
async fn healthcheck(&self) -> Result<(), CubeError> {
733733
panic!("MetaStore mock!")
734734
}
735+
736+
async fn rocksdb_properties(&self) -> Result<Vec<RocksPropertyRow>, CubeError> {
737+
panic!("MetaStore mock!")
738+
}
739+
735740
async fn get_snapshots_list(&self) -> Result<Vec<SnapshotInfo>, CubeError> {
736741
panic!("MetaStore mock!")
737742
}
@@ -873,6 +878,10 @@ impl CacheStore for CacheStoreMock {
873878
async fn healthcheck(&self) -> Result<(), CubeError> {
874879
panic!("CacheStore mock!")
875880
}
881+
882+
async fn rocksdb_properties(&self) -> Result<Vec<RocksPropertyRow>, CubeError> {
883+
panic!("CacheStore mock!")
884+
}
876885
}
877886

878887
crate::di_service!(CacheStoreMock, [CacheStore]);

0 commit comments

Comments
 (0)