Skip to content

Commit d4cc10f

Browse files
authored
Upgrade DataFusion 48.0.0 (#61)
* Upgrade to DF48 * fix bug * update * update more
1 parent b733a12 commit d4cc10f

File tree

4 files changed

+34
-22
lines changed

4 files changed

+34
-22
lines changed

Cargo.toml

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,19 @@ keywords = ["arrow", "arrow-rs", "datafusion"]
2828
rust-version = "1.80"
2929

3030
[dependencies]
31-
arrow = "55"
32-
arrow-schema = "55"
31+
arrow = "55.1.0"
32+
arrow-schema = "55.1.0"
3333
async-trait = "0.1"
3434
dashmap = "6"
35-
datafusion = "47"
36-
datafusion-common = "47"
37-
datafusion-expr = "47"
38-
datafusion-functions = "47"
39-
datafusion-functions-aggregate = "47"
40-
datafusion-optimizer = "47"
41-
datafusion-physical-expr = "47"
42-
datafusion-physical-plan = "47"
43-
datafusion-sql = "47"
35+
datafusion = "48"
36+
datafusion-common = "48"
37+
datafusion-expr = "48"
38+
datafusion-functions = "48"
39+
datafusion-functions-aggregate = "48"
40+
datafusion-optimizer = "48"
41+
datafusion-physical-expr = "48"
42+
datafusion-physical-plan = "48"
43+
datafusion-sql = "48"
4444
futures = "0.3"
4545
itertools = "0.14"
4646
log = "0.4"

src/materialized/dependencies.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ impl TableFunctionImpl for StaleFilesUdtf {
231231
/// Extract table name from args passed to TableFunctionImpl::call()
232232
fn get_table_name(args: &[Expr]) -> Result<&String> {
233233
match &args[0] {
234-
Expr::Literal(ScalarValue::Utf8(Some(table_name))) => Ok(table_name),
234+
Expr::Literal(ScalarValue::Utf8(Some(table_name)), _) => Ok(table_name),
235235
_ => Err(DataFusionError::Plan(
236236
"expected a single string literal argument to mv_dependencies".to_string(),
237237
)),

src/rewrite/exploitation.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,14 @@ impl ExecutionPlan for OneOfExec {
427427
}
428428

429429
fn statistics(&self) -> Result<datafusion_common::Statistics> {
430-
self.candidates[self.best].statistics()
430+
self.candidates[self.best].partition_statistics(None)
431+
}
432+
433+
fn partition_statistics(
434+
&self,
435+
partition: Option<usize>,
436+
) -> Result<datafusion_common::Statistics> {
437+
self.candidates[self.best].partition_statistics(partition)
431438
}
432439
}
433440

src/rewrite/normal_form.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,10 @@ impl Predicate {
455455
self.eq_classes[idx].columns.insert(c2.clone());
456456
}
457457
(Some(&i), Some(&j)) => {
458+
if i == j {
459+
// The two columns are already in the same equivalence class.
460+
return Ok(());
461+
}
458462
// We need to merge two existing column eq classes.
459463

460464
// Delete the eq class with a larger index,
@@ -593,15 +597,15 @@ impl Predicate {
593597
/// Add a binary expression to our collection of filters.
594598
fn insert_binary_expr(&mut self, left: &Expr, op: Operator, right: &Expr) -> Result<()> {
595599
match (left, op, right) {
596-
(Expr::Column(c), op, Expr::Literal(v)) => {
600+
(Expr::Column(c), op, Expr::Literal(v, _)) => {
597601
if let Err(e) = self.add_range(c, &op, v) {
598602
// Add a range can fail in some cases, so just fallthrough
599603
log::debug!("failed to add range filter: {e}");
600604
} else {
601605
return Ok(());
602606
}
603607
}
604-
(Expr::Literal(_), op, Expr::Column(_)) => {
608+
(Expr::Literal(_, _), op, Expr::Column(_)) => {
605609
if let Some(swapped) = op.swap() {
606610
return self.insert_binary_expr(right, swapped, left);
607611
}
@@ -714,22 +718,22 @@ impl Predicate {
714718
extra_range_filters.push(Expr::BinaryExpr(BinaryExpr {
715719
left: Box::new(Expr::Column(other_column.clone())),
716720
op: Operator::Eq,
717-
right: Box::new(Expr::Literal(range.lower().clone())),
721+
right: Box::new(Expr::Literal(range.lower().clone(), None)),
718722
}))
719723
} else {
720724
if !range.lower().is_null() {
721725
extra_range_filters.push(Expr::BinaryExpr(BinaryExpr {
722726
left: Box::new(Expr::Column(other_column.clone())),
723727
op: Operator::GtEq,
724-
right: Box::new(Expr::Literal(range.lower().clone())),
728+
right: Box::new(Expr::Literal(range.lower().clone(), None)),
725729
}))
726730
}
727731

728732
if !range.upper().is_null() {
729733
extra_range_filters.push(Expr::BinaryExpr(BinaryExpr {
730734
left: Box::new(Expr::Column(other_column.clone())),
731735
op: Operator::LtEq,
732-
right: Box::new(Expr::Literal(range.upper().clone())),
736+
right: Box::new(Expr::Literal(range.upper().clone(), None)),
733737
}))
734738
}
735739
}
@@ -984,7 +988,8 @@ mod test {
984988
let ctx = SessionContext::new_with_config(
985989
SessionConfig::new()
986990
.set_bool("datafusion.execution.parquet.pushdown_filters", true)
987-
.set_bool("datafusion.explain.logical_plan_only", true),
991+
.set_bool("datafusion.explain.logical_plan_only", true)
992+
.set_bool("datafusion.sql_parser.map_varchar_to_utf8view", false),
988993
);
989994

990995
let t1_path = tempdir()?;
@@ -996,11 +1001,11 @@ mod test {
9961001
ctx.sql(&format!(
9971002
"
9981003
CREATE EXTERNAL TABLE t1 (
999-
column1 VARCHAR,
1000-
column2 BIGINT,
1004+
column1 VARCHAR,
1005+
column2 BIGINT,
10011006
column3 CHAR
10021007
)
1003-
STORED AS PARQUET
1008+
STORED AS PARQUET
10041009
LOCATION '{}'",
10051010
t1_path.path().to_string_lossy()
10061011
))

0 commit comments

Comments
 (0)