Skip to content

Commit ace2054

Browse files
authored
feat(cubestore): Limit push down for system tables (cache, queue, queue_results) (#6977)
1 parent 84211b2 commit ace2054

18 files changed

+1253
-1289
lines changed

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ impl QueueResultResponse {
433433
#[cuberpc::service]
434434
pub trait CacheStore: DIService + Send + Sync {
435435
// cache
436-
async fn cache_all(&self) -> Result<Vec<IdRow<CacheItem>>, CubeError>;
436+
async fn cache_all(&self, limit: Option<usize>) -> Result<Vec<IdRow<CacheItem>>, CubeError>;
437437
async fn cache_set(
438438
&self,
439439
item: CacheItem,
@@ -446,8 +446,11 @@ pub trait CacheStore: DIService + Send + Sync {
446446
async fn cache_incr(&self, key: String) -> Result<IdRow<CacheItem>, CubeError>;
447447

448448
// queue
449-
async fn queue_all(&self) -> Result<Vec<IdRow<QueueItem>>, CubeError>;
450-
async fn queue_results_all(&self) -> Result<Vec<IdRow<QueueResult>>, CubeError>;
449+
async fn queue_all(&self, limit: Option<usize>) -> Result<Vec<IdRow<QueueItem>>, CubeError>;
450+
async fn queue_results_all(
451+
&self,
452+
limit: Option<usize>,
453+
) -> Result<Vec<IdRow<QueueResult>>, CubeError>;
451454
async fn queue_results_multi_delete(&self, ids: Vec<u64>) -> Result<(), CubeError>;
452455
async fn queue_add(&self, item: QueueItem) -> Result<QueueAddResponse, CubeError>;
453456
async fn queue_truncate(&self) -> Result<(), CubeError>;
@@ -491,10 +494,10 @@ pub trait CacheStore: DIService + Send + Sync {
491494

492495
#[async_trait]
493496
impl CacheStore for RocksCacheStore {
494-
async fn cache_all(&self) -> Result<Vec<IdRow<CacheItem>>, CubeError> {
497+
async fn cache_all(&self, limit: Option<usize>) -> Result<Vec<IdRow<CacheItem>>, CubeError> {
495498
self.store
496499
.read_operation_out_of_queue(move |db_ref| {
497-
Ok(CacheItemRocksTable::new(db_ref).all_rows()?)
500+
Ok(CacheItemRocksTable::new(db_ref).scan_rows(limit)?)
498501
})
499502
.await
500503
}
@@ -616,15 +619,18 @@ impl CacheStore for RocksCacheStore {
616619
.await
617620
}
618621

619-
async fn queue_all(&self) -> Result<Vec<IdRow<QueueItem>>, CubeError> {
622+
async fn queue_all(&self, limit: Option<usize>) -> Result<Vec<IdRow<QueueItem>>, CubeError> {
620623
self.store
621-
.read_operation(move |db_ref| Ok(QueueItemRocksTable::new(db_ref).all_rows()?))
624+
.read_operation(move |db_ref| Ok(QueueItemRocksTable::new(db_ref).scan_rows(limit)?))
622625
.await
623626
}
624627

625-
async fn queue_results_all(&self) -> Result<Vec<IdRow<QueueResult>>, CubeError> {
628+
async fn queue_results_all(
629+
&self,
630+
limit: Option<usize>,
631+
) -> Result<Vec<IdRow<QueueResult>>, CubeError> {
626632
self.store
627-
.read_operation(move |db_ref| Ok(QueueResultRocksTable::new(db_ref).all_rows()?))
633+
.read_operation(move |db_ref| Ok(QueueResultRocksTable::new(db_ref).scan_rows(limit)?))
628634
.await
629635
}
630636

@@ -1009,7 +1015,7 @@ pub struct ClusterCacheStoreClient {}
10091015

10101016
#[async_trait]
10111017
impl CacheStore for ClusterCacheStoreClient {
1012-
async fn cache_all(&self) -> Result<Vec<IdRow<CacheItem>>, CubeError> {
1018+
async fn cache_all(&self, _limit: Option<usize>) -> Result<Vec<IdRow<CacheItem>>, CubeError> {
10131019
panic!("CacheStore cannot be used on the worker node! cache_all was used.")
10141020
}
10151021

@@ -1041,11 +1047,14 @@ impl CacheStore for ClusterCacheStoreClient {
10411047
panic!("CacheStore cannot be used on the worker node! cache_incr was used.")
10421048
}
10431049

1044-
async fn queue_all(&self) -> Result<Vec<IdRow<QueueItem>>, CubeError> {
1050+
async fn queue_all(&self, _limit: Option<usize>) -> Result<Vec<IdRow<QueueItem>>, CubeError> {
10451051
panic!("CacheStore cannot be used on the worker node! queue_all was used.")
10461052
}
10471053

1048-
async fn queue_results_all(&self) -> Result<Vec<IdRow<QueueResult>>, CubeError> {
1054+
async fn queue_results_all(
1055+
&self,
1056+
_limit: Option<usize>,
1057+
) -> Result<Vec<IdRow<QueueResult>>, CubeError> {
10491058
panic!("CacheStore cannot be used on the worker node! queue_results_all was used.")
10501059
}
10511060

rust/cubestore/cubestore/src/cachestore/lazy.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,8 @@ impl LazyRocksCacheStore {
167167

168168
#[async_trait]
169169
impl CacheStore for LazyRocksCacheStore {
170-
async fn cache_all(&self) -> Result<Vec<IdRow<CacheItem>>, CubeError> {
171-
self.init().await?.cache_all().await
170+
async fn cache_all(&self, limit: Option<usize>) -> Result<Vec<IdRow<CacheItem>>, CubeError> {
171+
self.init().await?.cache_all(limit).await
172172
}
173173

174174
async fn cache_set(
@@ -202,12 +202,15 @@ impl CacheStore for LazyRocksCacheStore {
202202
self.init().await?.cache_incr(path).await
203203
}
204204

205-
async fn queue_all(&self) -> Result<Vec<IdRow<QueueItem>>, CubeError> {
206-
self.init().await?.queue_all().await
205+
async fn queue_all(&self, limit: Option<usize>) -> Result<Vec<IdRow<QueueItem>>, CubeError> {
206+
self.init().await?.queue_all(limit).await
207207
}
208208

209-
async fn queue_results_all(&self) -> Result<Vec<IdRow<QueueResult>>, CubeError> {
210-
self.init().await?.queue_results_all().await
209+
async fn queue_results_all(
210+
&self,
211+
limit: Option<usize>,
212+
) -> Result<Vec<IdRow<QueueResult>>, CubeError> {
213+
self.init().await?.queue_results_all(limit).await
211214
}
212215

213216
async fn queue_results_multi_delete(&self, ids: Vec<u64>) -> Result<(), CubeError> {

rust/cubestore/cubestore/src/metastore/rocks_table.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1108,6 +1108,24 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
11081108
Ok(())
11091109
}
11101110

1111+
fn scan_rows(&self, limit: Option<usize>) -> Result<Vec<IdRow<Self::T>>, CubeError> {
1112+
let iter = self.table_scan(self.snapshot())?;
1113+
1114+
let mut res = Vec::new();
1115+
1116+
if let Some(limit) = limit {
1117+
for row in iter.take(limit) {
1118+
res.push(row?);
1119+
}
1120+
} else {
1121+
for row in iter {
1122+
res.push(row?);
1123+
}
1124+
};
1125+
1126+
Ok(res)
1127+
}
1128+
11111129
fn all_rows(&self) -> Result<Vec<IdRow<Self::T>>, CubeError> {
11121130
let mut res = Vec::new();
11131131
for row in self.table_scan(self.snapshot())? {

rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_columns.rs

Lines changed: 43 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ impl InfoSchemaTableDef for ColumnsInfoSchemaTableDef {
1616
async fn rows(
1717
&self,
1818
ctx: InfoSchemaTableDefContext,
19+
_limit: Option<usize>,
1920
) -> Result<Arc<Vec<(Column, TablePath)>>, CubeError> {
2021
let rows = ctx.meta_store.get_tables_with_path(false).await?;
2122
let mut res = Vec::new();
@@ -30,57 +31,49 @@ impl InfoSchemaTableDef for ColumnsInfoSchemaTableDef {
3031
Ok(Arc::new(res))
3132
}
3233

33-
fn columns(
34-
&self,
35-
) -> Vec<(
36-
Field,
37-
Box<dyn Fn(Arc<Vec<(Column, TablePath)>>) -> ArrayRef>,
38-
)> {
34+
fn schema(&self) -> Vec<Field> {
35+
vec![
36+
Field::new("table_schema", DataType::Utf8, false),
37+
Field::new("table_name", DataType::Utf8, false),
38+
Field::new("column_name", DataType::Utf8, false),
39+
Field::new("data_type", DataType::Utf8, false),
40+
]
41+
}
42+
43+
fn columns(&self) -> Vec<Box<dyn Fn(Arc<Vec<(Column, TablePath)>>) -> ArrayRef>> {
3944
vec![
40-
(
41-
Field::new("table_schema", DataType::Utf8, false),
42-
Box::new(|tables| {
43-
Arc::new(StringArray::from(
44-
tables
45-
.iter()
46-
.map(|(_, row)| row.schema.get_row().get_name().as_str())
47-
.collect::<Vec<_>>(),
48-
))
49-
}),
50-
),
51-
(
52-
Field::new("table_name", DataType::Utf8, false),
53-
Box::new(|tables| {
54-
Arc::new(StringArray::from(
55-
tables
56-
.iter()
57-
.map(|(_, row)| row.table.get_row().get_table_name().as_str())
58-
.collect::<Vec<_>>(),
59-
))
60-
}),
61-
),
62-
(
63-
Field::new("column_name", DataType::Utf8, false),
64-
Box::new(|tables| {
65-
Arc::new(StringArray::from(
66-
tables
67-
.iter()
68-
.map(|(column, _)| column.get_name().as_str())
69-
.collect::<Vec<_>>(),
70-
))
71-
}),
72-
),
73-
(
74-
Field::new("data_type", DataType::Utf8, false),
75-
Box::new(|tables| {
76-
Arc::new(StringArray::from(
77-
tables
78-
.iter()
79-
.map(|(column, _)| column.get_column_type().to_string())
80-
.collect::<Vec<_>>(),
81-
))
82-
}),
83-
),
45+
Box::new(|tables| {
46+
Arc::new(StringArray::from(
47+
tables
48+
.iter()
49+
.map(|(_, row)| row.schema.get_row().get_name().as_str())
50+
.collect::<Vec<_>>(),
51+
))
52+
}),
53+
Box::new(|tables| {
54+
Arc::new(StringArray::from(
55+
tables
56+
.iter()
57+
.map(|(_, row)| row.table.get_row().get_table_name().as_str())
58+
.collect::<Vec<_>>(),
59+
))
60+
}),
61+
Box::new(|tables| {
62+
Arc::new(StringArray::from(
63+
tables
64+
.iter()
65+
.map(|(column, _)| column.get_name().as_str())
66+
.collect::<Vec<_>>(),
67+
))
68+
}),
69+
Box::new(|tables| {
70+
Arc::new(StringArray::from(
71+
tables
72+
.iter()
73+
.map(|(column, _)| column.get_column_type().to_string())
74+
.collect::<Vec<_>>(),
75+
))
76+
}),
8477
]
8578
}
8679
}

rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_schemata.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,27 @@ pub struct SchemataInfoSchemaTableDef;
1212
impl InfoSchemaTableDef for SchemataInfoSchemaTableDef {
1313
type T = IdRow<Schema>;
1414

15-
async fn rows(&self, ctx: InfoSchemaTableDefContext) -> Result<Arc<Vec<Self::T>>, CubeError> {
15+
async fn rows(
16+
&self,
17+
ctx: InfoSchemaTableDefContext,
18+
_limit: Option<usize>,
19+
) -> Result<Arc<Vec<Self::T>>, CubeError> {
1620
Ok(Arc::new(ctx.meta_store.schemas_table().all_rows().await?))
1721
}
1822

19-
fn columns(&self) -> Vec<(Field, Box<dyn Fn(Arc<Vec<Self::T>>) -> ArrayRef>)> {
20-
vec![(
21-
Field::new("schema_name", DataType::Utf8, false),
22-
Box::new(|tables| {
23-
Arc::new(StringArray::from(
24-
tables
25-
.iter()
26-
.map(|row| row.get_row().get_name().as_str())
27-
.collect::<Vec<_>>(),
28-
))
29-
}),
30-
)]
23+
fn schema(&self) -> Vec<Field> {
24+
vec![Field::new("schema_name", DataType::Utf8, false)]
25+
}
26+
27+
fn columns(&self) -> Vec<Box<dyn Fn(Arc<Vec<Self::T>>) -> ArrayRef>> {
28+
vec![Box::new(|tables| {
29+
Arc::new(StringArray::from(
30+
tables
31+
.iter()
32+
.map(|row| row.get_row().get_name().as_str())
33+
.collect::<Vec<_>>(),
34+
))
35+
})]
3136
}
3237
}
3338

0 commit comments

Comments (0)