Skip to content

Commit c6d5520

Browse files
authored
feat: support distinct for window (#16925)
* feat: support distinct for window * fix * fix * fisx * fix unparse * fix test * fix test * easy way * add test * add comments
1 parent 9f3e7dc commit c6d5520

File tree

22 files changed

+407
-42
lines changed

22 files changed

+407
-42
lines changed

datafusion-examples/examples/advanced_udwf.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
199199
order_by: window_function.params.order_by,
200200
window_frame: window_function.params.window_frame,
201201
null_treatment: window_function.params.null_treatment,
202+
distinct: window_function.params.distinct,
202203
},
203204
}))
204205
};

datafusion/core/src/physical_planner.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,6 +1649,7 @@ pub fn create_window_expr_with_name(
16491649
order_by,
16501650
window_frame,
16511651
null_treatment,
1652+
distinct,
16521653
},
16531654
} = window_fun.as_ref();
16541655
let physical_args =
@@ -1677,6 +1678,7 @@ pub fn create_window_expr_with_name(
16771678
window_frame,
16781679
physical_schema,
16791680
ignore_nulls,
1681+
*distinct,
16801682
)
16811683
}
16821684
other => plan_err!("Invalid window expression '{other:?}'"),

datafusion/core/tests/fuzz_cases/window_fuzz.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
288288
Arc::new(window_frame),
289289
&extended_schema,
290290
false,
291+
false,
291292
)?;
292293
let running_window_exec = Arc::new(BoundedWindowAggExec::try_new(
293294
vec![window_expr],
@@ -660,6 +661,7 @@ async fn run_window_test(
660661
Arc::new(window_frame.clone()),
661662
&extended_schema,
662663
false,
664+
false,
663665
)?],
664666
exec1,
665667
false,
@@ -678,6 +680,7 @@ async fn run_window_test(
678680
Arc::new(window_frame.clone()),
679681
&extended_schema,
680682
false,
683+
false,
681684
)?],
682685
exec2,
683686
search_mode.clone(),

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3685,6 +3685,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
36853685
case.window_frame,
36863686
input_schema.as_ref(),
36873687
false,
3688+
false,
36883689
)?;
36893690
let window_exec = if window_expr.uses_bounded_memory() {
36903691
Arc::new(BoundedWindowAggExec::try_new(

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ pub fn bounded_window_exec_with_partition(
265265
Arc::new(WindowFrame::new(Some(false))),
266266
schema.as_ref(),
267267
false,
268+
false,
268269
)
269270
.unwrap();
270271

datafusion/expr/src/expr.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1131,6 +1131,8 @@ pub struct WindowFunctionParams {
11311131
pub window_frame: WindowFrame,
11321132
/// Specifies how NULL value is treated: ignore or respect
11331133
pub null_treatment: Option<NullTreatment>,
1134+
/// Distinct flag
1135+
pub distinct: bool,
11341136
}
11351137

11361138
impl WindowFunction {
@@ -1145,6 +1147,7 @@ impl WindowFunction {
11451147
order_by: Vec::default(),
11461148
window_frame: WindowFrame::new(None),
11471149
null_treatment: None,
1150+
distinct: false,
11481151
},
11491152
}
11501153
}
@@ -2291,6 +2294,7 @@ impl NormalizeEq for Expr {
22912294
partition_by: self_partition_by,
22922295
order_by: self_order_by,
22932296
null_treatment: self_null_treatment,
2297+
distinct: self_distinct,
22942298
},
22952299
} = left.as_ref();
22962300
let WindowFunction {
@@ -2302,6 +2306,7 @@ impl NormalizeEq for Expr {
23022306
partition_by: other_partition_by,
23032307
order_by: other_order_by,
23042308
null_treatment: other_null_treatment,
2309+
distinct: other_distinct,
23052310
},
23062311
} = other.as_ref();
23072312

@@ -2325,6 +2330,7 @@ impl NormalizeEq for Expr {
23252330
&& a.nulls_first == b.nulls_first
23262331
&& a.expr.normalize_eq(&b.expr)
23272332
})
2333+
&& self_distinct == other_distinct
23282334
}
23292335
(
23302336
Expr::Exists(Exists {
@@ -2558,11 +2564,13 @@ impl HashNode for Expr {
25582564
order_by: _,
25592565
window_frame,
25602566
null_treatment,
2567+
distinct,
25612568
},
25622569
} = window_fun.as_ref();
25632570
fun.hash(state);
25642571
window_frame.hash(state);
25652572
null_treatment.hash(state);
2573+
distinct.hash(state);
25662574
}
25672575
Expr::InList(InList {
25682576
expr: _expr,
@@ -2865,15 +2873,27 @@ impl Display for SchemaDisplay<'_> {
28652873
order_by,
28662874
window_frame,
28672875
null_treatment,
2876+
distinct,
28682877
} = params;
28692878

2879+
// Write function name and open parenthesis
2880+
write!(f, "{fun}(")?;
2881+
2882+
// If DISTINCT, emit the keyword
2883+
if *distinct {
2884+
write!(f, "DISTINCT ")?;
2885+
}
2886+
2887+
// Write the comma‑separated argument list
28702888
write!(
28712889
f,
2872-
"{}({})",
2873-
fun,
2890+
"{}",
28742891
schema_name_from_exprs_comma_separated_without_space(args)?
28752892
)?;
28762893

2894+
// **Close the argument parenthesis**
2895+
write!(f, ")")?;
2896+
28772897
if let Some(null_treatment) = null_treatment {
28782898
write!(f, " {null_treatment}")?;
28792899
}
@@ -3260,9 +3280,10 @@ impl Display for Expr {
32603280
order_by,
32613281
window_frame,
32623282
null_treatment,
3283+
distinct,
32633284
} = params;
32643285

3265-
fmt_function(f, &fun.to_string(), false, args, true)?;
3286+
fmt_function(f, &fun.to_string(), *distinct, args, true)?;
32663287

32673288
if let Some(nt) = null_treatment {
32683289
write!(f, "{nt}")?;

datafusion/expr/src/expr_fn.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -946,6 +946,7 @@ impl ExprFuncBuilder {
946946
window_frame: window_frame
947947
.unwrap_or_else(|| WindowFrame::new(has_order_by)),
948948
null_treatment,
949+
distinct,
949950
},
950951
})
951952
}

