Skip to content

Commit e0804f8

Browse files
committed
Bypass compute_aggregation_strategoy call
1 parent 65aacf2 commit e0804f8

File tree

1 file changed

+25
-20
lines changed

1 file changed

+25
-20
lines changed

datafusion/src/physical_plan/planner.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -506,10 +506,26 @@ impl DefaultPhysicalPlanner {
506506
})
507507
.collect::<Result<Vec<_>>>()?;
508508

509-
//It's not obvious here, but "order" here is mapping from input "sort_on" into
510-
//positions of "group by" columns
511-
let (strategy, order) =
512-
compute_aggregation_strategy(input_exec.as_ref(), &groups);
509+
//It's not obvious here, but "order" here is mapping from input "sort_on"(*) into
510+
//positions of "group by" columns. (*) but with some flexibility if it has
511+
//single-value columns
512+
let input_sortedness =
513+
input_sortedness_by_group_key(input_exec.as_ref(), &groups);
514+
let (strategy, order): (AggregateStrategy, Option<Vec<usize>>) =
515+
match input_sortedness.sawtooth_levels() {
516+
Some(0) => {
517+
let order = input_sortedness.sort_order[0]
518+
.iter()
519+
.map(|(_sort_key_offset, group_key_offset)| {
520+
*group_key_offset
521+
})
522+
.collect_vec();
523+
(AggregateStrategy::InplaceSorted, Some(order))
524+
}
525+
Some(_) => (AggregateStrategy::Hash, None),
526+
_ => (AggregateStrategy::Hash, None),
527+
};
528+
513529
// TODO: fix cubestore planning and re-enable.
514530
if false && input_exec.output_partitioning().partition_count() == 1 {
515531
// A single pass is enough for 1 partition.
@@ -1663,21 +1679,7 @@ pub fn evaluate_const(expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExp
16631679
Ok(Arc::new(Literal::new(scalar)))
16641680
}
16651681

1666-
/// Returns the most efficient aggregation strategy for the given input.
1667-
pub fn compute_aggregation_strategy(
1668-
input: &dyn ExecutionPlan,
1669-
group_key: &[(Arc<dyn PhysicalExpr>, String)],
1670-
) -> (AggregateStrategy, /*sort_order*/ Option<Vec<usize>>) {
1671-
let mut sort_order = Vec::new();
1672-
if !group_key.is_empty()
1673-
&& input_sorted_by_group_key(input, &group_key, &mut sort_order)
1674-
{
1675-
(AggregateStrategy::InplaceSorted, Some(sort_order))
1676-
} else {
1677-
(AggregateStrategy::Hash, None)
1678-
}
1679-
}
1680-
1682+
// TODO: Remove
16811683
fn input_sorted_by_group_key(
16821684
input: &dyn ExecutionPlan,
16831685
group_key: &[(Arc<dyn PhysicalExpr>, String)],
@@ -1753,7 +1755,10 @@ pub fn input_sortedness_by_group_key(
17531755
input: &dyn ExecutionPlan,
17541756
group_key: &[(Arc<dyn PhysicalExpr>, String)],
17551757
) -> SortednessByGroupKey {
1756-
assert!(!group_key.is_empty());
1758+
if group_key.is_empty() {
1759+
// The caller has to deal with it (and in fact it wants to).
1760+
return SortednessByGroupKey::failed();
1761+
}
17571762

17581763
let hints = input.output_hints();
17591764
// We check the group key is a prefix of the sort key.

0 commit comments

Comments
 (0)