Skip to content

Commit ffbed2d

Browse files
committed
WIP: Make ensure_partition_merge rewrites not apply within AggregateTopKExec
1 parent 8307473 commit ffbed2d

File tree

2 files changed

+53
-9
lines changed

2 files changed

+53
-9
lines changed

rust/cubestore/cubestore/src/queryplanner/optimizations/distributed_partial_aggregate.rs

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::queryplanner::planning::WorkerExec;
22
use crate::queryplanner::query_executor::ClusterSendExec;
33
use crate::queryplanner::tail_limit::TailLimitExec;
4+
use crate::queryplanner::topk::AggregateTopKExec;
45
use datafusion::error::DataFusionError;
6+
use datafusion::physical_optimizer::topk_aggregation::TopKAggregation;
57
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
68
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
79
use datafusion::physical_plan::limit::GlobalLimitExec;
@@ -90,15 +92,15 @@ pub fn push_aggregate_to_workers(
9092
)?))
9193
}
9294

93-
// TODO upgrade DF: this one was handled by something else but most likely only in sorted scenario
94-
pub fn ensure_partition_merge(
95+
pub fn ensure_partition_merge_helper(
9596
p: Arc<dyn ExecutionPlan>,
97+
new_child: &mut bool,
9698
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
9799
if p.as_any().is::<ClusterSendExec>()
98100
|| p.as_any().is::<WorkerExec>()
99101
|| p.as_any().is::<UnionExec>()
100102
{
101-
if let Some(ordering) = p.output_ordering() {
103+
let rewritten: Arc<dyn ExecutionPlan> = if let Some(ordering) = p.output_ordering() {
102104
let ordering = ordering.to_vec();
103105
let merged_children = p
104106
.children()
@@ -107,8 +109,8 @@ pub fn ensure_partition_merge(
107109
Arc::new(SortPreservingMergeExec::new(ordering.clone(), c.clone()))
108110
})
109111
.collect();
110-
let new_plan = p.with_new_children(merged_children)?;
111-
Ok(Arc::new(SortPreservingMergeExec::new(ordering, new_plan)))
112+
let new_plan = p.clone().with_new_children(merged_children)?;
113+
Arc::new(SortPreservingMergeExec::new(ordering, new_plan))
112114
} else {
113115
let merged_children = p
114116
.children()
@@ -117,14 +119,51 @@ pub fn ensure_partition_merge(
117119
Arc::new(CoalescePartitionsExec::new(c.clone()))
118120
})
119121
.collect();
120-
let new_plan = p.with_new_children(merged_children)?;
121-
Ok(Arc::new(CoalescePartitionsExec::new(new_plan)))
122-
}
122+
let new_plan = p.clone().with_new_children(merged_children)?;
123+
Arc::new(CoalescePartitionsExec::new(new_plan))
124+
};
125+
*new_child = true;
126+
Ok(rewritten)
123127
} else {
124128
Ok(p)
125129
}
126130
}
127131

132+
pub fn ensure_partition_merge(
133+
p: Arc<dyn ExecutionPlan>,
134+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
135+
let mut new_child = false;
136+
ensure_partition_merge_helper(p, &mut new_child)
137+
}
138+
139+
// TODO upgrade DF: this one was handled by something else, but most likely only in the sorted scenario
140+
pub fn ensure_partition_merge_with_acceptable_parent(
141+
parent: Arc<dyn ExecutionPlan>,
142+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
143+
// TODO upgrade DF: Figure out the right clean way to handle this function in general --
144+
// possibly involving uncommenting EnforceDistribution, and having this
145+
// SortPreservingMergeExec/CoalescePartitionsExec wrapping the ClusterSendExec node as we
146+
// construct the query.
147+
148+
// Special case: don't apply this inside an AggregateTopKExec-ClusterSendExec-Aggregate chain, because we
149+
// need the partitioning. (This is gross.)
150+
if parent.as_any().is::<AggregateTopKExec>() {
151+
return Ok(parent);
152+
}
153+
154+
let mut any_new_children = false;
155+
let mut new_children = Vec::new();
156+
157+
for p in parent.children() {
158+
new_children.push(ensure_partition_merge_helper(p.clone(), &mut any_new_children)?);
159+
}
160+
if any_new_children {
161+
parent.with_new_children(new_children)
162+
} else {
163+
Ok(parent)
164+
}
165+
}
166+
128167
///Add `GlobalLimitExec` behind worker node if this node has `limit` property set
129168
///Should be executed after all optimizations which can move `Worker` node or change it input
130169
pub fn add_limit_to_workers(

rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use datafusion::logical_expr::LogicalPlan;
2929
use datafusion::physical_optimizer::PhysicalOptimizerRule;
3030
use datafusion::physical_plan::ExecutionPlan;
3131
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
32+
use distributed_partial_aggregate::ensure_partition_merge_with_acceptable_parent;
3233
use rewrite_plan::rewrite_physical_plan;
3334
use std::sync::Arc;
3435
use trace_data_loaded::add_trace_data_loaded_exec;
@@ -145,7 +146,11 @@ fn pre_optimize_physical_plan(
145146
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
146147
// TODO upgrade DF
147148
let p = rewrite_physical_plan(p, &mut |p| push_aggregate_to_workers(p))?;
148-
let p = rewrite_physical_plan(p, &mut |p| ensure_partition_merge(p))?;
149+
150+
// Handles non-root-node cases
151+
let p = rewrite_physical_plan(p, &mut |p| ensure_partition_merge_with_acceptable_parent(p))?;
152+
// Handles the root node case
153+
let p = ensure_partition_merge(p)?;
149154
Ok(p)
150155
}
151156

0 commit comments

Comments (0)