Skip to content

Commit 70e999a

Browse files
committed
chore(cubestore): Upgrade DF: Partitioned index support
1 parent 1877b8a commit 70e999a

File tree

4 files changed

+76
-78
lines changed

4 files changed

+76
-78
lines changed

rust/cubestore/Cargo.lock

Lines changed: 25 additions & 27 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3824,19 +3824,22 @@ async fn planning_join_with_partitioned_index(service: Box<dyn SqlClient>) {
38243824
.unwrap();
38253825
assert_eq!(
38263826
pp_phys_plan(p.router.as_ref()),
3827-
"ClusterSend, partitions: [[1, 3]]"
3827+
"CoalescePartitions\
3828+
\n ClusterSend, partitions: [[1, 3]]"
38283829
);
38293830
assert_eq!(
38303831
pp_phys_plan(p.worker.as_ref()),
3831-
"Worker\
3832-
\n Projection, [order_id, customer_name]\
3833-
\n MergeJoin, on: [customer_id@1 = customer_id@0]\
3834-
\n MergeSort\
3835-
\n Scan, index: #mi0:1:[1]:sort_on[customer_id], fields: [order_id, customer_id]\
3836-
\n Empty\
3837-
\n MergeSort\
3838-
\n Scan, index: #mi0:3:[3]:sort_on[customer_id], fields: *\
3839-
\n Empty",
3832+
"CoalescePartitions\
3833+
\n Worker\
3834+
\n CoalescePartitions\
3835+
\n Projection, [order_id, customer_name]\
3836+
\n MergeJoin, on: [customer_id@1 = customer_id@0]\
3837+
\n Scan, index: #mi0:1:[1]:sort_on[customer_id], fields: [order_id, customer_id]\
3838+
\n Sort\
3839+
\n Empty\
3840+
\n Scan, index: #mi0:3:[3]:sort_on[customer_id], fields: *\
3841+
\n Sort\
3842+
\n Empty"
38403843
);
38413844
}
38423845

rust/cubestore/cubestore/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ base64 = "0.13.0"
1818
bumpalo = "3.6.1"
1919
tokio = { version = "1", features = ["full", "rt"] }
2020
warp = { version = "0.3.6" }
21-
#sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "4388f6712dae5073c2d71d74f64cae2edd418066" }
22-
sqlparser = { version = "0.50.0" }
21+
sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", branch = "cube-42.2.0" }
2322
serde_derive = "1.0.115"
2423
serde = "1.0.115"
2524
serde_repr = "0.1"

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -992,38 +992,37 @@ impl SqlService for SqlServiceImpl {
992992
))
993993
}
994994
}
995-
// TODO upgrade DF
996-
// CubeStoreStatement::Statement(Statement::CreatePartitionedIndex {
997-
// name,
998-
// columns,
999-
// if_not_exists,
1000-
// }) => {
1001-
// app_metrics::DATA_QUERIES.add_with_tags(
1002-
// 1,
1003-
// Some(&vec![metrics::format_tag(
1004-
// "command",
1005-
// "create_partitioned_index",
1006-
// )]),
1007-
// );
1008-
//
1009-
// if name.0.len() != 2 {
1010-
// return Err(CubeError::user(format!(
1011-
// "Expected name for PARTITIONED INDEX in the form '<SCHEMA>.<INDEX>', found: {}",
1012-
// name
1013-
// )));
1014-
// }
1015-
// let schema = &name.0[0].value;
1016-
// let index = &name.0[1].value;
1017-
// let res = self
1018-
// .create_partitioned_index(
1019-
// schema.to_string(),
1020-
// index.to_string(),
1021-
// columns,
1022-
// if_not_exists,
1023-
// )
1024-
// .await?;
1025-
// Ok(Arc::new(DataFrame::from(vec![res])))
1026-
// }
995+
CubeStoreStatement::Statement(Statement::CreatePartitionedIndex {
996+
name,
997+
columns,
998+
if_not_exists,
999+
}) => {
1000+
app_metrics::DATA_QUERIES.add_with_tags(
1001+
1,
1002+
Some(&vec![metrics::format_tag(
1003+
"command",
1004+
"create_partitioned_index",
1005+
)]),
1006+
);
1007+
1008+
if name.0.len() != 2 {
1009+
return Err(CubeError::user(format!(
1010+
"Expected name for PARTITIONED INDEX in the form '<SCHEMA>.<INDEX>', found: {}",
1011+
name
1012+
)));
1013+
}
1014+
let schema = &name.0[0].value;
1015+
let index = &name.0[1].value;
1016+
let res = self
1017+
.create_partitioned_index(
1018+
schema.to_string(),
1019+
index.to_string(),
1020+
columns,
1021+
if_not_exists,
1022+
)
1023+
.await?;
1024+
Ok(Arc::new(DataFrame::from(vec![res])))
1025+
}
10271026
CubeStoreStatement::Statement(Statement::Drop {
10281027
object_type, names, ..
10291028
}) => {
@@ -1040,13 +1039,12 @@ impl SqlService for SqlServiceImpl {
10401039
self.db.drop_table(table.get_id()).await?;
10411040
&"drop_table"
10421041
}
1043-
// TODO upgrade DF
1044-
// ObjectType::PartitionedIndex => {
1045-
// let schema = names[0].0[0].value.clone();
1046-
// let name = names[0].0[1].value.clone();
1047-
// self.db.drop_partitioned_index(schema, name).await?;
1048-
// &"drop_partitioned_index"
1049-
// }
1042+
ObjectType::PartitionedIndex => {
1043+
let schema = names[0].0[0].value.clone();
1044+
let name = names[0].0[1].value.clone();
1045+
self.db.drop_partitioned_index(schema, name).await?;
1046+
&"drop_partitioned_index"
1047+
}
10501048
_ => return Err(CubeError::user("Unsupported drop operation".to_string())),
10511049
};
10521050

0 commit comments

Comments
 (0)