Skip to content

Commit 4c5011e

Browse files
committed
chore(cubestore): Upgrade DF: Add distribution and input order requirement to LastRowByUniqueKeyExec
1 parent 0acf529 commit 4c5011e

File tree

1 file changed

+12
-2
lines changed

1 file changed

+12
-2
lines changed

rust/cubestore/cubestore/src/queryplanner/merge_sort.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ use datafusion::arrow::error::ArrowError;
88
use datafusion::error::DataFusionError;
99
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
1010
use datafusion::physical_expr::expressions::Column;
11-
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
11+
use datafusion::physical_expr::{LexRequirement, PhysicalSortRequirement};
1212
use datafusion::physical_plan::{
13-
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
13+
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties
1414
};
1515
use futures::Stream;
1616
use futures_util::StreamExt;
@@ -87,6 +87,16 @@ impl ExecutionPlan for LastRowByUniqueKeyExec {
8787
vec![&self.input]
8888
}
8989

90+
fn required_input_distribution(&self) -> Vec<Distribution> {
91+
vec![Distribution::SinglePartition]
92+
}
93+
94+
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
95+
// We're leaning a bit on the fact that we know the original input was a SortPreservingMergeExec.
96+
let ordering = self.properties.equivalence_properties().oeq_class().output_ordering();
97+
vec![ordering.map(|exprs| PhysicalSortRequirement::from_sort_exprs(&exprs))]
98+
}
99+
90100
fn with_new_children(
91101
self: Arc<Self>,
92102
children: Vec<Arc<dyn ExecutionPlan>>,

0 commit comments

Comments
 (0)