Skip to content

Commit d2ae1c2

Browse files
committed
chore(cubestore): Upgrade DF: Fix suboptimal query plan detection
1 parent 70e999a commit d2ae1c2

File tree

1 file changed

+13
-10
lines changed

1 file changed

+13
-10
lines changed

rust/cubestore/cubestore/src/queryplanner/physical_plan_flags.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use datafusion::logical_expr::{Operator, UserDefinedLogicalNode};
1+
use datafusion::logical_expr::Operator;
22
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
3+
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
34
use datafusion::physical_plan::expressions::{BinaryExpr, CastExpr, Column, Literal, TryCastExpr};
45
use datafusion::physical_plan::filter::FilterExec;
5-
use datafusion::physical_plan::repartition::RepartitionExec;
6-
use datafusion::physical_plan::union::UnionExec;
6+
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
77
use datafusion::physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr};
88
use serde::Serialize;
99
use serde_json::{json, Value};
@@ -37,8 +37,9 @@ impl PhysicalPlanFlags {
3737
fn physical_plan_flags_fill(p: &dyn ExecutionPlan, flags: &mut PhysicalPlanFlags) {
3838
let a = p.as_any();
3939
if let Some(agg) = a.downcast_ref::<AggregateExec>() {
40-
let is_final_hash_agg_without_groups =
41-
agg.mode() == &AggregateMode::Final && agg.group_expr().expr().len() == 0;
40+
let is_final_hash_agg_without_groups = agg.mode() == &AggregateMode::Final
41+
&& agg.input_order_mode() == &InputOrderMode::Linear
42+
&& agg.group_expr().expr().len() == 0;
4243

4344
let is_full_inplace_agg = agg.mode() == &AggregateMode::Single
4445
&& agg.input_order_mode() == &InputOrderMode::Sorted;
@@ -63,19 +64,21 @@ impl PhysicalPlanFlags {
6364
let predicate = f.predicate();
6465
let predicate_column_groups = extract_columns_with_operators(predicate.as_ref());
6566
let input = f.input();
67+
let input_as_any = input.as_any();
6668

67-
let maybe_input_exec = input
68-
.as_any()
69-
.downcast_ref::<RepartitionExec>()
69+
let maybe_input_exec = input_as_any
70+
.downcast_ref::<CoalescePartitionsExec>()
7071
.map(|exec| exec.input().as_any())
7172
.or_else(|| {
7273
input
7374
.as_any()
74-
.downcast_ref::<RepartitionExec>()
75+
.downcast_ref::<SortPreservingMergeExec>()
7576
.map(|exec| exec.input().as_any())
7677
});
7778

78-
if let Some(input_exec_any) = maybe_input_exec {
79+
// Left "if true" in DF upgrade branch to keep indentation and reduce conflicts.
80+
if true {
81+
let input_exec_any = maybe_input_exec.unwrap_or(input_as_any);
7982
if let Some(cte) = input_exec_any.downcast_ref::<CubeTableExec>() {
8083
let sort_key_size = cte.index_snapshot.index.row.sort_key_size() as usize;
8184
let index_columns =

0 commit comments

Comments
 (0)