Skip to content

Commit e0495a9

Browse files
authored
feat(cubestore): Projection in kafka streaming queries (#6479)
1 parent 1989fa5 commit e0495a9

File tree

15 files changed

+1748
-353
lines changed

15 files changed

+1748
-353
lines changed

rust/cubestore/Cargo.lock

Lines changed: 80 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubestore/cubestore/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ enum_primitive = "0.1.1"
4949
msql-srv = { git = 'https://github.com/cube-js/msql-srv', version = '0.9.2' }
5050
bincode = "1.3.1"
5151
chrono = "0.4.15"
52+
chrono-tz = "0.8.2"
5253
lazy_static = "1.4.0"
5354
mockall = "0.8.1"
5455
async-std = "0.99"

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,14 @@ impl DataFrameValue<String> for Vec<Column> {
208208
}
209209
}
210210

211+
impl DataFrameValue<String> for Option<Vec<Column>> {
212+
fn value(v: &Self) -> String {
213+
v.as_ref()
214+
.map(|v| serde_json::to_string(v).unwrap())
215+
.unwrap_or("NULL".to_string())
216+
}
217+
}
218+
211219
impl DataFrameValue<String> for Option<String> {
212220
fn value(v: &Self) -> String {
213221
v.as_ref()
@@ -769,6 +777,7 @@ pub trait MetaStore: DIService + Send + Sync {
769777
build_range_end: Option<DateTime<Utc>>,
770778
seal_at: Option<DateTime<Utc>>,
771779
select_statement: Option<String>,
780+
source_coulumns: Option<Vec<Column>>,
772781
stream_offset: Option<StreamOffset>,
773782
unique_key_column_names: Option<Vec<String>>,
774783
aggregates: Option<Vec<(String, String)>>,
@@ -1895,6 +1904,7 @@ impl MetaStore for RocksMetaStore {
18951904
build_range_end: Option<DateTime<Utc>>,
18961905
seal_at: Option<DateTime<Utc>>,
18971906
select_statement: Option<String>,
1907+
source_coulumns: Option<Vec<Column>>,
18981908
stream_offset: Option<StreamOffset>,
18991909
unique_key_column_names: Option<Vec<String>>,
19001910
aggregates: Option<Vec<(String, String)>>,
@@ -1991,6 +2001,7 @@ impl MetaStore for RocksMetaStore {
19912001
build_range_end,
19922002
seal_at,
19932003
select_statement,
2004+
source_coulumns,
19942005
stream_offset,
19952006
unique_key_column_indices,
19962007
aggregate_column_indices,
@@ -4823,6 +4834,7 @@ mod tests {
48234834
None,
48244835
None,
48254836
None,
4837+
None,
48264838
)
48274839
.await
48284840
.unwrap();
@@ -4844,6 +4856,7 @@ mod tests {
48444856
None,
48454857
None,
48464858
None,
4859+
None,
48474860
)
48484861
.await
48494862
.unwrap();
@@ -4964,6 +4977,7 @@ mod tests {
49644977
None,
49654978
None,
49664979
None,
4980+
None,
49674981
)
49684982
.await
49694983
.unwrap();
@@ -4987,6 +5001,7 @@ mod tests {
49875001
None,
49885002
None,
49895003
None,
5004+
None,
49905005
)
49915006
.await
49925007
.is_err());
@@ -5074,6 +5089,7 @@ mod tests {
50745089
None,
50755090
None,
50765091
None,
5092+
None,
50775093
)
50785094
.await
50795095
.unwrap();
@@ -5156,6 +5172,7 @@ mod tests {
51565172
None,
51575173
None,
51585174
None,
5175+
None,
51595176
Some(vec![
51605177
("sum".to_string(), "aggr_col2".to_string()),
51615178
("max".to_string(), "aggr_col1".to_string()),
@@ -5226,6 +5243,7 @@ mod tests {
52265243
None,
52275244
None,
52285245
None,
5246+
None,
52295247
Some(vec!["col2".to_string(), "col1".to_string()]),
52305248
Some(vec![
52315249
("sum".to_string(), "aggr_col2".to_string()),
@@ -5250,6 +5268,7 @@ mod tests {
52505268
None,
52515269
None,
52525270
None,
5271+
None,
52535272
Some(vec!["col1".to_string()]),
52545273
None,
52555274
None,
@@ -5271,6 +5290,7 @@ mod tests {
52715290
None,
52725291
None,
52735292
None,
5293+
None,
52745294
Some(vec!["col1".to_string()]),
52755295
Some(vec![
52765296
("sum".to_string(), "aggr_col2".to_string()),
@@ -5753,6 +5773,7 @@ mod tests {
57535773
None,
57545774
None,
57555775
None,
5776+
None,
57565777
)
57575778
.await
57585779
.unwrap();
@@ -5974,6 +5995,7 @@ mod tests {
59745995
None,
59755996
None,
59765997
None,
5998+
None,
59775999
)
59786000
.await
59796001
.unwrap();
@@ -6113,6 +6135,7 @@ mod tests {
61136135
None,
61146136
None,
61156137
None,
6138+
None,
61166139
)
61176140
.await
61186141
.unwrap();

rust/cubestore/cubestore/src/metastore/table.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@ pub struct Table {
141141
#[serde(default)]
142142
select_statement: Option<String>,
143143
#[serde(default)]
144+
source_columns: Option<Vec<Column>>,
145+
#[serde(default)]
144146
stream_offset: Option<StreamOffset>,
145147
#[serde(default)]
146148
unique_key_column_indices: Option<Vec<u64>>,
@@ -182,6 +184,7 @@ impl Table {
182184
build_range_end: Option<DateTime<Utc>>,
183185
seal_at: Option<DateTime<Utc>>,
184186
select_statement: Option<String>,
187+
source_columns: Option<Vec<Column>>,
185188
stream_offset: Option<StreamOffset>,
186189
unique_key_column_indices: Option<Vec<u64>>,
187190
aggregate_column_indices: Vec<AggregateColumnIndex>,
@@ -201,6 +204,7 @@ impl Table {
201204
build_range_end,
202205
seal_at,
203206
select_statement,
207+
source_columns,
204208
stream_offset,
205209
sealed: false,
206210
unique_key_column_indices,
@@ -308,6 +312,10 @@ impl Table {
308312
&self.select_statement
309313
}
310314

315+
pub fn source_columns(&self) -> &Option<Vec<Column>> {
316+
&self.source_columns
317+
}
318+
311319
pub fn sealed(&self) -> bool {
312320
self.sealed
313321
}

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ impl ContextProvider for MetaStoreSchemaProvider {
281281
None,
282282
None,
283283
None,
284+
None,
284285
Vec::new(),
285286
None,
286287
None,

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2128,6 +2128,7 @@ pub mod tests {
21282128
None,
21292129
None,
21302130
None,
2131+
None,
21312132
Vec::new(),
21322133
None,
21332134
None,
@@ -2178,6 +2179,7 @@ pub mod tests {
21782179
None,
21792180
None,
21802181
None,
2182+
None,
21812183
Vec::new(),
21822184
None,
21832185
None,
@@ -2234,6 +2236,7 @@ pub mod tests {
22342236
None,
22352237
None,
22362238
None,
2239+
None,
22372240
Vec::new(),
22382241
None,
22392242
None,

rust/cubestore/cubestore/src/queryplanner/test_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ impl MetaStore for MetaStoreMock {
9696
_build_range_end: Option<DateTime<Utc>>,
9797
_seal_at: Option<DateTime<Utc>>,
9898
_select_statement: Option<String>,
99+
_source_columns: Option<Vec<Column>>,
99100
_stream_offset: Option<StreamOffset>,
100101
_unique_key_column_names: Option<Vec<String>>,
101102
_aggregates: Option<Vec<(String, String)>>,

0 commit comments

Comments
 (0)