Skip to content

Commit 959438b

Browse files
getChan authored and alamb committed
Implement cardinality_effect for window execs and UnionExec (apache#20321)
## Which issue does this PR close?

- Closes apache#20291.

## Rationale for this change

`WindowAggExec` and `BoundedWindowAggExec` did not implement `cardinality_effect`, which left this property as `Unknown`.

Both operators preserve row cardinality:

- They evaluate window expressions per input row and append result columns.
- They do not filter out rows.
- They do not duplicate rows.

So their cardinality effect is `Equal`.

This PR also updates `UnionExec`, which combines rows from multiple children. Its cardinality effect should be `GreaterEqual` instead of defaulting to `Unknown`.

## What changes are included in this PR?

- Implement `cardinality_effect` for `WindowAggExec` as `CardinalityEffect::Equal`.
- Implement `cardinality_effect` for `BoundedWindowAggExec` as `CardinalityEffect::Equal`.
- Implement `cardinality_effect` for `UnionExec` as `CardinalityEffect::GreaterEqual`.

## Are these changes tested?

Unit tested.

## Are there any user-facing changes?

No.

## Additional note

I used a coding agent for implementation/PR drafting and reviewed the changes myself. If this conflicts with project policy, please let me know.
1 parent 7698fdc commit 959438b

File tree

3 files changed

+102
-3
lines changed

3 files changed

+102
-3
lines changed

datafusion/physical-plan/src/union.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ use super::{
3434
};
3535
use crate::check_if_same_properties;
3636
use crate::execution_plan::{
37-
InvariantLevel, boundedness_from_children, check_default_invariants,
38-
emission_type_from_children,
37+
CardinalityEffect, InvariantLevel, boundedness_from_children,
38+
check_default_invariants, emission_type_from_children,
3939
};
4040
use crate::filter::FilterExec;
4141
use crate::filter_pushdown::{
@@ -352,6 +352,12 @@ impl ExecutionPlan for UnionExec {
352352
}
353353
}
354354

355+
fn cardinality_effect(&self) -> CardinalityEffect {
356+
// Union combines rows from multiple inputs, so output rows are not tied
357+
// to any single input and can only be constrained as greater-or-equal.
358+
CardinalityEffect::GreaterEqual
359+
}
360+
355361
fn supports_limit_pushdown(&self) -> bool {
356362
true
357363
}
@@ -1195,4 +1201,25 @@ mod tests {
11951201
)
11961202
);
11971203
}
1204+
1205+
#[test]
1206+
fn test_union_cardinality_effect() -> Result<()> {
1207+
let schema = create_test_schema()?;
1208+
let input1: Arc<dyn ExecutionPlan> =
1209+
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
1210+
let input2: Arc<dyn ExecutionPlan> =
1211+
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
1212+
1213+
let union = UnionExec::try_new(vec![input1, input2])?;
1214+
let union = union
1215+
.as_any()
1216+
.downcast_ref::<UnionExec>()
1217+
.expect("expected UnionExec for multiple inputs");
1218+
1219+
assert!(matches!(
1220+
union.cardinality_effect(),
1221+
CardinalityEffect::GreaterEqual
1222+
));
1223+
Ok(())
1224+
}
11981225
}

datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ use datafusion_physical_expr_common::sort_expr::{
6565
OrderingRequirements, PhysicalSortExpr,
6666
};
6767

68+
use crate::execution_plan::CardinalityEffect;
6869
use ahash::RandomState;
6970
use futures::stream::Stream;
7071
use futures::{StreamExt, ready};
@@ -384,6 +385,10 @@ impl ExecutionPlan for BoundedWindowAggExec {
384385
let input_stat = self.input.partition_statistics(partition)?;
385386
self.statistics_helper(input_stat)
386387
}
388+
389+
fn cardinality_effect(&self) -> CardinalityEffect {
390+
CardinalityEffect::Equal
391+
}
387392
}
388393

389394
/// Trait that specifies how we search for (or calculate) partitions. It has two
@@ -1252,6 +1257,7 @@ mod tests {
12521257
use std::time::Duration;
12531258

12541259
use crate::common::collect;
1260+
use crate::execution_plan::CardinalityEffect;
12551261
use crate::expressions::PhysicalSortExpr;
12561262
use crate::projection::{ProjectionExec, ProjectionExpr};
12571263
use crate::streaming::{PartitionStream, StreamingTableExec};
@@ -1836,4 +1842,22 @@ mod tests {
18361842

18371843
Ok(())
18381844
}
1845+
1846+
#[test]
1847+
fn test_bounded_window_agg_cardinality_effect() -> Result<()> {
1848+
let schema = test_schema();
1849+
let input: Arc<dyn ExecutionPlan> =
1850+
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
1851+
let plan = bounded_window_exec_pb_latent_range(input, 1, "hash", "sn")?;
1852+
let plan = plan
1853+
.as_any()
1854+
.downcast_ref::<BoundedWindowAggExec>()
1855+
.expect("expected BoundedWindowAggExec");
1856+
1857+
assert!(matches!(
1858+
plan.cardinality_effect(),
1859+
CardinalityEffect::Equal
1860+
));
1861+
Ok(())
1862+
}
18391863
}

datafusion/physical-plan/src/windows/window_agg_exec.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
2323
use std::task::{Context, Poll};
2424

2525
use super::utils::create_schema;
26-
use crate::execution_plan::EmissionType;
26+
use crate::execution_plan::{CardinalityEffect, EmissionType};
2727
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
2828
use crate::windows::{
2929
calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs,
@@ -301,6 +301,10 @@ impl ExecutionPlan for WindowAggExec {
301301
total_byte_size: Precision::Absent,
302302
})
303303
}
304+
305+
fn cardinality_effect(&self) -> CardinalityEffect {
306+
CardinalityEffect::Equal
307+
}
304308
}
305309

306310
/// Compute the window aggregate columns
@@ -450,3 +454,47 @@ impl RecordBatchStream for WindowAggStream {
450454
Arc::clone(&self.schema)
451455
}
452456
}
457+
458+
#[cfg(test)]
459+
mod tests {
460+
use super::*;
461+
use crate::test::TestMemoryExec;
462+
use crate::windows::create_window_expr;
463+
use arrow::datatypes::{DataType, Field, Schema};
464+
use datafusion_common::ScalarValue;
465+
use datafusion_expr::{
466+
WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
467+
};
468+
use datafusion_functions_aggregate::count::count_udaf;
469+
470+
#[test]
471+
fn test_window_agg_cardinality_effect() -> Result<()> {
472+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
473+
let input: Arc<dyn ExecutionPlan> =
474+
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
475+
let args = vec![crate::expressions::col("a", &schema)?];
476+
let window_expr = create_window_expr(
477+
&WindowFunctionDefinition::AggregateUDF(count_udaf()),
478+
"count(a)".to_string(),
479+
&args,
480+
&[],
481+
&[],
482+
Arc::new(WindowFrame::new_bounds(
483+
WindowFrameUnits::Rows,
484+
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
485+
WindowFrameBound::CurrentRow,
486+
)),
487+
Arc::clone(&schema),
488+
false,
489+
false,
490+
None,
491+
)?;
492+
493+
let window = WindowAggExec::try_new(vec![window_expr], input, true)?;
494+
assert!(matches!(
495+
window.cardinality_effect(),
496+
CardinalityEffect::Equal
497+
));
498+
Ok(())
499+
}
500+
}

0 commit comments

Comments
 (0)