Skip to content

Commit a70747f

Browse files
committed
WIP: add member fn compute_aggregation_strategy, logging
1 parent 0c5120a commit a70747f

File tree

1 file changed

+25
-2
lines changed

1 file changed

+25
-2
lines changed

datafusion/src/physical_plan/planner.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,8 @@ impl DefaultPhysicalPlanner {
506506
})
507507
.collect::<Result<Vec<_>>>()?;
508508

509+
log::error!("DefaultPhysicalPlanner computing AggregateStrategy");
510+
509511
//It's not obvious here, but "order" here is mapping from input "sort_on"(*) into
510512
//positions of "group by" columns. (*) but with some flexibility if it has
511513
//single-value columns
@@ -514,6 +516,7 @@ impl DefaultPhysicalPlanner {
514516
let (strategy, order): (AggregateStrategy, Option<Vec<usize>>) =
515517
match input_sortedness.sawtooth_levels() {
516518
Some(0) => {
519+
log::error!("DefaultPhysicalExpr: Perfect match for inplace aggregation");
517520
let order = input_sortedness.sort_order[0]
518521
.iter()
519522
.map(|(_sort_key_offset, group_key_offset)| {
@@ -522,8 +525,14 @@ impl DefaultPhysicalPlanner {
522525
.collect_vec();
523526
(AggregateStrategy::InplaceSorted, Some(order))
524527
}
525-
Some(_) => (AggregateStrategy::Hash, None),
526-
_ => (AggregateStrategy::Hash, None),
528+
Some(n) => {
529+
log::error!("DefaultPhysicalExpr: Non-perfect match for inplace aggregation: {} clumps", n);
530+
(AggregateStrategy::Hash, None)
531+
},
532+
_ => {
533+
log::error!("DefaultPhysicalExpr: No match for inplace aggregation");
534+
(AggregateStrategy::Hash, None)
535+
},
527536
};
528537

529538
// TODO: fix cubestore planning and re-enable.
@@ -1724,6 +1733,20 @@ impl SortednessByGroupKey {
17241733
None
17251734
}
17261735
}
1736+
1737+
/// Returns group key sort order and AggregateStrategy, the same result as the previously
1738+
/// existing compute_aggregate_strategy function.
1739+
pub fn compute_aggregate_strategy(&self) -> (AggregateStrategy, Option<Vec<usize>>) {
1740+
if self.is_sorted_by_group_key() {
1741+
let order = self.sort_order[0]
1742+
.iter()
1743+
.map(|&(_sort_i, group_i)| group_i)
1744+
.collect_vec();
1745+
(AggregateStrategy::InplaceSorted, Some(order))
1746+
} else {
1747+
(AggregateStrategy::Hash, None)
1748+
}
1749+
}
17271750
}
17281751

17291752
/// Checks the degree to which input is sortable by a group key. If it succeeds, returns clumps of

0 commit comments

Comments
 (0)