Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 73 additions & 2 deletions rust/cubestore/cubestore/src/queryplanner/topk/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ pub fn materialize_topk(p: LogicalPlan) -> Result<LogicalPlan, DataFusionError>
let out_name = out_schema.field(out_i).name();

//let mut e = Expr::Column(f.qualified_column());
let mut e =
p.post_projection[p.input_columns[out_i]].clone();
let mut e = p.post_projection[out_i].clone();
if out_name != in_field.name() {
e = Expr::Alias(Box::new(e), out_name.clone())
}
Expand Down Expand Up @@ -420,3 +419,75 @@ fn make_sort_expr(
_ => col,
}
}

#[cfg(test)]
mod tests {
use datafusion::{
arrow::datatypes::Field,
logical_plan::{col, sum, LogicalPlanBuilder},
};

use super::*;

#[test]
fn topk_projection_column_switched() {
// A regression test for materialize_topk switching around projection expressions when their
// order does not match the aggregate node's aggregation expression order. (Also, when
// materialize_topk had this bug, the Projection node's DFSchema was left unchanged, making
// it inconsistent with the expressions.)

let table_schema = Schema::new(vec![
Field::new("group_field", DataType::Int64, true),
Field::new("agg_sortby", DataType::Int64, true),
Field::new("agg_1", DataType::Int64, true),
Field::new("agg_2", DataType::Int64, true),
]);

let scan_node = LogicalPlanBuilder::scan_empty(Some("table"), &table_schema, None)
.unwrap()
.build()
.unwrap();

let cluster_send =
ClusterSendNode::new(Arc::new(scan_node), vec![vec![]], None).into_plan();

let plan = LogicalPlanBuilder::from(cluster_send)
.aggregate(
vec![col("group_field")],
vec![sum(col("agg_sortby")), sum(col("agg_1")), sum(col("agg_2"))],
)
.unwrap()
.project(vec![
col("group_field"),
col("SUM(table.agg_sortby)"),
col("SUM(table.agg_2)"),
col("SUM(table.agg_1)"),
])
.expect("project to be valid")
.sort(vec![col("SUM(table.agg_sortby)").sort(false, false)])
.unwrap()
.limit(10)
.unwrap()
.build()
.unwrap();

let before_schema = plan.schema().clone();

let plan = materialize_topk(plan).expect("materialize_topk to succeed");

let after_schema = plan.schema().clone();

// Of course the schema shouldn't change.
assert_eq!(before_schema, after_schema);

// We are testing that topk materialization doesn't switch the field order (of
// SUM(table.agg_2) and SUM(table.agg_1)) in the projection above it.
let expected = "\
Projection: #table.group_field, #SUM(table.agg_sortby), #SUM(table.agg_2), #SUM(table.agg_1)\
\n ClusterAggregateTopK, limit = 10, groupBy = [#table.group_field], aggr = [SUM(#table.agg_sortby), SUM(#table.agg_1), SUM(#table.agg_2)], sortBy = [SortColumn { agg_index: 0, asc: false, nulls_first: false }]\
\n TableScan: table projection=None";
let formatted = format!("{:?}", plan);

assert_eq!(expected, formatted);
}
}
Loading