Skip to content

Commit 3fc9b4c

Browse files
andygrove authored and MazterQyou committed
Move logical expression type-coercion code from physical-expr crate to expr crate (apache#2257)
1 parent 807f17c commit 3fc9b4c

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

47 files changed

+2009
-1955
lines changed

datafusion-examples/examples/simple_udaf.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@ use datafusion::arrow::{
2323
};
2424

2525
use datafusion::from_slice::FromSlice;
26-
use datafusion::physical_plan::functions::Volatility;
2726
use datafusion::{error::Result, logical_plan::create_udaf, physical_plan::Accumulator};
28-
use datafusion::{prelude::*, scalar::ScalarValue};
27+
use datafusion::{logical_expr::Volatility, prelude::*, scalar::ScalarValue};
2928
use std::sync::Arc;
3029

3130
// create local session context with an in-memory table

datafusion-examples/examples/simple_udf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use datafusion::{
2121
datatypes::DataType,
2222
record_batch::RecordBatch,
2323
},
24-
physical_plan::functions::Volatility,
24+
logical_expr::Volatility,
2525
};
2626

2727
use datafusion::from_slice::FromSlice;

datafusion/core/src/dataframe.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -679,12 +679,14 @@ mod tests {
679679

680680
use super::*;
681681
use crate::execution::options::CsvReadOptions;
682-
use crate::physical_plan::{window_functions, ColumnarValue};
682+
use crate::physical_plan::ColumnarValue;
683683
use crate::{assert_batches_sorted_eq, execution::context::SessionContext};
684684
use crate::{logical_plan::*, test_util};
685685
use arrow::datatypes::DataType;
686-
use datafusion_expr::ScalarFunctionImplementation;
687686
use datafusion_expr::Volatility;
687+
use datafusion_expr::{
688+
BuiltInWindowFunction, ScalarFunctionImplementation, WindowFunction,
689+
};
688690

689691
#[tokio::test]
690692
async fn select_columns() -> Result<()> {
@@ -724,9 +726,7 @@ mod tests {
724726
// build plan using Table API
725727
let t = test_table().await?;
726728
let first_row = Expr::WindowFunction {
727-
fun: window_functions::WindowFunction::BuiltInWindowFunction(
728-
window_functions::BuiltInWindowFunction::FirstValue,
729-
),
729+
fun: WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::FirstValue),
730730
args: vec![col("aggregate_test_100.c1")],
731731
partition_by: vec![col("aggregate_test_100.c2")],
732732
order_by: vec![],

datafusion/core/src/datasource/listing/helpers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ use crate::{
4141
error::Result,
4242
execution::context::SessionContext,
4343
logical_plan::{self, Expr, ExprVisitable, ExpressionVisitor, Recursion},
44-
physical_plan::functions::Volatility,
4544
scalar::ScalarValue,
4645
};
4746

4847
use super::{PartitionedFile, PartitionedFileStream};
4948
use datafusion_data_access::{object_store::ObjectStore, FileMeta, SizedFile};
49+
use datafusion_expr::Volatility;
5050

5151
const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_";
5252
const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_";

datafusion/core/src/execution/context.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1543,7 +1543,7 @@ mod tests {
15431543
use super::*;
15441544
use crate::execution::context::QueryPlanner;
15451545
use crate::logical_plan::{binary_expr, create_udtf, lit, Operator};
1546-
use crate::physical_plan::functions::{make_table_function, Volatility};
1546+
use crate::physical_plan::functions::make_table_function;
15471547
use crate::test;
15481548
use crate::variable::VarType;
15491549
use crate::{
@@ -1556,7 +1556,7 @@ mod tests {
15561556
use arrow::datatypes::*;
15571557
use arrow::record_batch::RecordBatch;
15581558
use async_trait::async_trait;
1559-
use datafusion_expr::TableFunctionImplementation;
1559+
use datafusion_expr::{TableFunctionImplementation, Volatility};
15601560
use std::fs::File;
15611561
use std::sync::Weak;
15621562
use std::thread::{self, JoinHandle};

datafusion/core/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,9 @@ pub mod variable;
225225
pub use arrow;
226226
pub use parquet;
227227

228-
// re-export object store dependencies
228+
// re-export DataFusion crates
229229
pub use datafusion_data_access;
230+
pub use datafusion_expr as logical_expr;
230231

231232
#[cfg(feature = "row")]
232233
pub mod row;

datafusion/core/src/logical_plan/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use arrow::{
3636
record_batch::RecordBatch,
3737
};
3838
use datafusion_data_access::object_store::ObjectStore;
39-
use datafusion_physical_expr::coercion_rule::binary_rule::comparison_eq_coercion;
39+
use datafusion_expr::binary_rule::comparison_eq_coercion;
4040
use std::convert::TryFrom;
4141
use std::{
4242
collections::{HashMap, HashSet},

datafusion/core/src/logical_plan/expr_schema.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616
// under the License.
1717

1818
use super::{Expr, Like};
19-
use crate::physical_plan::{
20-
aggregates, expressions::binary_operator_data_type, functions, window_functions,
21-
};
19+
use crate::logical_expr::{aggregate_function, function, window_function};
2220
use arrow::compute::can_cast_types;
2321
use arrow::datatypes::DataType;
2422
use datafusion_common::{DFField, DFSchema, DataFusionError, ExprSchema, Result};
23+
use datafusion_expr::binary_rule::binary_operator_data_type;
2524
use datafusion_physical_expr::field_util::get_indexed_field;
2625

2726
/// trait to allow expr to typable with respect to a schema
@@ -84,21 +83,21 @@ impl ExprSchemable for Expr {
8483
.iter()
8584
.map(|e| e.get_type(schema))
8685
.collect::<Result<Vec<_>>>()?;
87-
functions::return_type(fun, &data_types)
86+
function::return_type(fun, &data_types)
8887
}
8988
Expr::WindowFunction { fun, args, .. } => {
9089
let data_types = args
9190
.iter()
9291
.map(|e| e.get_type(schema))
9392
.collect::<Result<Vec<_>>>()?;
94-
window_functions::return_type(fun, &data_types)
93+
window_function::return_type(fun, &data_types)
9594
}
9695
Expr::AggregateFunction { fun, args, .. } => {
9796
let data_types = args
9897
.iter()
9998
.map(|e| e.get_type(schema))
10099
.collect::<Result<Vec<_>>>()?;
101-
aggregates::return_type(fun, &data_types)
100+
aggregate_function::return_type(fun, &data_types)
102101
}
103102
Expr::AggregateUDF { fun, args, .. } => {
104103
let data_types = args

datafusion/core/src/optimizer/simplify_expressions.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ use crate::logical_plan::{
2727
use crate::optimizer::optimizer::OptimizerConfig;
2828
use crate::optimizer::optimizer::OptimizerRule;
2929
use crate::optimizer::utils;
30-
use crate::physical_plan::functions::Volatility;
3130
use crate::physical_plan::planner::create_physical_expr;
3231
use crate::scalar::ScalarValue;
3332
use crate::{error::Result, logical_plan::Operator};
3433
use arrow::array::new_null_array;
3534
use arrow::datatypes::{DataType, Field, Schema};
3635
use arrow::record_batch::RecordBatch;
36+
use datafusion_expr::Volatility;
3737

3838
/// Provides simplification information based on schema and properties
3939
pub(crate) struct SimplifyContext<'a, 'b> {
@@ -755,14 +755,15 @@ mod tests {
755755

756756
use arrow::array::{ArrayRef, Int32Array};
757757
use chrono::{DateTime, TimeZone, Utc};
758+
use datafusion_expr::BuiltinScalarFunction;
758759

759760
use super::*;
760761
use crate::assert_contains;
761762
use crate::logical_plan::{
762763
and, binary_expr, call_fn, col, create_udf, lit, lit_timestamp_nano, DFField,
763764
Expr, LogicalPlanBuilder,
764765
};
765-
use crate::physical_plan::functions::{make_scalar_function, BuiltinScalarFunction};
766+
use crate::physical_plan::functions::make_scalar_function;
766767
use crate::physical_plan::udf::ScalarUDF;
767768

768769
#[test]

datafusion/core/src/physical_plan/aggregate_rule.rs

Lines changed: 1 addition & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -17,102 +17,4 @@
1717

1818
//! Support the coercion rule for aggregate function.
1919
20-
pub use datafusion_physical_expr::coercion_rule::aggregate_rule::{
21-
coerce_exprs, coerce_types,
22-
};
23-
24-
#[cfg(test)]
25-
mod tests {
26-
use super::*;
27-
use crate::physical_plan::aggregates;
28-
use arrow::datatypes::DataType;
29-
use datafusion_expr::AggregateFunction;
30-
31-
#[test]
32-
fn test_aggregate_coerce_types() {
33-
// test input args with error number input types
34-
let fun = AggregateFunction::Min;
35-
let input_types = vec![DataType::Int64, DataType::Int32];
36-
let signature = aggregates::signature(&fun);
37-
let result = coerce_types(&fun, &input_types, &signature);
38-
assert_eq!("Error during planning: The function Min expects 1 arguments, but 2 were provided", result.unwrap_err().to_string());
39-
40-
// test input args is invalid data type for sum or avg
41-
let fun = AggregateFunction::Sum;
42-
let input_types = vec![DataType::Utf8];
43-
let signature = aggregates::signature(&fun);
44-
let result = coerce_types(&fun, &input_types, &signature);
45-
assert_eq!(
46-
"Error during planning: The function Sum does not support inputs of type Utf8.",
47-
result.unwrap_err().to_string()
48-
);
49-
let fun = AggregateFunction::Avg;
50-
let signature = aggregates::signature(&fun);
51-
let result = coerce_types(&fun, &input_types, &signature);
52-
assert_eq!(
53-
"Error during planning: The function Avg does not support inputs of type Utf8.",
54-
result.unwrap_err().to_string()
55-
);
56-
57-
// test count, array_agg, approx_distinct, min, max.
58-
// the coerced types is same with input types
59-
let funs = vec![
60-
AggregateFunction::Count,
61-
AggregateFunction::ArrayAgg,
62-
AggregateFunction::ApproxDistinct,
63-
AggregateFunction::Min,
64-
AggregateFunction::Max,
65-
];
66-
let input_types = vec![
67-
vec![DataType::Int32],
68-
// support the decimal data type for min/max agg
69-
// vec![DataType::Decimal(10, 2)],
70-
vec![DataType::Utf8],
71-
];
72-
for fun in funs {
73-
for input_type in &input_types {
74-
let signature = aggregates::signature(&fun);
75-
let result = coerce_types(&fun, input_type, &signature);
76-
assert_eq!(*input_type, result.unwrap());
77-
}
78-
}
79-
// test sum, avg
80-
let funs = vec![AggregateFunction::Sum, AggregateFunction::Avg];
81-
let input_types = vec![
82-
vec![DataType::Int32],
83-
vec![DataType::Float32],
84-
vec![DataType::Decimal(20, 3)],
85-
];
86-
for fun in funs {
87-
for input_type in &input_types {
88-
let signature = aggregates::signature(&fun);
89-
let result = coerce_types(&fun, input_type, &signature);
90-
assert_eq!(*input_type, result.unwrap());
91-
}
92-
}
93-
94-
// ApproxPercentileCont input types
95-
let input_types = vec![
96-
vec![DataType::Int8, DataType::Float64],
97-
vec![DataType::Int16, DataType::Float64],
98-
vec![DataType::Int32, DataType::Float64],
99-
vec![DataType::Int64, DataType::Float64],
100-
vec![DataType::UInt8, DataType::Float64],
101-
vec![DataType::UInt16, DataType::Float64],
102-
vec![DataType::UInt32, DataType::Float64],
103-
vec![DataType::UInt64, DataType::Float64],
104-
vec![DataType::Float32, DataType::Float64],
105-
vec![DataType::Float64, DataType::Float64],
106-
];
107-
for input_type in &input_types {
108-
let signature =
109-
aggregates::signature(&AggregateFunction::ApproxPercentileCont);
110-
let result = coerce_types(
111-
&AggregateFunction::ApproxPercentileCont,
112-
input_type,
113-
&signature,
114-
);
115-
assert_eq!(*input_type, result.unwrap());
116-
}
117-
}
118-
}
20+
pub use datafusion_physical_expr::coercion_rule::aggregate_rule::coerce_exprs;

0 commit comments

Comments
 (0)