Skip to content

Commit d4cc975

Browse files
committed
chore(cubestore): Upgrade DF: use correct input ordering trait impls on ClusterSendExec and WorkerExec
1 parent d20c9ab commit d4cc975

File tree

2 files changed

+11
-32
lines changed

2 files changed

+11
-32
lines changed

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1714,22 +1714,12 @@ impl ExecutionPlan for WorkerExec {
17141714
vec![Distribution::SinglePartition; self.children().len()]
17151715
}
17161716

1717-
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
1718-
let input_ordering = self.input.required_input_ordering();
1719-
if !input_ordering.is_empty() {
1720-
vec![input_ordering[0].clone()]
1721-
} else {
1722-
vec![None]
1723-
}
1724-
}
1725-
17261717
fn maintains_input_order(&self) -> Vec<bool> {
1727-
let maintains_input_order = self.input.maintains_input_order();
1728-
if !maintains_input_order.is_empty() {
1729-
vec![maintains_input_order[0]]
1730-
} else {
1731-
vec![false]
1732-
}
1718+
// TODO upgrade DF: If the WorkerExec has the number of partitions so it can produce the same output, we could occasionally return true.
1719+
// vec![self.num_clustersend_partitions <= 1 && self.input_for_optimizations.output_partitioning().partition_count() <= 1]
1720+
1721+
// For now, same as default implementation:
1722+
vec![false]
17331723
}
17341724
}
17351725

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,7 @@ use datafusion::physical_plan::sorts::sort::SortExec;
7474
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
7575
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
7676
use datafusion::physical_plan::{
77-
collect, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
78-
PhysicalExpr, PlanProperties, SendableRecordBatchStream,
77+
collect, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties, Partitioning, PhysicalExpr, PlanProperties, SendableRecordBatchStream
7978
};
8079
use datafusion::prelude::{and, SessionConfig, SessionContext};
8180
use futures_util::{stream, FutureExt, StreamExt, TryStreamExt};
@@ -1614,22 +1613,12 @@ impl ExecutionPlan for ClusterSendExec {
16141613
&self.properties
16151614
}
16161615

1617-
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
1618-
let input_ordering = self.input_for_optimizations.required_input_ordering();
1619-
if !input_ordering.is_empty() {
1620-
vec![input_ordering[0].clone()]
1621-
} else {
1622-
vec![None]
1623-
}
1624-
}
1625-
16261616
fn maintains_input_order(&self) -> Vec<bool> {
1627-
let maintains_input_order = self.input_for_optimizations.maintains_input_order();
1628-
if !maintains_input_order.is_empty() {
1629-
vec![maintains_input_order[0]]
1630-
} else {
1631-
vec![false]
1632-
}
1617+
// TODO upgrade DF: If the WorkerExec has the number of partitions so it can produce the same output, we could occasionally return true.
1618+
// vec![self.partitions.len() <= 1 && self.input_for_optimizations.output_partitioning().partition_count() <= 1]
1619+
1620+
// For now, same as default implementation:
1621+
vec![false]
16331622
}
16341623

16351624
fn required_input_distribution(&self) -> Vec<Distribution> {

0 commit comments

Comments
 (0)