Skip to content

Commit e224d1d

Browse files
committed
chore(cubestore): Upgrade DF: fix info schema table providers
1 parent afebee3 commit e224d1d

File tree

7 files changed

+28
-26
lines changed

7 files changed

+28
-26
lines changed

rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_tables.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ impl InfoSchemaTableDef for TablesInfoSchemaTableDef {
2727
Field::new(
2828
"build_range_end",
2929
DataType::Timestamp(TimeUnit::Nanosecond, None),
30-
false,
30+
true,
3131
),
3232
Field::new(
3333
"seal_at",
3434
DataType::Timestamp(TimeUnit::Nanosecond, None),
35-
false,
35+
true,
3636
),
3737
]
3838
}

rust/cubestore/cubestore/src/queryplanner/info_schema/system_tables.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,15 @@ impl InfoSchemaTableDef for SystemTablesTableDef {
4545
Field::new(
4646
"build_range_end",
4747
DataType::Timestamp(TimeUnit::Nanosecond, None),
48-
false,
48+
true,
4949
),
5050
Field::new(
5151
"seal_at",
5252
DataType::Timestamp(TimeUnit::Nanosecond, None),
53-
false,
53+
true,
5454
),
5555
Field::new("sealed", DataType::Boolean, false),
56-
Field::new("select_statement", DataType::Utf8, false),
56+
Field::new("select_statement", DataType::Utf8, true),
5757
Field::new("extension", DataType::Utf8, true),
5858
]
5959
}

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ impl ContextProvider for MetaStoreSchemaProvider {
318318
let table = self
319319
.inline_tables
320320
.iter()
321-
.find(|inline_table| inline_table.name == table.as_ref())
321+
.find(|inline_table| inline_table.name.to_lowercase() == table.as_ref())
322322
.ok_or_else(|| {
323323
DataFusionError::Plan(format!("Inline table {} was not found", name))
324324
})?;
@@ -789,11 +789,16 @@ impl ExecutionPlan for InfoSchemaTableExec {
789789
};
790790
let table = self.table.clone();
791791
let limit = self.limit.clone();
792+
let projection = self.projection.clone();
792793
let batch = async move {
793-
table
794+
let mut batch = table
794795
.scan(table_def, limit)
795796
.await
796-
.map_err(|e| DataFusionError::Execution(e.to_string()))
797+
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
798+
if let Some(projection) = projection {
799+
batch = batch.project(projection.as_slice())?;
800+
}
801+
Ok(batch)
797802
};
798803

799804
let stream = futures::stream::once(batch);

rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::queryplanner::tail_limit::TailLimitExec;
2828
use crate::queryplanner::topk::ClusterAggregateTopK;
2929
use crate::queryplanner::topk::SortColumn;
3030
use crate::queryplanner::trace_data_loaded::TraceDataLoadedExec;
31-
use crate::queryplanner::CubeTableLogical;
31+
use crate::queryplanner::{CubeTableLogical, InfoSchemaTableProvider};
3232
use datafusion::physical_plan::empty::EmptyExec;
3333
use datafusion::physical_plan::expressions::Column;
3434
use datafusion::physical_plan::joins::HashJoinExec;
@@ -303,6 +303,8 @@ fn pp_source(t: Arc<dyn TableProvider>) -> String {
303303
format!("CubeTable(index: {})", pp_index(t.index_snapshot()))
304304
} else if let Some(t) = t.as_any().downcast_ref::<InlineTableProvider>() {
305305
format!("InlineTableProvider(data: {} rows)", t.get_data().len())
306+
} else if let Some(t) = t.as_any().downcast_ref::<InfoSchemaTableProvider>() {
307+
format!("InfoSchemaTableProvider(table: {:?})", t.table)
306308
} else {
307309
panic!("unknown table provider");
308310
}

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1587,7 +1587,7 @@ impl TableProvider for InlineTableProvider {
15871587
.collect::<Vec<Field>>(),
15881588
))
15891589
} else {
1590-
schema
1590+
schema.clone()
15911591
};
15921592

15931593
if !self.inline_table_ids.iter().any(|id| id == &self.id) {
@@ -1599,7 +1599,7 @@ impl TableProvider for InlineTableProvider {
15991599
let projection = projection.cloned();
16001600
Ok(Arc::new(MemoryExec::try_new(
16011601
&vec![batches],
1602-
projected_schema,
1602+
schema.clone(),
16031603
projection,
16041604
)?))
16051605
}

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ pub type InlineTables = Vec<InlineTable>;
128128

129129
impl InlineTable {
130130
pub fn new(id: u64, name: String, data: Arc<DataFrame>) -> Self {
131-
Self { id, name, data }
131+
Self { id, name: name.to_lowercase(), data: Arc::new(data.lowercase()) }
132132
}
133133
}
134134

rust/cubestore/cubestore/src/store/mod.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,19 @@ pub const ROW_GROUP_SIZE: usize = 16384; // TODO config
5959
#[derive(Serialize, Deserialize, Hash, Eq, PartialEq, Debug, DeepSizeOf)]
6060
pub struct DataFrame {
6161
columns: Vec<Column>,
62-
data: Vec<Row>,
62+
data: Arc<Vec<Row>>,
6363
}
6464

6565
impl DataFrame {
6666
pub fn new(columns: Vec<Column>, data: Vec<Row>) -> DataFrame {
67-
DataFrame { columns, data }
67+
DataFrame { columns, data: Arc::new(data) }
68+
}
69+
70+
pub fn lowercase(&self) -> Self {
71+
Self {
72+
columns: self.columns.iter().map(|c| Column::new(c.get_name().to_lowercase(), c.get_column_type().clone(), c.get_index().clone())).collect(),
73+
data: self.data.clone(),
74+
}
6875
}
6976

7077
pub fn len(&self) -> usize {
@@ -88,14 +95,6 @@ impl DataFrame {
8895
&self.data
8996
}
9097

91-
pub fn mut_rows(&mut self) -> &mut Vec<Row> {
92-
&mut self.data
93-
}
94-
95-
pub fn into_rows(self) -> Vec<Row> {
96-
self.data
97-
}
98-
9998
pub fn to_execution_plan(
10099
&self,
101100
columns: &Vec<Column>,
@@ -166,10 +165,6 @@ impl ChunkData {
166165
pub fn len(&self) -> usize {
167166
self.data_frame.len()
168167
}
169-
170-
pub fn mut_rows(&mut self) -> &mut Vec<Row> {
171-
&mut self.data_frame.data
172-
}
173168
}
174169

175170
pub struct WALStore {

0 commit comments

Comments
 (0)