datafusion/expr/src/planner.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ pub struct RawWindowExpr {
308308
pub order_by: Vec<SortExpr>,
309309
pub window_frame: WindowFrame,
310310
pub null_treatment: Option<NullTreatment>,
311+
pub distinct: bool,
311312
}
312313

313314
/// Result of planning a raw expr with [`ExprPlanner`]

datafusion/expr/src/tree_node.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,10 +242,22 @@ impl TreeNode for Expr {
242242
order_by,
243243
window_frame,
244244
null_treatment,
245+
distinct,
245246
},
246247
} = *window_fun;
247248
(args, partition_by, order_by).map_elements(f)?.update_data(
248249
|(new_args, new_partition_by, new_order_by)| {
250+
if distinct {
251+
return Expr::from(WindowFunction::new(fun, new_args))
252+
.partition_by(new_partition_by)
253+
.order_by(new_order_by)
254+
.window_frame(window_frame)
255+
.null_treatment(null_treatment)
256+
.distinct()
257+
.build()
258+
.unwrap();
259+
}
260+
249261
Expr::from(WindowFunction::new(fun, new_args))
250262
.partition_by(new_partition_by)
251263
.order_by(new_order_by)

datafusion/expr/src/udaf.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -554,14 +554,25 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
554554
order_by,
555555
window_frame,
556556
null_treatment,
557+
distinct,
557558
} = params;
558559

559560
let mut schema_name = String::new();
560-
schema_name.write_fmt(format_args!(
561-
"{}({})",
562-
self.name(),
563-
schema_name_from_exprs(args)?
564-
))?;
561+
562+
// Inject DISTINCT into the schema name when requested
563+
if *distinct {
564+
schema_name.write_fmt(format_args!(
565+
"{}(DISTINCT {})",
566+
self.name(),
567+
schema_name_from_exprs(args)?
568+
))?;
569+
} else {
570+
schema_name.write_fmt(format_args!(
571+
"{}({})",
572+
self.name(),
573+
schema_name_from_exprs(args)?
574+
))?;
575+
}
565576

566577
if let Some(null_treatment) = null_treatment {
567578
schema_name.write_fmt(format_args!(" {null_treatment}"))?;
@@ -579,7 +590,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
579590
" ORDER BY [{}]",
580591
schema_name_from_sorts(order_by)?
581592
))?;
582-
};
593+
}
583594

584595
schema_name.write_fmt(format_args!(" {window_frame}"))?;
585596

@@ -648,15 +659,24 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
648659
order_by,
649660
window_frame,
650661
null_treatment,
662+
distinct,
651663
} = params;
652664

653665
let mut display_name = String::new();
654666

655-
display_name.write_fmt(format_args!(
656-
"{}({})",
657-
self.name(),
658-
expr_vec_fmt!(args)
659-
))?;
667+
if *distinct {
668+
display_name.write_fmt(format_args!(
669+
"{}(DISTINCT {})",
670+
self.name(),
671+
expr_vec_fmt!(args)
672+
))?;
673+
} else {
674+
display_name.write_fmt(format_args!(
675+
"{}({})",
676+
self.name(),
677+
expr_vec_fmt!(args)
678+
))?;
679+
}
660680

661681
if let Some(null_treatment) = null_treatment {
662682
display_name.write_fmt(format_args!(" {null_treatment}"))?;

0 commit comments

Comments
 (0)