Skip to content

Commit 79f67b8

Browse files
authored
feat: implement partition_statistics for WindowAggExec (#18534)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Ref #15873 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? Implemented partition_statistics for WindowAggExec. ## Are these changes tested? Check the unit tests. ## Are there any user-facing changes? No <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 27de50d commit 79f67b8

File tree

2 files changed

+118
-23
lines changed

2 files changed

+118
-23
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ mod test {
3030
use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
3131
use datafusion_execution::TaskContext;
3232
use datafusion_execution::config::SessionConfig;
33+
use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
3334
use datafusion_expr_common::operator::Operator;
3435
use datafusion_functions_aggregate::count::count_udaf;
3536
use datafusion_physical_expr::Partitioning;
@@ -52,6 +53,7 @@ mod test {
5253
use datafusion_physical_plan::repartition::RepartitionExec;
5354
use datafusion_physical_plan::sorts::sort::SortExec;
5455
use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
56+
use datafusion_physical_plan::windows::{WindowAggExec, create_window_expr};
5557
use datafusion_physical_plan::{
5658
ExecutionPlan, ExecutionPlanProperties, execute_stream_partitioned,
5759
get_plan_string,
@@ -1154,4 +1156,105 @@ mod test {
11541156

11551157
Ok(())
11561158
}
1159+
1160+
#[tokio::test]
1161+
async fn test_statistic_by_partition_of_window_agg() -> Result<()> {
1162+
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
1163+
1164+
let window_expr = create_window_expr(
1165+
&WindowFunctionDefinition::AggregateUDF(count_udaf()),
1166+
"count".to_owned(),
1167+
&[col("id", &scan.schema())?],
1168+
&[], // no partition by
1169+
&[PhysicalSortExpr::new(
1170+
col("id", &scan.schema())?,
1171+
SortOptions::default(),
1172+
)],
1173+
Arc::new(WindowFrame::new(Some(false))),
1174+
scan.schema(),
1175+
false,
1176+
false,
1177+
None,
1178+
)?;
1179+
1180+
let window_agg: Arc<dyn ExecutionPlan> =
1181+
Arc::new(WindowAggExec::try_new(vec![window_expr], scan, true)?);
1182+
1183+
// Verify partition statistics are properly propagated (not unknown)
1184+
let statistics = (0..window_agg.output_partitioning().partition_count())
1185+
.map(|idx| window_agg.partition_statistics(Some(idx)))
1186+
.collect::<Result<Vec<_>>>()?;
1187+
1188+
assert_eq!(statistics.len(), 2);
1189+
1190+
// Window functions preserve input row counts and column statistics
1191+
// but add unknown statistics for the new window column
1192+
let expected_statistic_partition_1 = Statistics {
1193+
num_rows: Precision::Exact(2),
1194+
total_byte_size: Precision::Absent,
1195+
column_statistics: vec![
1196+
ColumnStatistics {
1197+
null_count: Precision::Exact(0),
1198+
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
1199+
min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
1200+
sum_value: Precision::Absent,
1201+
distinct_count: Precision::Absent,
1202+
byte_size: Precision::Exact(8),
1203+
},
1204+
ColumnStatistics {
1205+
null_count: Precision::Exact(0),
1206+
max_value: Precision::Exact(ScalarValue::Date32(Some(
1207+
DATE_2025_03_02,
1208+
))),
1209+
min_value: Precision::Exact(ScalarValue::Date32(Some(
1210+
DATE_2025_03_01,
1211+
))),
1212+
sum_value: Precision::Absent,
1213+
distinct_count: Precision::Absent,
1214+
byte_size: Precision::Exact(8),
1215+
},
1216+
ColumnStatistics::new_unknown(), // window column
1217+
],
1218+
};
1219+
1220+
let expected_statistic_partition_2 = Statistics {
1221+
num_rows: Precision::Exact(2),
1222+
total_byte_size: Precision::Absent,
1223+
column_statistics: vec![
1224+
ColumnStatistics {
1225+
null_count: Precision::Exact(0),
1226+
max_value: Precision::Exact(ScalarValue::Int32(Some(2))),
1227+
min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
1228+
sum_value: Precision::Absent,
1229+
distinct_count: Precision::Absent,
1230+
byte_size: Precision::Exact(8),
1231+
},
1232+
ColumnStatistics {
1233+
null_count: Precision::Exact(0),
1234+
max_value: Precision::Exact(ScalarValue::Date32(Some(
1235+
DATE_2025_03_04,
1236+
))),
1237+
min_value: Precision::Exact(ScalarValue::Date32(Some(
1238+
DATE_2025_03_03,
1239+
))),
1240+
sum_value: Precision::Absent,
1241+
distinct_count: Precision::Absent,
1242+
byte_size: Precision::Exact(8),
1243+
},
1244+
ColumnStatistics::new_unknown(), // window column
1245+
],
1246+
};
1247+
1248+
assert_eq!(statistics[0], expected_statistic_partition_1);
1249+
assert_eq!(statistics[1], expected_statistic_partition_2);
1250+
1251+
// Verify the statistics match actual execution results
1252+
let expected_stats = vec![
1253+
ExpectedStatistics::NonEmpty(3, 4, 2),
1254+
ExpectedStatistics::NonEmpty(1, 2, 2),
1255+
];
1256+
validate_statistics_with_data(window_agg, expected_stats, 0).await?;
1257+
1258+
Ok(())
1259+
}
11571260
}

datafusion/physical-plan/src/windows/window_agg_exec.rs

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -158,24 +158,6 @@ impl WindowAggExec {
158158
.unwrap_or_else(Vec::new)
159159
}
160160
}
161-
162-
fn statistics_inner(&self) -> Result<Statistics> {
163-
let input_stat = self.input.partition_statistics(None)?;
164-
let win_cols = self.window_expr.len();
165-
let input_cols = self.input.schema().fields().len();
166-
// TODO stats: some windowing function will maintain invariants such as min, max...
167-
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
168-
// copy stats of the input to the beginning of the schema.
169-
column_statistics.extend(input_stat.column_statistics);
170-
for _ in 0..win_cols {
171-
column_statistics.push(ColumnStatistics::new_unknown())
172-
}
173-
Ok(Statistics {
174-
num_rows: input_stat.num_rows,
175-
column_statistics,
176-
total_byte_size: Precision::Absent,
177-
})
178-
}
179161
}
180162

181163
impl DisplayAs for WindowAggExec {
@@ -291,15 +273,25 @@ impl ExecutionPlan for WindowAggExec {
291273
}
292274

293275
fn statistics(&self) -> Result<Statistics> {
294-
self.statistics_inner()
276+
self.partition_statistics(None)
295277
}
296278

297279
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
298-
if partition.is_none() {
299-
self.statistics_inner()
300-
} else {
301-
Ok(Statistics::new_unknown(&self.schema()))
280+
let input_stat = self.input.partition_statistics(partition)?;
281+
let win_cols = self.window_expr.len();
282+
let input_cols = self.input.schema().fields().len();
283+
// TODO stats: some windowing function will maintain invariants such as min, max...
284+
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
285+
// copy stats of the input to the beginning of the schema.
286+
column_statistics.extend(input_stat.column_statistics);
287+
for _ in 0..win_cols {
288+
column_statistics.push(ColumnStatistics::new_unknown())
302289
}
290+
Ok(Statistics {
291+
num_rows: input_stat.num_rows,
292+
column_statistics,
293+
total_byte_size: Precision::Absent,
294+
})
303295
}
304296
}
305297

0 commit comments

Comments
 (0)