Skip to content

Commit 024709c

Browse files
committed
WIP: try_switch_to_inplace_aggregates now coarsely supporting inplace partial aggregates
1 parent cd82aff commit 024709c

File tree

2 files changed: 42 additions and 17 deletions.

rust/cubestore/cubestore/src/queryplanner/optimizations/prefer_inplace_aggregates.rs

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@ use datafusion::physical_plan::filter::FilterExec;
66
use datafusion::physical_plan::hash_aggregate::{AggregateStrategy, HashAggregateExec};
77
use datafusion::physical_plan::merge::MergeExec;
88
use datafusion::physical_plan::merge_sort::MergeSortExec;
9-
use datafusion::physical_plan::planner::input_sortedness_by_group_key;
9+
use datafusion::physical_plan::planner::{input_sortedness_by_group_key, input_sortedness_by_group_key_using_approximate};
1010
use datafusion::physical_plan::projection::ProjectionExec;
1111
use datafusion::physical_plan::union::UnionExec;
1212
use datafusion::physical_plan::ExecutionPlan;
@@ -29,7 +29,10 @@ pub fn try_switch_to_inplace_aggregates(
2929
// Try to cheaply rearrange the plan so that it produces sorted inputs.
3030
let new_input = try_regroup_columns(agg.input().clone())?;
3131

32-
let input_sortedness = input_sortedness_by_group_key(new_input.as_ref(), agg.group_expr());
32+
let input_sortedness = input_sortedness_by_group_key_using_approximate(new_input.as_ref(), agg.group_expr());
33+
34+
log::error!("try_switch_to_inplace_aggregates: input_sortedness is {:?}\n and... output_hints is {:?}\n and... new_input is {}",
35+
input_sortedness, new_input.output_hints(), crate::queryplanner::pretty_printers::pp_phys_plan(new_input.as_ref()));
3336

3437

3538
let (strategy, order): (AggregateStrategy, Option<Vec<usize>>) =

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 37 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -838,35 +838,57 @@ impl ExecutionPlan for CubeTableExec {
838838
}
839839

840840
fn output_hints(&self) -> OptimizerHints {
841-
let sort_order;
842841
if let Some(snapshot_sort_on) = self.index_snapshot.sort_on() {
843842
// Note that this returns `None` if any of the columns were not found.
844843
// This only happens on programming errors.
845-
sort_order = snapshot_sort_on
844+
let sort_order = snapshot_sort_on
846845
.iter()
847846
.map(|c| self.schema.index_of(&c).ok())
848-
.collect()
847+
.collect();
848+
log::error!("CubeTableExec output_hints returning sort_order {:?}", sort_order);
849+
OptimizerHints::new_sorted(sort_order, Vec::new())
849850
} else {
850851
let index = self.index_snapshot.index().get_row();
852+
let mut approximate_sort_order = Vec::<Vec<usize>>::new();
853+
let mut chunk = Vec::<usize>::new();
851854
let sort_cols = index
852855
.get_columns()
853856
.iter()
854857
.take(index.sort_key_size() as usize)
855-
.map(|sort_col| self.schema.index_of(&sort_col.get_name()).ok())
856-
.take_while(|i| i.is_some())
857-
.map(|i| i.unwrap())
858-
.collect_vec();
859-
if !sort_cols.is_empty() {
860-
sort_order = Some(sort_cols)
858+
.map(|sort_col| self.schema.index_of(&sort_col.get_name()).ok());
859+
let mut prefix = true;
860+
for sort_col in sort_cols {
861+
if let Some(col) = sort_col {
862+
chunk.push(col);
863+
} else {
864+
if chunk.is_empty() {
865+
prefix &= !approximate_sort_order.is_empty();
866+
} else {
867+
approximate_sort_order.push(chunk);
868+
chunk = Vec::new();
869+
}
870+
}
871+
}
872+
873+
if !chunk.is_empty() {
874+
approximate_sort_order.push(chunk);
875+
}
876+
877+
let sort_order = if prefix && !approximate_sort_order.is_empty() {
878+
Some(approximate_sort_order[0].clone())
861879
} else {
862-
sort_order = None
880+
None
881+
};
882+
log::error!("CubeTableExec output_hints returning approximate sort order {:?} and sort_order {:?}", approximate_sort_order, sort_order);
883+
884+
OptimizerHints {
885+
sort_order,
886+
approximate_sort_order,
887+
approximate_sort_order_is_strict: true,
888+
approximate_sort_order_is_prefix: prefix,
889+
single_value_columns: Vec::new(),
863890
}
864891
}
865-
866-
OptimizerHints::new_sorted(
867-
sort_order,
868-
Vec::new(),
869-
)
870892
}
871893

872894
#[tracing::instrument(level = "trace", skip(self))]

0 commit comments

Comments
 (0)