
Commit 4e94f1a

chore(cubestore): Upgrade DF: Use DF 46.0.1

1 parent 8924698

32 files changed: 1280 additions & 584 deletions

rust/cubestore/Cargo.lock

Lines changed: 727 additions & 274 deletions
Some generated files are not rendered by default.

rust/cubestore/cubestore/Cargo.toml

Lines changed: 5 additions & 4 deletions
@@ -18,7 +18,7 @@ base64 = "0.13.0"
 bumpalo = "3.6.1"
 tokio = { version = "1", features = ["full", "rt"] }
 warp = { version = "0.3.6" }
-sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", branch = "cube-42.2.0" }
+sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", branch = "cube-46.0.1" }
 serde_derive = "1.0.115"
 serde = "1.0.115"
 serde_repr = "0.1"
@@ -29,9 +29,10 @@ cubezetasketch = { path = "../cubezetasketch" }
 cubedatasketches = { path = "../cubedatasketches" }
 cubeshared = { path = "../../cubeshared" }
 cuberpc = { path = "../cuberpc" }
-datafusion = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cube-42.2.0", features = ["serde"] }
-datafusion-proto = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cube-42.2.0" }
-datafusion-proto-common = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cube-42.2.0" }
+datafusion = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cube-46.0.1", features = ["serde"] }
+datafusion-datasource = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cube-46.0.1" }
+datafusion-proto = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cube-46.0.1" }
+datafusion-proto-common = { git = "https://github.com/cube-js/arrow-datafusion", branch = "cube-46.0.1" }
 csv = "1.1.3"
 bytes = "1.6.0"
 serde_json = "1.0.56"
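Note: datafusion-datasource becomes a direct dependency here because this DF line moves MemoryExec into the datafusion_datasource crate; the new `use datafusion_datasource::memory::MemoryExec;` import in check_memory.rs below is what consumes it.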

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 8 additions & 8 deletions
@@ -341,7 +341,7 @@ impl DataFrameValue<String> for Option<Vec<AggregateFunction>> {
     }
 }
 
-#[derive(Clone, Copy, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, DeepSizeOf)]
+#[derive(Clone, Copy, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, PartialOrd, DeepSizeOf)]
 pub enum HllFlavour {
     Airlift,   // Compatible with Presto, Athena, etc.
     Snowflake, // Same storage as Airlift, imports from Snowflake JSON.
@@ -369,7 +369,7 @@ pub fn is_valid_plain_binary_hll(data: &[u8], f: HllFlavour) -> Result<(), CubeError> {
     return Ok(());
 }
 
-#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, DeepSizeOf)]
+#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, PartialOrd, DeepSizeOf)]
 pub enum ColumnType {
     String,
     Int,
@@ -547,7 +547,7 @@ impl From<&Column> for types::Type {
     }
 }
 
-#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, DeepSizeOf)]
+#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, PartialOrd, DeepSizeOf)]
 pub struct Column {
     name: String,
     column_type: ColumnType,
@@ -611,7 +611,7 @@ impl fmt::Display for Column {
     }
 }
 
-#[derive(Clone, Copy, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
+#[derive(Clone, Copy, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, PartialOrd)]
 pub enum ImportFormat {
     CSV,
     CSVNoHeader,
@@ -624,22 +624,22 @@ pub enum ImportFormat {
 }
 
 data_frame_from! {
-    #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
+    #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, PartialOrd)]
     pub struct Schema {
         name: String
     }
 }
 
 impl RocksEntity for Schema {}
 
-#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
+#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, PartialOrd)]
 pub enum IndexType {
     Regular = 1,
     Aggregate = 2,
 }
 
 data_frame_from! {
-    #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
+    #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, PartialOrd)]
     pub struct Index {
         name: String,
         table_id: u64,
@@ -656,7 +656,7 @@ pub struct Index {
 
 impl RocksEntity for Index {}
 
-#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
+#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, PartialOrd)]
 pub enum AggregateFunction {
     SUM = 1,
     MAX = 2,
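Why PartialOrd lands on all of these types: they end up embedded in CubeStore's custom logical-plan nodes, and newer DataFusion appears to require user-defined logical nodes to be comparable (UserDefinedLogicalNodeCore carries a PartialOrd bound), so the requirement cascades into every contained type. A minimal sketch of that cascade, with hypothetical stand-in types:

    // Hypothetical plan node: #[derive(PartialOrd)] only compiles when every
    // field is itself PartialOrd, so the bound cascades into the metastore types.
    #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
    struct ClusterSendNode {
        snapshot: TableSnapshot, // must itself be PartialOrd
    }

    #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
    struct TableSnapshot {
        table_name: String,
        schema_id: u64,
    }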

rust/cubestore/cubestore/src/metastore/rocks_store.rs

Lines changed: 1 addition & 1 deletion
@@ -598,7 +598,7 @@ impl WriteBatchIterator for WriteBatchContainer {
     }
 }
 
-#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
+#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, PartialOrd)]
 pub struct IdRow<T: Clone> {
     pub(crate) id: u64,
     pub(crate) row: T,
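For the generic wrapper, the derive produces a conditional impl, roughly like this sketch (not the literal macro expansion):

    use std::cmp::Ordering;

    // Derived PartialOrd compares fields in declaration order: id first, then
    // row; so IdRow<Table> is PartialOrd only because Table now derives it too.
    impl<T: Clone + PartialOrd> PartialOrd for IdRow<T> {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            match self.id.partial_cmp(&other.id) {
                Some(Ordering::Equal) => self.row.partial_cmp(&other.row),
                ord => ord,
            }
        }
    }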

rust/cubestore/cubestore/src/metastore/table.rs

Lines changed: 4 additions & 4 deletions
@@ -23,7 +23,7 @@ use serde::{Deserialize, Deserializer, Serialize};
 use std::io::Write;
 use std::sync::Arc;
 
-#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
+#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, PartialOrd)]
 pub struct AggregateColumnIndex {
     index: u64,
     function: AggregateFunction,
@@ -114,7 +114,7 @@ impl core::fmt::Display for AggregateColumn {
     }
 }
 
-#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
+#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, PartialOrd)]
 pub enum StreamOffset {
     Earliest = 1,
     Latest = 2,
@@ -129,7 +129,7 @@ impl DataFrameValue<String> for Option<StreamOffset> {
 }
 
 data_frame_from! {
-    #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
+    #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, PartialOrd)]
     pub struct Table {
         table_name: String,
         schema_id: u64,
@@ -172,7 +172,7 @@ pub struct Table {
 
 impl RocksEntity for Table {}
 
-#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, PartialOrd)]
 pub struct TablePath {
     pub table: IdRow<Table>,
     pub schema: Arc<IdRow<Schema>>,
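Same motivation as the metastore/mod.rs changes above: PartialOrd has to reach every type embedded in Table and TablePath, including IdRow<Table> and Arc<IdRow<Schema>> (Arc<T> is PartialOrd whenever T is), for the containing plan nodes to derive it.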

rust/cubestore/cubestore/src/queryplanner/flatten_union.rs

Lines changed: 0 additions & 2 deletions
@@ -47,15 +47,13 @@ impl OptimizerRule for FlattenUnion {
             | LogicalPlan::Values(_)
             | LogicalPlan::Analyze(_)
             | LogicalPlan::Distinct(_)
-            | LogicalPlan::Prepare(_)
             // | LogicalPlan::Execute(_)
             | LogicalPlan::Dml(_)
             | LogicalPlan::Ddl(_)
             | LogicalPlan::Copy(_)
             | LogicalPlan::DescribeTable(_)
             | LogicalPlan::Unnest(_)
             | LogicalPlan::RecursiveQuery(_)
-            | LogicalPlan::CrossJoin(_)
             => {
                 // apply the optimization to all inputs of the plan
                 let inputs = plan.inputs();
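The two dropped match arms track upstream enum changes: in this DF line, LogicalPlan::CrossJoin is gone (a cross join is now represented as an ordinary Join) and Prepare is no longer a top-level variant. A sketch of how a cross join now surfaces, under the assumption that it arrives as a Join with no equijoin keys and no filter:

    use datafusion::logical_expr::LogicalPlan;

    // Sketch: what used to match LogicalPlan::CrossJoin(_) now arrives as a
    // Join with empty `on` and no `filter` (assumption based on this diff,
    // not on the DataFusion changelog).
    fn is_cross_join(plan: &LogicalPlan) -> bool {
        match plan {
            LogicalPlan::Join(join) => join.on.is_empty() && join.filter.is_none(),
            _ => false,
        }
    }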

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 14 additions & 3 deletions
@@ -3,6 +3,8 @@ pub mod optimizations;
 pub mod panic;
 mod partition_filter;
 mod planning;
+use datafusion::logical_expr::planner::ExprPlanner;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 // use datafusion::physical_plan::parquet::MetadataCacheFactory;
 pub use planning::PlanningMeta;
 mod check_memory;
@@ -81,10 +83,11 @@ use datafusion::logical_expr::{
     TableSource, WindowUDF,
 };
 use datafusion::physical_expr::EquivalenceProperties;
-use datafusion::physical_plan::memory::MemoryExec;
+// TODO upgrade DF
+// use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
-    collect, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
+    collect, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
     PlanProperties, SendableRecordBatchStream,
 };
 use datafusion::prelude::{SessionConfig, SessionContext};
@@ -288,6 +291,7 @@ struct MetaStoreSchemaProvider {
     inline_tables: InlineTables,
     cache: Arc<SqlResultCache>,
     config_options: ConfigOptions,
+    expr_planners: Vec<Arc<dyn ExprPlanner>>, // session_state.expr_planners clone
     session_state: Arc<SessionState>,
 }
 
@@ -333,6 +337,7 @@ impl MetaStoreSchemaProvider {
             cache,
             inline_tables: (*inline_tables).clone(),
             config_options: ConfigOptions::new(),
+            expr_planners: datafusion::execution::FunctionRegistry::expr_planners(session_state.as_ref()),
             session_state,
         }
     }
@@ -572,6 +577,11 @@ impl ContextProvider for MetaStoreSchemaProvider {
             .cloned()
             .collect()
     }
+
+    // We implement this for count(*) replacement.
+    fn get_expr_planners(&self) -> &[Arc<dyn datafusion::logical_expr::planner::ExprPlanner>] {
+        self.expr_planners.as_slice()
+    }
 }
 
 /// Enables our options used with `SqlToRel`. Sets `enable_ident_normalization` to false. See also
@@ -760,7 +770,8 @@ impl TableProvider for InfoSchemaTableProvider {
             properties: PlanProperties::new(
                 EquivalenceProperties::new(schema),
                 Partitioning::UnknownPartitioning(1),
-                ExecutionMode::Bounded,
+                EmissionType::Both, // TODO upgrade DF: Both is safe choice
+                Boundedness::Bounded,
             ),
         };
         Ok(Arc::new(exec))
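Two migration patterns show up here. First, ContextProvider::get_expr_planners defaults to an empty slice, so the provider caches the session's planners and returns them; per the diff's own comment, this is what makes count(*) plannable. Second, PlanProperties no longer takes a single ExecutionMode: it splits into EmissionType (how batches are emitted) and Boundedness (whether the stream is finite). A sketch of the new constructor shape, mirroring the values used above:

    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::physical_expr::EquivalenceProperties;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
    use datafusion::physical_plan::{Partitioning, PlanProperties};

    // Sketch: `schema` is the Arrow schema of the info-schema table, as in the diff.
    fn info_schema_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Both,    // may emit incrementally and/or at the end
            Boundedness::Bounded,  // an info-schema scan is finite
        )
    }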

rust/cubestore/cubestore/src/queryplanner/optimizations/check_memory.rs

Lines changed: 4 additions & 2 deletions
@@ -2,9 +2,10 @@ use crate::queryplanner::check_memory::CheckMemoryExec;
 use crate::queryplanner::query_executor::ClusterSendExec;
 use crate::util::memory::MemoryHandler;
 use datafusion::datasource::physical_plan::ParquetExec;
+use datafusion::datasource::source::DataSourceExec;
 use datafusion::error::DataFusionError;
-use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion_datasource::memory::MemoryExec;
 use std::sync::Arc;
 
 /// Add `CheckMemoryExec` behind some nodes.
@@ -13,7 +14,8 @@ pub fn add_check_memory_exec(
     mem_handler: Arc<dyn MemoryHandler>,
 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
     let p_any = p.as_any();
-    if p_any.is::<ParquetExec>() || p_any.is::<MemoryExec>() || p_any.is::<ClusterSendExec>() {
+    // TODO upgrade DF: Do we use ParquetExec? Or just DataSourceExec? It's fine to have both here.
+    if p_any.is::<DataSourceExec>() || p_any.is::<ParquetExec>() || p_any.is::<MemoryExec>() || p_any.is::<ClusterSendExec>() {
        let memory_check = Arc::new(CheckMemoryExec::new(p, mem_handler.clone()));
        Ok(memory_check)
    } else {
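Context for the TODO: in this DF line, scans built through the new datasource layer are wrapped in a single DataSourceExec node, while ParquetExec and MemoryExec survive as legacy nodes, so downcast-checking all of them (plus ClusterSendExec) is the conservative choice until the planner's actual output is pinned down.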

rust/cubestore/cubestore/src/queryplanner/optimizations/distributed_partial_aggregate.rs

Lines changed: 3 additions & 2 deletions
@@ -4,6 +4,7 @@ use crate::queryplanner::query_executor::ClusterSendExec;
 use crate::queryplanner::tail_limit::TailLimitExec;
 use crate::queryplanner::topk::AggregateTopKExec;
 use datafusion::error::DataFusionError;
+use datafusion::physical_expr::LexOrdering;
 use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::limit::GlobalLimitExec;
@@ -122,11 +123,11 @@ pub fn ensure_partition_merge_helper(
             .children()
             .into_iter()
             .map(|c| -> Arc<dyn ExecutionPlan> {
-                Arc::new(SortPreservingMergeExec::new(ordering.clone(), c.clone()))
+                Arc::new(SortPreservingMergeExec::new(LexOrdering::new(ordering.clone()), c.clone()))
             })
             .collect();
         let new_plan = p.clone().with_new_children(merged_children)?;
-        Arc::new(SortPreservingMergeExec::new(ordering, new_plan))
+        Arc::new(SortPreservingMergeExec::new(LexOrdering::new(ordering), new_plan))
     } else {
         let merged_children = p
             .children()
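SortPreservingMergeExec::new now takes a LexOrdering rather than a bare sort-expression vector, and LexOrdering::new wraps that vector. A sketch of the new call shape, assuming `ordering` is the same Vec<PhysicalSortExpr> the function already built:

    use std::sync::Arc;
    use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr};
    use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
    use datafusion::physical_plan::ExecutionPlan;

    // Sketch of the call shape used twice in the hunk above.
    fn merge_sorted(
        ordering: Vec<PhysicalSortExpr>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Arc<dyn ExecutionPlan> {
        Arc::new(SortPreservingMergeExec::new(LexOrdering::new(ordering), input))
    }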

rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -107,6 +107,7 @@ impl QueryPlanner for CubeQueryPlanner {
     }
 }
 
+#[derive(Debug)]
 pub struct PreOptimizeRule {
     memory_handler: Arc<dyn MemoryHandler>,
     data_loaded_size: Option<Arc<DataLoadedSize>>,
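The lone derive is likely needed because the optimizer-rule traits in this DF line carry a Debug supertrait, so every implementor must be Debug. A schematic sketch of the constraint (not DataFusion's literal trait definition):

    use std::fmt::Debug;

    // Schematic: a trait with a Debug supertrait forces implementors to derive it.
    trait OptimizerRuleLike: Debug + Send + Sync {}

    #[derive(Debug)]
    struct MyRule;

    impl OptimizerRuleLike for MyRule {}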
