Skip to content

Commit 9742f89

Browse files
committed
Merge remote-tracking branch 'apache/df52' into fix/schema-adapter-nested-types
2 parents add9aef + 2dd8909 commit 9742f89

File tree

5 files changed

+54
-15
lines changed

5 files changed

+54
-15
lines changed

native/core/src/execution/planner.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,41 @@ impl PhysicalPlanner {
730730
Arc::new(ConfigOptions::default()),
731731
)))
732732
}
733+
// Date +/- Int8/Int16/Int32: DataFusion 52's arrow-arith kernels only
734+
// support Date32 +/- Interval types, not raw integers. Use the Spark
735+
// date_add / date_sub UDFs which handle Int8/Int16/Int32 directly.
736+
(
737+
DataFusionOperator::Plus,
738+
Ok(DataType::Date32),
739+
Ok(DataType::Int8 | DataType::Int16 | DataType::Int32),
740+
) => {
741+
let udf = Arc::new(ScalarUDF::new_from_impl(
742+
datafusion_spark::function::datetime::date_add::SparkDateAdd::new(),
743+
));
744+
Ok(Arc::new(ScalarFunctionExpr::new(
745+
"date_add",
746+
udf,
747+
vec![left, right],
748+
Arc::new(Field::new("date_add", DataType::Date32, true)),
749+
Arc::new(ConfigOptions::default()),
750+
)))
751+
}
752+
(
753+
DataFusionOperator::Minus,
754+
Ok(DataType::Date32),
755+
Ok(DataType::Int8 | DataType::Int16 | DataType::Int32),
756+
) => {
757+
let udf = Arc::new(ScalarUDF::new_from_impl(
758+
datafusion_spark::function::datetime::date_sub::SparkDateSub::new(),
759+
));
760+
Ok(Arc::new(ScalarFunctionExpr::new(
761+
"date_sub",
762+
udf,
763+
vec![left, right],
764+
Arc::new(Field::new("date_sub", DataType::Date32, true)),
765+
Arc::new(ConfigOptions::default()),
766+
)))
767+
}
733768
_ => {
734769
let data_type = return_type.map(to_arrow_datatype).unwrap();
735770
if [EvalMode::Try, EvalMode::Ansi].contains(&eval_mode)

native/core/src/parquet/parquet_exec.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,15 @@ pub(crate) fn init_datasource_exec(
8383
// dbg!(&required_schema, &data_schema);
8484

8585
// Determine the schema to use for ParquetSource
86-
// Use data_schema only if both data_schema and data_filters are set
87-
let base_schema = match (&data_schema, &data_filters) {
86+
// Use data_schema only if both data_schema and projection_vector are set
87+
let base_schema = match (&data_schema, &projection_vector) {
8888
(Some(schema), Some(_)) => Arc::clone(schema),
8989
_ => Arc::clone(&required_schema),
9090
};
91+
//let base_schema = required_schema;
92+
// dbg!(&base_schema);
93+
// dbg!(&data_schema);
94+
// dbg!(&data_filters);
9195
let partition_fields: Vec<_> = partition_schema
9296
.iter()
9397
.flat_map(|s| s.fields().iter())

native/spark-expr/src/conversion_funcs/cast.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -906,10 +906,10 @@ pub fn spark_cast(
906906
data_type: &DataType,
907907
cast_options: &SparkCastOptions,
908908
) -> DataFusionResult<ColumnarValue> {
909-
let input_type = match &arg {
910-
ColumnarValue::Array(array) => array.data_type().clone(),
911-
ColumnarValue::Scalar(scalar) => scalar.data_type(),
912-
};
909+
// let input_type = match &arg {
910+
// ColumnarValue::Array(array) => array.data_type().clone(),
911+
// ColumnarValue::Scalar(scalar) => scalar.data_type(),
912+
// };
913913

914914
let result = match arg {
915915
ColumnarValue::Array(array) => {
@@ -927,10 +927,10 @@ pub fn spark_cast(
927927
}
928928
};
929929

930-
let result_type = match &result {
931-
ColumnarValue::Array(array) => array.data_type().clone(),
932-
ColumnarValue::Scalar(scalar) => scalar.data_type(),
933-
};
930+
// let result_type = match &result {
931+
// ColumnarValue::Array(array) => array.data_type().clone(),
932+
// ColumnarValue::Scalar(scalar) => scalar.data_type(),
933+
// };
934934

935935
// println!(
936936
// "spark_cast: {} -> {} (requested: {})",

native/spark-expr/src/csv_funcs/to_csv.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ impl PhysicalExpr for ToCsv {
115115
)))
116116
}
117117

118-
fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
119-
unimplemented!()
118+
fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
119+
Display::fmt(self, f)
120120
}
121121
}
122122

native/spark-expr/src/unbound.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use arrow::datatypes::{DataType, Schema};
2020
use datafusion::common::{internal_err, Result};
2121
use datafusion::physical_expr::PhysicalExpr;
2222
use datafusion::physical_plan::ColumnarValue;
23-
use std::fmt::Formatter;
23+
use std::fmt::{Display, Formatter};
2424
use std::{hash::Hash, sync::Arc};
2525

2626
/// This is similar to `UnKnownColumn` in DataFusion, but it has data type.
@@ -64,8 +64,8 @@ impl PhysicalExpr for UnboundColumn {
6464
self
6565
}
6666

67-
fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
68-
unimplemented!()
67+
fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68+
Display::fmt(self, f)
6969
}
7070

7171
/// Get the data type of this expression, given the schema of the input

0 commit comments

Comments
 (0)