
Commit 14833a7

chore(cubestore): Upgrade DF: Bugfix from topk: Correct compute_properties in WorkerExec
1 parent 4a543e8

2 files changed (+21 −11 lines)

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 12 additions & 10 deletions
```diff
@@ -1757,13 +1757,11 @@ impl WorkerExec {
         required_input_ordering: Option<LexRequirement>,
         worker_planning_params: WorkerPlanningParams,
     ) -> WorkerExec {
-        let properties =
-            input
-                .properties()
-                .clone()
-                .with_partitioning(Partitioning::UnknownPartitioning(
-                    worker_planning_params.worker_partition_count,
-                ));
+        // This, importantly, gives us the same PlanProperties as ClusterSendExec.
+        let properties = ClusterSendExec::compute_properties(
+            input.properties(),
+            worker_planning_params.worker_partition_count,
+        );
         WorkerExec {
             input,
             max_batch_rows,
@@ -1796,12 +1794,16 @@ impl ExecutionPlan for WorkerExec {
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
         assert_eq!(children.len(), 1);
         let input = children.into_iter().next().unwrap();
+        let properties: PlanProperties = ClusterSendExec::compute_properties(
+            input.properties(),
+            self.properties.output_partitioning().partition_count(),
+        );
         Ok(Arc::new(WorkerExec {
             input,
             max_batch_rows: self.max_batch_rows,
             limit_and_reverse: self.limit_and_reverse.clone(),
             required_input_ordering: self.required_input_ordering.clone(),
-            properties: self.properties.clone(),
+            properties,
         }))
     }
 
@@ -1831,7 +1833,7 @@ impl ExecutionPlan for WorkerExec {
 
     fn maintains_input_order(&self) -> Vec<bool> {
         // TODO upgrade DF: If the WorkerExec has the number of partitions so it can produce the same output, we could occasionally return true.
-        // vec![self.num_clustersend_partitions <= 1 && self.input_for_optimizations.output_partitioning().partition_count() <= 1]
+        // vec![self.input_for_optimizations.output_partitioning().partition_count() <= 1]
 
         // For now, same as default implementation:
         vec![false]
@@ -1883,7 +1885,7 @@ pub mod tests {
     use datafusion::error::DataFusionError;
     use datafusion::execution::{SessionState, SessionStateBuilder};
     use datafusion::logical_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF};
-    use datafusion::prelude::{SessionConfig, SessionContext};
+    use datafusion::prelude::SessionConfig;
     use datafusion::sql::TableReference;
     use std::collections::HashMap;
     use std::iter::FromIterator;
```
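
The intent of this change, in short: WorkerExec now derives its PlanProperties through the same helper as the router-side ClusterSendExec, so both nodes advertise the same partitioning and the same (possibly cleared) orderings, and the optimizer makes matching top-k/sort decisions on both sides. The sketch below is a minimal, self-contained illustration of that invariant using simplified stand-in types (`Props`, `compute_properties` here are hypothetical, not the cubestore or DataFusion types):

```rust
// Stand-in for PlanProperties: just an optional ordering and a partition count.
#[derive(Clone, Debug, PartialEq)]
struct Props {
    ordering: Option<Vec<String>>,
    partitions: usize,
}

// Shared helper, analogous in spirit to ClusterSendExec::compute_properties:
// coalescing more than one input partition cannot preserve a per-partition ordering.
fn compute_properties(input: &Props, partitions_num: usize) -> Props {
    let ordering = if input.partitions > 1 {
        None
    } else {
        input.ordering.clone()
    };
    Props {
        ordering,
        partitions: partitions_num,
    }
}

fn main() {
    // An input sorted on "ts" within each of its 4 partitions.
    let scan = Props {
        ordering: Some(vec!["ts".to_string()]),
        partitions: 4,
    };

    // Router side (ClusterSendExec) and worker side (WorkerExec) now agree.
    let router = compute_properties(&scan, 2);
    let worker = compute_properties(&scan, 2);
    assert_eq!(router, worker);

    // The ordering is dropped because the 4 partitions get coalesced on the
    // worker, so a later top-k/sort optimization will not skip a needed sort.
    assert!(router.ordering.is_none());
}
```

Note that the same helper is also called from `with_new_children`, so the properties are recomputed from the new child rather than copied from the old node.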

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 9 additions & 1 deletion
```diff
@@ -1307,12 +1307,19 @@ impl ClusterSendExec {
         })
     }
 
+    /// Also used by WorkerExec (to produce the exact same plan properties so we get the same optimizations).
     pub fn compute_properties(
         input_properties: &PlanProperties,
         partitions_num: usize,
     ) -> PlanProperties {
+        // Coalescing partitions (on the worker side) loses existing orderings:
+        let mut eq_properties = input_properties.eq_properties.clone();
+        if input_properties.output_partitioning().partition_count() > 1 {
+            eq_properties.clear_orderings();
+            eq_properties.clear_per_partition_constants();
+        }
         PlanProperties::new(
-            input_properties.eq_properties.clone(),
+            eq_properties,
             Partitioning::UnknownPartitioning(partitions_num),
             input_properties.execution_mode.clone(),
         )
@@ -1685,6 +1692,7 @@ impl ExecutionPlan for ClusterSendExec {
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
+        // TODO: If this is in place, and it is obeyed (with EnforceDistribution?), then we don't need to use a CoalescePartitions node in worker exec.
         vec![Distribution::SinglePartition; self.children().len()]
     }
 }
```
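
The ordering-clearing logic above exists because plain partition coalescing concatenates partition streams rather than merge-sorting them. A tiny standalone illustration (plain Rust, no DataFusion types; `coalesce` here is a hypothetical stand-in for a CoalescePartitions-style concatenation) of why a per-partition ordering does not survive:

```rust
// Concatenate partitions in order, without merge-sorting across them.
fn coalesce(partitions: Vec<Vec<i64>>) -> Vec<i64> {
    partitions.into_iter().flatten().collect()
}

fn main() {
    // Each partition is sorted on its own...
    let parts = vec![vec![1, 5, 9], vec![2, 3, 10]];
    let combined = coalesce(parts);

    // ...but the concatenation is not: [1, 5, 9, 2, 3, 10].
    assert_eq!(combined, vec![1, 5, 9, 2, 3, 10]);
    assert!(!combined.windows(2).all(|w| w[0] <= w[1]));

    // Hence compute_properties clears orderings (and per-partition constants)
    // whenever the input has more than one partition.
}
```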
