Skip to content

Commit 4cf1a82

Browse files
committed
feat: Add basic output_hints to SortExec
1 parent d92a1d8 commit 4cf1a82

File tree

1 file changed

+30
-11
lines changed
  • datafusion/src/physical_plan

1 file changed

+30
-11
lines changed

datafusion/src/physical_plan/sort.rs

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
//! Defines the SORT plan
1919
20-
use super::{RecordBatchStream, SendableRecordBatchStream};
2120
use crate::cube_ext;
2221
use crate::error::{DataFusionError, Result};
23-
use crate::physical_plan::expressions::PhysicalSortExpr;
22+
use crate::physical_plan::expressions::{Column, PhysicalSortExpr};
2423
use crate::physical_plan::{
2524
common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SQLMetric,
2625
};
26+
use crate::physical_plan::{
27+
OptimizerHints, RecordBatchStream, SendableRecordBatchStream,
28+
};
2729
pub use arrow::compute::SortOptions;
2830
use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
2931
use arrow::datatypes::SchemaRef;
@@ -186,15 +188,32 @@ impl ExecutionPlan for SortExec {
186188
metrics
187189
}
188190

189-
// TODO
190-
// fn output_sort_order(&self) -> Result<Option<Vec<usize>>> {
191-
// let mut order = Vec::with_capacity(self.expr.len());
192-
// for s in &self.expr {
193-
// let col = s.expr.as_any().downcast_ref::<Column>()?;
194-
// order.push(self.schema().index_of(col.name())?);
195-
// }
196-
// Ok(Some(order))
197-
// }
191+
fn output_hints(&self) -> OptimizerHints {
192+
let mut order = Vec::with_capacity(self.expr.len());
193+
// let mut sort_order_truncated = false;
194+
for s in &self.expr {
195+
let column = s.expr.as_any().downcast_ref::<Column>();
196+
if column.is_none() {
197+
// sort_order_truncated = true;
198+
break;
199+
}
200+
let column = column.unwrap();
201+
202+
let index: usize = match self.schema().index_of(column.name()) {
203+
Ok(ix) => ix,
204+
Err(_) => return OptimizerHints::default(),
205+
};
206+
order.push(index);
207+
}
208+
209+
let input_hints = self.input.output_hints();
210+
// TODO: If sort_order_truncated is false, we can combine input_hints.sort_order. Do this.
211+
212+
OptimizerHints {
213+
sort_order: Some(order),
214+
single_value_columns: input_hints.single_value_columns.clone(),
215+
}
216+
}
198217
}
199218

200219
#[tracing::instrument(level = "trace", skip(batch, schema, expr))]

0 commit comments

Comments
 (0)