Skip to content

Commit 8307473

Browse files
committed
WIP: WorkerExec metadata parallelling ClusterSendExec with push_aggregate_to_workers including ordering
1 parent 8e46f32 commit 8307473

File tree

2 files changed

+6
-5
lines changed

2 files changed

+6
-5
lines changed

rust/cubestore/cubestore/src/queryplanner/optimizations/distributed_partial_aggregate.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,23 +50,24 @@ pub fn push_aggregate_to_workers(
5050
let clustersend_input = p_partial.clone()
5151
.with_new_children(vec![cs.input_for_optimizations.clone()])?;
5252

53+
// Note that required_input_ordering is applicable when p_final_agg has a Sorted input mode.
54+
5355
// Router plan, replace partial aggregate with cluster send.
5456
Arc::new(
5557
cs.with_changed_schema(
5658
clustersend_input,
59+
p_final_agg.required_input_ordering().into_iter().next().unwrap(),
5760
),
5861
)
5962
} else if let Some(w) = agg.input().as_any().downcast_ref::<WorkerExec>() {
6063
let worker_input = p_partial.clone().with_new_children(vec![w.input.clone()])?;
6164

62-
// TODO upgrade DF: both cs.with_changed_schema and this need required_input_ordering if we have a sorted (inplace) aggregate pair here
63-
6465
// Worker plan, execute partial aggregate inside the worker.
6566
Arc::new(WorkerExec::new(
6667
worker_input,
6768
w.max_batch_rows,
6869
w.limit_and_reverse.clone(),
69-
/* required_input_ordering */ None,
70+
p_final_agg.required_input_ordering().into_iter().next().unwrap(),
7071
))
7172
} else {
7273
return Ok(p_final);

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1502,7 +1502,7 @@ impl ClusterSendExec {
15021502
r
15031503
}
15041504

1505-
pub fn with_changed_schema(&self, input_for_optimizations: Arc<dyn ExecutionPlan>) -> Self {
1505+
pub fn with_changed_schema(&self, input_for_optimizations: Arc<dyn ExecutionPlan>, new_required_input_ordering: Option<LexRequirement>) -> Self {
15061506
ClusterSendExec {
15071507
properties: Self::compute_properties(
15081508
input_for_optimizations.properties(),
@@ -1513,7 +1513,7 @@ impl ClusterSendExec {
15131513
serialized_plan: self.serialized_plan.clone(),
15141514
input_for_optimizations,
15151515
use_streaming: self.use_streaming,
1516-
required_input_ordering: None,
1516+
required_input_ordering: new_required_input_ordering,
15171517
}
15181518
}
15191519

0 commit comments

Comments
 (0)