Skip to content

Commit 53e97ff

Browse files
committed
Merge remote-tracking branch 'apache/main' into alamb/features2
2 parents 6945c76 + db871af commit 53e97ff

File tree

38 files changed

+708
-370
lines changed

38 files changed

+708
-370
lines changed

benchmarks/src/imdb/convert.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use datafusion::dataframe::DataFrameWriteOptions;
19+
use datafusion::logical_expr::select_expr::SelectExpr;
1920
use datafusion_common::instant::Instant;
2021
use std::path::PathBuf;
2122

@@ -74,7 +75,8 @@ impl ConvertOpt {
7475
.iter()
7576
.take(schema.fields.len())
7677
.map(Expr::from)
77-
.collect();
78+
.map(SelectExpr::from)
79+
.collect::<Vec<_>>();
7880

7981
csv = csv.select(selection)?;
8082

benchmarks/src/tpch/convert.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use datafusion::logical_expr::select_expr::SelectExpr;
1819
use datafusion_common::instant::Instant;
1920
use std::fs;
2021
use std::path::{Path, PathBuf};
@@ -89,7 +90,8 @@ impl ConvertOpt {
8990
.iter()
9091
.take(schema.fields.len() - 1)
9192
.map(Expr::from)
92-
.collect();
93+
.map(SelectExpr::from)
94+
.collect::<Vec<_>>();
9395

9496
csv = csv.select(selection)?;
9597
// optionally, repartition the file

datafusion/core/src/dataframe/mod.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ use datafusion_common::{
5454
exec_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema,
5555
DataFusionError, ParamValues, ScalarValue, SchemaError, UnnestOptions,
5656
};
57+
use datafusion_expr::select_expr::SelectExpr;
5758
use datafusion_expr::{
5859
case,
5960
dml::InsertOp,
@@ -342,13 +343,28 @@ impl DataFrame {
342343
/// # Ok(())
343344
/// # }
344345
/// ```
345-
pub fn select(self, expr_list: Vec<Expr>) -> Result<DataFrame> {
346-
let window_func_exprs = find_window_exprs(&expr_list);
346+
pub fn select(
347+
self,
348+
expr_list: impl IntoIterator<Item = impl Into<SelectExpr>>,
349+
) -> Result<DataFrame> {
350+
let expr_list: Vec<SelectExpr> =
351+
expr_list.into_iter().map(|e| e.into()).collect::<Vec<_>>();
352+
353+
let expressions = expr_list
354+
.iter()
355+
.filter_map(|e| match e {
356+
SelectExpr::Expression(expr) => Some(expr.clone()),
357+
_ => None,
358+
})
359+
.collect::<Vec<_>>();
360+
361+
let window_func_exprs = find_window_exprs(&expressions);
347362
let plan = if window_func_exprs.is_empty() {
348363
self.plan
349364
} else {
350365
LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
351366
};
367+
352368
let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
353369

354370
Ok(DataFrame {

datafusion/core/tests/dataframe/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ use datafusion_expr::expr::{GroupingSet, Sort, WindowFunction};
7373
use datafusion_expr::var_provider::{VarProvider, VarType};
7474
use datafusion_expr::{
7575
cast, col, create_udf, exists, in_subquery, lit, out_ref_col, placeholder,
76-
scalar_subquery, when, Expr, ExprFunctionExt, ExprSchemable, LogicalPlan,
76+
scalar_subquery, when, wildcard, Expr, ExprFunctionExt, ExprSchemable, LogicalPlan,
7777
ScalarFunctionImplementation, WindowFrame, WindowFrameBound, WindowFrameUnits,
7878
WindowFunctionDefinition,
7979
};
@@ -269,6 +269,16 @@ async fn select_expr() -> Result<()> {
269269
Ok(())
270270
}
271271

272+
#[tokio::test]
273+
async fn select_all() -> Result<()> {
274+
let t = test_table().await?;
275+
let plan = t.select([wildcard()])?.logical_plan().clone();
276+
let sql_plan = create_plan("SELECT * FROM aggregate_test_100").await?;
277+
assert_same_plan(&plan, &sql_plan);
278+
279+
Ok(())
280+
}
281+
272282
#[tokio::test]
273283
async fn select_exprs() -> Result<()> {
274284
// build plan using `select_expr`

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ use datafusion_common::Result;
2929
use datafusion_common::{JoinSide, JoinType, ScalarValue};
3030
use datafusion_execution::object_store::ObjectStoreUrl;
3131
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
32-
use datafusion_expr::{Operator, ScalarUDF, ScalarUDFImpl, Signature, Volatility};
32+
use datafusion_expr::{
33+
Operator, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
34+
};
3335
use datafusion_physical_expr::expressions::{
3436
binary, cast, col, BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr,
3537
};
@@ -57,6 +59,7 @@ use datafusion_physical_plan::streaming::StreamingTableExec;
5759
use datafusion_physical_plan::union::UnionExec;
5860
use datafusion_physical_plan::{get_plan_string, ExecutionPlan};
5961

62+
use datafusion_expr_common::columnar_value::ColumnarValue;
6063
use itertools::Itertools;
6164

6265
/// Mocked UDF
@@ -89,6 +92,10 @@ impl ScalarUDFImpl for DummyUDF {
8992
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
9093
Ok(DataType::Int32)
9194
}
95+
96+
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
97+
panic!("dummy - not implemented")
98+
}
9299
}
93100

94101
#[test]

datafusion/core/tests/user_defined/user_defined_scalar_functions.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,10 @@ impl ScalarUDFImpl for CastToI64UDF {
679679
Ok(DataType::Int64)
680680
}
681681

682+
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
683+
panic!("dummy - not implemented")
684+
}
685+
682686
// Demonstrate simplifying a UDF
683687
fn simplify(
684688
&self,
@@ -946,6 +950,10 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
946950
Ok(self.return_type.clone())
947951
}
948952

953+
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
954+
panic!("dummy - not implemented")
955+
}
956+
949957
fn simplify(
950958
&self,
951959
args: Vec<Expr>,

datafusion/datasource-parquet/src/source.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use datafusion_common::config::TableParquetOptions;
3434
use datafusion_common::Statistics;
3535
use datafusion_datasource::file::FileSource;
3636
use datafusion_datasource::file_scan_config::FileScanConfig;
37+
use datafusion_physical_expr_common::physical_expr::fmt_sql;
3738
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
3839
use datafusion_physical_optimizer::pruning::PruningPredicate;
3940
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
@@ -580,7 +581,7 @@ impl FileSource for ParquetSource {
580581
}
581582
DisplayFormatType::TreeRender => {
582583
if let Some(predicate) = self.predicate() {
583-
writeln!(f, "predicate={predicate}")?;
584+
writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
584585
}
585586
Ok(())
586587
}

datafusion/expr/src/expr_fn.rs

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::function::{
2525
AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
2626
StateFieldsArgs,
2727
};
28+
use crate::select_expr::SelectExpr;
2829
use crate::{
2930
conditional_expressions::CaseBuilder, expr::Sort, logical_plan::Subquery,
3031
AggregateUDF, Expr, LogicalPlan, Operator, PartitionEvaluator, ScalarFunctionArgs,
@@ -120,21 +121,13 @@ pub fn placeholder(id: impl Into<String>) -> Expr {
120121
/// let p = wildcard();
121122
/// assert_eq!(p.to_string(), "*")
122123
/// ```
123-
pub fn wildcard() -> Expr {
124-
#[expect(deprecated)]
125-
Expr::Wildcard {
126-
qualifier: None,
127-
options: Box::new(WildcardOptions::default()),
128-
}
124+
pub fn wildcard() -> SelectExpr {
125+
SelectExpr::Wildcard(WildcardOptions::default())
129126
}
130127

131128
/// Create an '*' [`SelectExpr::Wildcard`] select expression with the given wildcard options
132-
pub fn wildcard_with_options(options: WildcardOptions) -> Expr {
133-
#[expect(deprecated)]
134-
Expr::Wildcard {
135-
qualifier: None,
136-
options: Box::new(options),
137-
}
129+
pub fn wildcard_with_options(options: WildcardOptions) -> SelectExpr {
130+
SelectExpr::Wildcard(options)
138131
}
139132

140133
/// Create an 't.*' [`SelectExpr::QualifiedWildcard`] select expression that matches all columns from a specific table
@@ -147,24 +140,16 @@ pub fn wildcard_with_options(options: WildcardOptions) -> Expr {
147140
/// let p = qualified_wildcard(TableReference::bare("t"));
148141
/// assert_eq!(p.to_string(), "t.*")
149142
/// ```
150-
pub fn qualified_wildcard(qualifier: impl Into<TableReference>) -> Expr {
151-
#[expect(deprecated)]
152-
Expr::Wildcard {
153-
qualifier: Some(qualifier.into()),
154-
options: Box::new(WildcardOptions::default()),
155-
}
143+
pub fn qualified_wildcard(qualifier: impl Into<TableReference>) -> SelectExpr {
144+
SelectExpr::QualifiedWildcard(qualifier.into(), WildcardOptions::default())
156145
}
157146

158147
/// Create an 't.*' [`SelectExpr::QualifiedWildcard`] select expression with the given wildcard options
159148
pub fn qualified_wildcard_with_options(
160149
qualifier: impl Into<TableReference>,
161150
options: WildcardOptions,
162-
) -> Expr {
163-
#[expect(deprecated)]
164-
Expr::Wildcard {
165-
qualifier: Some(qualifier.into()),
166-
options: Box::new(options),
167-
}
151+
) -> SelectExpr {
152+
SelectExpr::QualifiedWildcard(qualifier.into(), options)
168153
}
169154

170155
/// Return a new expression `left <op> right`

datafusion/expr/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ pub mod expr_fn;
4848
pub mod expr_rewriter;
4949
pub mod expr_schema;
5050
pub mod function;
51+
pub mod select_expr;
5152
pub mod groups_accumulator {
5253
pub use datafusion_expr_common::groups_accumulator::*;
5354
}

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::iter::once;
2424
use std::sync::Arc;
2525

2626
use crate::dml::CopyTo;
27-
use crate::expr::{Alias, Sort as SortExpr};
27+
use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
2828
use crate::expr_rewriter::{
2929
coerce_plan_expr_for_schema, normalize_col,
3030
normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
@@ -36,9 +36,11 @@ use crate::logical_plan::{
3636
Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
3737
Window,
3838
};
39+
use crate::select_expr::SelectExpr;
3940
use crate::utils::{
40-
can_hash, columnize_expr, compare_sort_expr, expr_to_columns,
41-
find_valid_equijoin_key_pair, group_window_expr_by_sort_keys,
41+
can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
42+
expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
43+
group_window_expr_by_sort_keys,
4244
};
4345
use crate::{
4446
and, binary_expr, lit, DmlStatement, Expr, ExprSchemable, Operator, RecursiveQuery,
@@ -520,10 +522,11 @@ impl LogicalPlanBuilder {
520522
}
521523
Ok(plan)
522524
}
525+
523526
/// Apply a projection without alias.
524527
pub fn project(
525528
self,
526-
expr: impl IntoIterator<Item = impl Into<Expr>>,
529+
expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
527530
) -> Result<Self> {
528531
project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
529532
}
@@ -532,7 +535,7 @@ impl LogicalPlanBuilder {
532535
/// (true to validate, false to not validate)
533536
pub fn project_with_validation(
534537
self,
535-
expr: Vec<(impl Into<Expr>, bool)>,
538+
expr: Vec<(impl Into<SelectExpr>, bool)>,
536539
) -> Result<Self> {
537540
project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
538541
}
@@ -1657,7 +1660,7 @@ pub fn union_by_name(
16571660
/// * An invalid expression is used (e.g. a `sort` expression)
16581661
pub fn project(
16591662
plan: LogicalPlan,
1660-
expr: impl IntoIterator<Item = impl Into<Expr>>,
1663+
expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
16611664
) -> Result<LogicalPlan> {
16621665
project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
16631666
}
@@ -1671,15 +1674,54 @@ pub fn project(
16711674
/// * An invalid expression is used (e.g. a `sort` expression)
16721675
fn project_with_validation(
16731676
plan: LogicalPlan,
1674-
expr: impl IntoIterator<Item = (impl Into<Expr>, bool)>,
1677+
expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
16751678
) -> Result<LogicalPlan> {
16761679
let mut projected_expr = vec![];
16771680
for (e, validate) in expr {
16781681
let e = e.into();
16791682
match e {
1680-
#[expect(deprecated)]
1681-
Expr::Wildcard { .. } => projected_expr.push(e),
1682-
_ => {
1683+
SelectExpr::Wildcard(opt) => {
1684+
let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;
1685+
1686+
// If there is a REPLACE statement, replace that column with the given
1687+
// replace expression. Column name remains the same.
1688+
let expanded = if let Some(replace) = opt.replace {
1689+
replace_columns(expanded, &replace)?
1690+
} else {
1691+
expanded
1692+
};
1693+
1694+
for e in expanded {
1695+
if validate {
1696+
projected_expr
1697+
.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1698+
} else {
1699+
projected_expr.push(e)
1700+
}
1701+
}
1702+
}
1703+
SelectExpr::QualifiedWildcard(table_ref, opt) => {
1704+
let expanded =
1705+
expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;
1706+
1707+
// If there is a REPLACE statement, replace that column with the given
1708+
// replace expression. Column name remains the same.
1709+
let expanded = if let Some(replace) = opt.replace {
1710+
replace_columns(expanded, &replace)?
1711+
} else {
1712+
expanded
1713+
};
1714+
1715+
for e in expanded {
1716+
if validate {
1717+
projected_expr
1718+
.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1719+
} else {
1720+
projected_expr.push(e)
1721+
}
1722+
}
1723+
}
1724+
SelectExpr::Expression(e) => {
16831725
if validate {
16841726
projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
16851727
} else {
@@ -1693,6 +1735,29 @@ fn project_with_validation(
16931735
Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
16941736
}
16951737

1738+
/// If there is a REPLACE statement in the projected expression in the form of
1739+
/// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces
1740+
/// that column with the given replace expression. Column name remains the same.
1741+
/// Multiple comma-separated REPLACE items are also supported.
1742+
fn replace_columns(
1743+
mut exprs: Vec<Expr>,
1744+
replace: &PlannedReplaceSelectItem,
1745+
) -> Result<Vec<Expr>> {
1746+
for expr in exprs.iter_mut() {
1747+
if let Expr::Column(Column { name, .. }) = expr {
1748+
if let Some((_, new_expr)) = replace
1749+
.items()
1750+
.iter()
1751+
.zip(replace.expressions().iter())
1752+
.find(|(item, _)| item.column_name.value == *name)
1753+
{
1754+
*expr = new_expr.clone().alias(name.clone())
1755+
}
1756+
}
1757+
}
1758+
Ok(exprs)
1759+
}
1760+
16961761
/// Create a SubqueryAlias to wrap a LogicalPlan.
16971762
pub fn subquery_alias(
16981763
plan: LogicalPlan,
@@ -1811,7 +1876,7 @@ pub fn wrap_projection_for_join_if_necessary(
18111876
projection.extend(join_key_items);
18121877

18131878
LogicalPlanBuilder::from(input)
1814-
.project(projection)?
1879+
.project(projection.into_iter().map(SelectExpr::from))?
18151880
.build()?
18161881
} else {
18171882
input

0 commit comments

Comments
 (0)