From d1cc5c67d0f536fc5d2ada29df1af09021e31107 Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Tue, 8 Jul 2025 14:19:24 -0700 Subject: [PATCH] fix(cubestore): Avoid bug in topk planning when projection column order is permuted --- .../cubestore/src/queryplanner/topk/plan.rs | 75 ++++++++++++++++++- 1 file changed, 73 insertions(+), 2 deletions(-) diff --git a/rust/cubestore/cubestore/src/queryplanner/topk/plan.rs b/rust/cubestore/cubestore/src/queryplanner/topk/plan.rs index ccedf71b8228e..5af391cc7daad 100644 --- a/rust/cubestore/cubestore/src/queryplanner/topk/plan.rs +++ b/rust/cubestore/cubestore/src/queryplanner/topk/plan.rs @@ -95,8 +95,7 @@ pub fn materialize_topk(p: LogicalPlan) -> Result let out_name = out_schema.field(out_i).name(); //let mut e = Expr::Column(f.qualified_column()); - let mut e = - p.post_projection[p.input_columns[out_i]].clone(); + let mut e = p.post_projection[out_i].clone(); if out_name != in_field.name() { e = Expr::Alias(Box::new(e), out_name.clone()) } @@ -420,3 +419,75 @@ fn make_sort_expr( _ => col, } } + +#[cfg(test)] +mod tests { + use datafusion::{ + arrow::datatypes::Field, + logical_plan::{col, sum, LogicalPlanBuilder}, + }; + + use super::*; + + #[test] + fn topk_projection_column_switched() { + // A regression test for materialize_topk switching around projection expressions when their + // order does not match the aggregate node's aggregation expression order. (Also, when + // materialize_topk had this bug, the Projection node's DFSchema was left unchanged, making + // it inconsistent with the expressions.) + + let table_schema = Schema::new(vec![ + Field::new("group_field", DataType::Int64, true), + Field::new("agg_sortby", DataType::Int64, true), + Field::new("agg_1", DataType::Int64, true), + Field::new("agg_2", DataType::Int64, true), + ]); + + let scan_node = LogicalPlanBuilder::scan_empty(Some("table"), &table_schema, None) + .unwrap() + .build() + .unwrap(); + + let cluster_send = + ClusterSendNode::new(Arc::new(scan_node), vec![vec![]], None).into_plan(); + + let plan = LogicalPlanBuilder::from(cluster_send) + .aggregate( + vec![col("group_field")], + vec![sum(col("agg_sortby")), sum(col("agg_1")), sum(col("agg_2"))], + ) + .unwrap() + .project(vec![ + col("group_field"), + col("SUM(table.agg_sortby)"), + col("SUM(table.agg_2)"), + col("SUM(table.agg_1)"), + ]) + .expect("project to be valid") + .sort(vec![col("SUM(table.agg_sortby)").sort(false, false)]) + .unwrap() + .limit(10) + .unwrap() + .build() + .unwrap(); + + let before_schema = plan.schema().clone(); + + let plan = materialize_topk(plan).expect("materialize_topk to succeed"); + + let after_schema = plan.schema().clone(); + + // Of course the schema shouldn't change. + assert_eq!(before_schema, after_schema); + + // We are testing that topk materialization doesn't switch the field order (of + // SUM(table.agg_2) and SUM(table.agg_1)) in the projection above it. + let expected = "\ + Projection: #table.group_field, #SUM(table.agg_sortby), #SUM(table.agg_2), #SUM(table.agg_1)\ + \n ClusterAggregateTopK, limit = 10, groupBy = [#table.group_field], aggr = [SUM(#table.agg_sortby), SUM(#table.agg_1), SUM(#table.agg_2)], sortBy = [SortColumn { agg_index: 0, asc: false, nulls_first: false }]\ + \n TableScan: table projection=None"; + let formatted = format!("{:?}", plan); + + assert_eq!(expected, formatted); + } +}