Skip to content

Commit c3fdb12

Browse files
authored
AggregateUDFImpl::window_function_schema_name and AggregateUDFImpl::window_function_display_name for window aggregate function (apache#14750)
* window params * window display * doc * fix
1 parent 8503ece commit c3fdb12

File tree

17 files changed

+394
-185
lines changed

17 files changed

+394
-185
lines changed

datafusion-examples/examples/advanced_udwf.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use arrow::{
2626
use datafusion::common::ScalarValue;
2727
use datafusion::error::Result;
2828
use datafusion::functions_aggregate::average::avg_udaf;
29-
use datafusion::logical_expr::expr::WindowFunction;
29+
use datafusion::logical_expr::expr::{WindowFunction, WindowFunctionParams};
3030
use datafusion::logical_expr::function::{
3131
PartitionEvaluatorArgs, WindowFunctionSimplification, WindowUDFFieldArgs,
3232
};
@@ -192,11 +192,13 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
192192
let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| {
193193
Ok(Expr::WindowFunction(WindowFunction {
194194
fun: WindowFunctionDefinition::AggregateUDF(avg_udaf()),
195-
args: window_function.args,
196-
partition_by: window_function.partition_by,
197-
order_by: window_function.order_by,
198-
window_frame: window_function.window_frame,
199-
null_treatment: window_function.null_treatment,
195+
params: WindowFunctionParams {
196+
args: window_function.params.args,
197+
partition_by: window_function.params.partition_by,
198+
order_by: window_function.params.order_by,
199+
window_frame: window_function.params.window_frame,
200+
null_treatment: window_function.params.null_treatment,
201+
},
200202
}))
201203
};
202204

datafusion/core/src/physical_planner.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ use datafusion_common::{
7171
use datafusion_expr::dml::{CopyTo, InsertOp};
7272
use datafusion_expr::expr::{
7373
physical_name, AggregateFunction, AggregateFunctionParams, Alias, GroupingSet,
74-
WindowFunction,
74+
WindowFunction, WindowFunctionParams,
7575
};
7676
use datafusion_expr::expr_rewriter::unnormalize_cols;
7777
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
@@ -569,16 +569,24 @@ impl DefaultPhysicalPlanner {
569569

570570
let get_sort_keys = |expr: &Expr| match expr {
571571
Expr::WindowFunction(WindowFunction {
572-
ref partition_by,
573-
ref order_by,
572+
params:
573+
WindowFunctionParams {
574+
ref partition_by,
575+
ref order_by,
576+
..
577+
},
574578
..
575579
}) => generate_sort_key(partition_by, order_by),
576580
Expr::Alias(Alias { expr, .. }) => {
577581
// Convert &Box<T> to &T
578582
match &**expr {
579583
Expr::WindowFunction(WindowFunction {
580-
ref partition_by,
581-
ref order_by,
584+
params:
585+
WindowFunctionParams {
586+
ref partition_by,
587+
ref order_by,
588+
..
589+
},
582590
..
583591
}) => generate_sort_key(partition_by, order_by),
584592
_ => unreachable!(),
@@ -1509,11 +1517,14 @@ pub fn create_window_expr_with_name(
15091517
match e {
15101518
Expr::WindowFunction(WindowFunction {
15111519
fun,
1512-
args,
1513-
partition_by,
1514-
order_by,
1515-
window_frame,
1516-
null_treatment,
1520+
params:
1521+
WindowFunctionParams {
1522+
args,
1523+
partition_by,
1524+
order_by,
1525+
window_frame,
1526+
null_treatment,
1527+
},
15171528
}) => {
15181529
let physical_args =
15191530
create_physical_exprs(args, logical_schema, execution_props)?;

datafusion/expr/src/expr.rs

Lines changed: 116 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -819,6 +819,11 @@ impl From<Arc<WindowUDF>> for WindowFunctionDefinition {
819819
pub struct WindowFunction {
820820
/// Name of the function
821821
pub fun: WindowFunctionDefinition,
822+
pub params: WindowFunctionParams,
823+
}
824+
825+
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
826+
pub struct WindowFunctionParams {
822827
/// List of expressions to feed to the functions as arguments
823828
pub args: Vec<Expr>,
824829
/// List of partition by expressions
@@ -837,11 +842,13 @@ impl WindowFunction {
837842
pub fn new(fun: impl Into<WindowFunctionDefinition>, args: Vec<Expr>) -> Self {
838843
Self {
839844
fun: fun.into(),
840-
args,
841-
partition_by: Vec::default(),
842-
order_by: Vec::default(),
843-
window_frame: WindowFrame::new(None),
844-
null_treatment: None,
845+
params: WindowFunctionParams {
846+
args,
847+
partition_by: Vec::default(),
848+
order_by: Vec::default(),
849+
window_frame: WindowFrame::new(None),
850+
null_treatment: None,
851+
},
845852
}
846853
}
847854
}
@@ -1922,21 +1929,30 @@ impl NormalizeEq for Expr {
19221929
(
19231930
Expr::WindowFunction(WindowFunction {
19241931
fun: self_fun,
1925-
args: self_args,
1926-
partition_by: self_partition_by,
1927-
order_by: self_order_by,
1928-
window_frame: self_window_frame,
1929-
null_treatment: self_null_treatment,
1932+
params: self_params,
19301933
}),
19311934
Expr::WindowFunction(WindowFunction {
19321935
fun: other_fun,
1933-
args: other_args,
1934-
partition_by: other_partition_by,
1935-
order_by: other_order_by,
1936-
window_frame: other_window_frame,
1937-
null_treatment: other_null_treatment,
1936+
params: other_params,
19381937
}),
19391938
) => {
1939+
let (
1940+
WindowFunctionParams {
1941+
args: self_args,
1942+
window_frame: self_window_frame,
1943+
partition_by: self_partition_by,
1944+
order_by: self_order_by,
1945+
null_treatment: self_null_treatment,
1946+
},
1947+
WindowFunctionParams {
1948+
args: other_args,
1949+
window_frame: other_window_frame,
1950+
partition_by: other_partition_by,
1951+
order_by: other_order_by,
1952+
null_treatment: other_null_treatment,
1953+
},
1954+
) = (self_params, other_params);
1955+
19401956
self_fun.name() == other_fun.name()
19411957
&& self_window_frame == other_window_frame
19421958
&& self_null_treatment == other_null_treatment
@@ -2179,14 +2195,14 @@ impl HashNode for Expr {
21792195
distinct.hash(state);
21802196
null_treatment.hash(state);
21812197
}
2182-
Expr::WindowFunction(WindowFunction {
2183-
fun,
2184-
args: _args,
2185-
partition_by: _partition_by,
2186-
order_by: _order_by,
2187-
window_frame,
2188-
null_treatment,
2189-
}) => {
2198+
Expr::WindowFunction(WindowFunction { fun, params }) => {
2199+
let WindowFunctionParams {
2200+
args: _args,
2201+
partition_by: _,
2202+
order_by: _,
2203+
window_frame,
2204+
null_treatment,
2205+
} = params;
21902206
fun.hash(state);
21912207
window_frame.hash(state);
21922208
null_treatment.hash(state);
@@ -2467,39 +2483,52 @@ impl Display for SchemaDisplay<'_> {
24672483

24682484
Ok(())
24692485
}
2470-
Expr::WindowFunction(WindowFunction {
2471-
fun,
2472-
args,
2473-
partition_by,
2474-
order_by,
2475-
window_frame,
2476-
null_treatment,
2477-
}) => {
2478-
write!(
2479-
f,
2480-
"{}({})",
2481-
fun,
2482-
schema_name_from_exprs_comma_separated_without_space(args)?
2483-
)?;
2484-
2485-
if let Some(null_treatment) = null_treatment {
2486-
write!(f, " {}", null_treatment)?;
2486+
Expr::WindowFunction(WindowFunction { fun, params }) => match fun {
2487+
WindowFunctionDefinition::AggregateUDF(fun) => {
2488+
match fun.window_function_schema_name(params) {
2489+
Ok(name) => {
2490+
write!(f, "{name}")
2491+
}
2492+
Err(e) => {
2493+
write!(f, "got error from window_function_schema_name {}", e)
2494+
}
2495+
}
24872496
}
2497+
_ => {
2498+
let WindowFunctionParams {
2499+
args,
2500+
partition_by,
2501+
order_by,
2502+
window_frame,
2503+
null_treatment,
2504+
} = params;
24882505

2489-
if !partition_by.is_empty() {
24902506
write!(
24912507
f,
2492-
" PARTITION BY [{}]",
2493-
schema_name_from_exprs(partition_by)?
2508+
"{}({})",
2509+
fun,
2510+
schema_name_from_exprs_comma_separated_without_space(args)?
24942511
)?;
2495-
}
24962512

2497-
if !order_by.is_empty() {
2498-
write!(f, " ORDER BY [{}]", schema_name_from_sorts(order_by)?)?;
2499-
};
2513+
if let Some(null_treatment) = null_treatment {
2514+
write!(f, " {}", null_treatment)?;
2515+
}
25002516

2501-
write!(f, " {window_frame}")
2502-
}
2517+
if !partition_by.is_empty() {
2518+
write!(
2519+
f,
2520+
" PARTITION BY [{}]",
2521+
schema_name_from_exprs(partition_by)?
2522+
)?;
2523+
}
2524+
2525+
if !order_by.is_empty() {
2526+
write!(f, " ORDER BY [{}]", schema_name_from_sorts(order_by)?)?;
2527+
};
2528+
2529+
write!(f, " {window_frame}")
2530+
}
2531+
},
25032532
}
25042533
}
25052534
}
@@ -2621,33 +2650,47 @@ impl Display for Expr {
26212650
// Expr::ScalarFunction(ScalarFunction { func, args }) => {
26222651
// write!(f, "{}", func.display_name(args).unwrap())
26232652
// }
2624-
Expr::WindowFunction(WindowFunction {
2625-
fun,
2626-
args,
2627-
partition_by,
2628-
order_by,
2629-
window_frame,
2630-
null_treatment,
2631-
}) => {
2632-
fmt_function(f, &fun.to_string(), false, args, true)?;
2633-
2634-
if let Some(nt) = null_treatment {
2635-
write!(f, "{}", nt)?;
2653+
Expr::WindowFunction(WindowFunction { fun, params }) => match fun {
2654+
WindowFunctionDefinition::AggregateUDF(fun) => {
2655+
match fun.window_function_display_name(params) {
2656+
Ok(name) => {
2657+
write!(f, "{}", name)
2658+
}
2659+
Err(e) => {
2660+
write!(f, "got error from window_function_display_name {}", e)
2661+
}
2662+
}
26362663
}
2664+
WindowFunctionDefinition::WindowUDF(fun) => {
2665+
let WindowFunctionParams {
2666+
args,
2667+
partition_by,
2668+
order_by,
2669+
window_frame,
2670+
null_treatment,
2671+
} = params;
26372672

2638-
if !partition_by.is_empty() {
2639-
write!(f, " PARTITION BY [{}]", expr_vec_fmt!(partition_by))?;
2640-
}
2641-
if !order_by.is_empty() {
2642-
write!(f, " ORDER BY [{}]", expr_vec_fmt!(order_by))?;
2673+
fmt_function(f, &fun.to_string(), false, args, true)?;
2674+
2675+
if let Some(nt) = null_treatment {
2676+
write!(f, "{}", nt)?;
2677+
}
2678+
2679+
if !partition_by.is_empty() {
2680+
write!(f, " PARTITION BY [{}]", expr_vec_fmt!(partition_by))?;
2681+
}
2682+
if !order_by.is_empty() {
2683+
write!(f, " ORDER BY [{}]", expr_vec_fmt!(order_by))?;
2684+
}
2685+
write!(
2686+
f,
2687+
" {} BETWEEN {} AND {}",
2688+
window_frame.units,
2689+
window_frame.start_bound,
2690+
window_frame.end_bound
2691+
)
26432692
}
2644-
write!(
2645-
f,
2646-
" {} BETWEEN {} AND {}",
2647-
window_frame.units, window_frame.start_bound, window_frame.end_bound
2648-
)?;
2649-
Ok(())
2650-
}
2693+
},
26512694
Expr::AggregateFunction(AggregateFunction { func, params }) => {
26522695
match func.display_name(params) {
26532696
Ok(name) => {

datafusion/expr/src/expr_fn.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
2020
use crate::expr::{
2121
AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, InSubquery,
22-
Placeholder, TryCast, Unnest, WildcardOptions, WindowFunction,
22+
Placeholder, TryCast, Unnest, WildcardOptions, WindowFunction, WindowFunctionParams,
2323
};
2424
use crate::function::{
2525
AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
@@ -832,14 +832,22 @@ impl ExprFuncBuilder {
832832
udaf.params.null_treatment = null_treatment;
833833
Expr::AggregateFunction(udaf)
834834
}
835-
ExprFuncKind::Window(mut udwf) => {
835+
ExprFuncKind::Window(WindowFunction {
836+
fun,
837+
params: WindowFunctionParams { args, .. },
838+
}) => {
836839
let has_order_by = order_by.as_ref().map(|o| !o.is_empty());
837-
udwf.order_by = order_by.unwrap_or_default();
838-
udwf.partition_by = partition_by.unwrap_or_default();
839-
udwf.window_frame =
840-
window_frame.unwrap_or(WindowFrame::new(has_order_by));
841-
udwf.null_treatment = null_treatment;
842-
Expr::WindowFunction(udwf)
840+
Expr::WindowFunction(WindowFunction {
841+
fun,
842+
params: WindowFunctionParams {
843+
args,
844+
partition_by: partition_by.unwrap_or_default(),
845+
order_by: order_by.unwrap_or_default(),
846+
window_frame: window_frame
847+
.unwrap_or(WindowFrame::new(has_order_by)),
848+
null_treatment,
849+
},
850+
})
843851
}
844852
};
845853

datafusion/expr/src/expr_schema.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use super::{Between, Expr, Like};
1919
use crate::expr::{
2020
AggregateFunction, AggregateFunctionParams, Alias, BinaryExpr, Cast, InList,
2121
InSubquery, Placeholder, ScalarFunction, TryCast, Unnest, WindowFunction,
22+
WindowFunctionParams,
2223
};
2324
use crate::type_coercion::functions::{
2425
data_types_with_aggregate_udf, data_types_with_scalar_udf, data_types_with_window_udf,
@@ -510,7 +511,11 @@ impl Expr {
510511
schema: &dyn ExprSchema,
511512
window_function: &WindowFunction,
512513
) -> Result<(DataType, bool)> {
513-
let WindowFunction { fun, args, .. } = window_function;
514+
let WindowFunction {
515+
fun,
516+
params: WindowFunctionParams { args, .. },
517+
..
518+
} = window_function;
514519

515520
let data_types = args
516521
.iter()

0 commit comments

Comments
 (0)