Skip to content

Commit 5e18a90

Browse files
committed
chore(cubestore): Upgrade DF: Kafka-related fixes
1 parent 38180ae commit 5e18a90

File tree

6 files changed

+153
-138
lines changed

6 files changed

+153
-138
lines changed

rust/cubestore/Cargo.lock

Lines changed: 23 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubestore/cubestore/src/queryplanner/info_schema/system_chunks.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl InfoSchemaTableDef for SystemChunksTableDef {
2828
Field::new("id", DataType::UInt64, false),
2929
Field::new("file_name", DataType::Utf8, false),
3030
Field::new("partition_id", DataType::UInt64, false),
31-
Field::new("replay_handle_id", DataType::UInt64, false),
31+
Field::new("replay_handle_id", DataType::UInt64, true),
3232
Field::new("row_count", DataType::UInt64, true),
3333
Field::new("uploaded", DataType::Boolean, true),
3434
Field::new("active", DataType::Boolean, true),
@@ -46,7 +46,7 @@ impl InfoSchemaTableDef for SystemChunksTableDef {
4646
Field::new(
4747
"deactivated_at",
4848
DataType::Timestamp(TimeUnit::Nanosecond, None),
49-
false,
49+
true,
5050
),
5151
Field::new("file_size", DataType::UInt64, true),
5252
Field::new("min_row", DataType::Utf8, true),

rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::queryplanner::topk::ClusterAggregateTopK;
3131
use crate::queryplanner::topk::SortColumn;
3232
use crate::queryplanner::trace_data_loaded::TraceDataLoadedExec;
3333
use crate::queryplanner::{CubeTableLogical, InfoSchemaTableProvider};
34+
use crate::streaming::topic_table_provider::TopicTableProvider;
3435
use datafusion::physical_plan::empty::EmptyExec;
3536
use datafusion::physical_plan::expressions::Column;
3637
use datafusion::physical_plan::joins::{HashJoinExec, SortMergeJoinExec};
@@ -320,6 +321,8 @@ fn pp_source(t: Arc<dyn TableProvider>) -> String {
320321
.downcast_ref::<InfoSchemaQueryCacheTableProvider>()
321322
{
322323
"InfoSchemaQueryCacheTableProvider".to_string()
324+
} else if let Some(_) = t.as_any().downcast_ref::<TopicTableProvider>() {
325+
"TopicTableProvider".to_string()
323326
} else {
324327
panic!("unknown table provider");
325328
}

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -982,36 +982,37 @@ impl SqlService for SqlServiceImpl {
982982
// .await?;
983983
// Ok(Arc::new(DataFrame::from(vec![res])))
984984
// }
985-
// CubeStoreStatement::Statement(Statement::Drop {
986-
// object_type, names, ..
987-
// }) => {
988-
// let command = match object_type {
989-
// ObjectType::Schema => {
990-
// self.db.delete_schema(names[0].to_string()).await?;
991-
// &"drop_schema"
992-
// }
993-
// ObjectType::Table => {
994-
// let table = self
995-
// .db
996-
// .get_table(names[0].0[0].to_string(), names[0].0[1].to_string())
997-
// .await?;
998-
// self.db.drop_table(table.get_id()).await?;
999-
// &"drop_table"
1000-
// }
1001-
// ObjectType::PartitionedIndex => {
1002-
// let schema = names[0].0[0].value.clone();
1003-
// let name = names[0].0[1].value.clone();
1004-
// self.db.drop_partitioned_index(schema, name).await?;
1005-
// &"drop_partitioned_index"
1006-
// }
1007-
// _ => return Err(CubeError::user("Unsupported drop operation".to_string())),
1008-
// };
1009-
//
1010-
// app_metrics::DATA_QUERIES
1011-
// .add_with_tags(1, Some(&vec![metrics::format_tag("command", command)]));
1012-
//
1013-
// Ok(Arc::new(DataFrame::new(vec![], vec![])))
1014-
// }
985+
CubeStoreStatement::Statement(Statement::Drop {
986+
object_type, names, ..
987+
}) => {
988+
let command = match object_type {
989+
ObjectType::Schema => {
990+
self.db.delete_schema(names[0].to_string()).await?;
991+
&"drop_schema"
992+
}
993+
ObjectType::Table => {
994+
let table = self
995+
.db
996+
.get_table(names[0].0[0].to_string(), names[0].0[1].to_string())
997+
.await?;
998+
self.db.drop_table(table.get_id()).await?;
999+
&"drop_table"
1000+
}
1001+
// TODO upgrade DF
1002+
// ObjectType::PartitionedIndex => {
1003+
// let schema = names[0].0[0].value.clone();
1004+
// let name = names[0].0[1].value.clone();
1005+
// self.db.drop_partitioned_index(schema, name).await?;
1006+
// &"drop_partitioned_index"
1007+
// }
1008+
_ => return Err(CubeError::user("Unsupported drop operation".to_string())),
1009+
};
1010+
1011+
app_metrics::DATA_QUERIES
1012+
.add_with_tags(1, Some(&vec![metrics::format_tag("command", command)]));
1013+
1014+
Ok(Arc::new(DataFrame::new(vec![], vec![])))
1015+
}
10151016
CubeStoreStatement::Statement(Statement::Insert(Insert {
10161017
table_name,
10171018
columns,
@@ -4160,7 +4161,7 @@ mod tests {
41604161
.unwrap();
41614162

41624163
let _ = service
4163-
.exec_query("CREATE TABLE test.events_by_type_1 (`EVENT` text, `KSQL_COL_0` int) WITH (select_statement = 'SELECT * FROM EVENTS_BY_TYPE WHERE time >= \\'2022-01-01\\' AND time < \\'2022-02-01\\'') unique key (`EVENT`) location 'stream://ksql/EVENTS_BY_TYPE'")
4164+
.exec_query("CREATE TABLE test.events_by_type_1 (`EVENT` text, `KSQL_COL_0` int) WITH (select_statement = 'SELECT * FROM EVENTS_BY_TYPE WHERE time >= ''2022-01-01'' AND time < ''2022-02-01''') unique key (`EVENT`) location 'stream://ksql/EVENTS_BY_TYPE'")
41644165
.await
41654166
.unwrap();
41664167

@@ -4204,7 +4205,7 @@ mod tests {
42044205

42054206
let _ = service
42064207
.exec_query("CREATE TABLE test.events_1 (a int, b int) WITH (\
4207-
select_statement = 'SELECT a as a, b + c as b FROM EVENTS_BY_TYPE WHERE c > 10',\
4208+
select_statement = 'SELECT a as a, b + c as b FROM `EVENTS_BY_TYPE` WHERE c > 10',\
42084209
source_table = 'CREATE TABLE events1 (a int, b int, c int)'
42094210
) unique key (`a`) location 'stream://kafka/EVENTS_BY_TYPE/0'")
42104211
.await

0 commit comments

Comments (0)