
Commit 10db6b3

Redesign the try_reverse_output to support more cases (apache#19446)
## Which issue does this PR close?

- Closes apache#19403

## Rationale for this change

After apache#19064 was completed, I found it couldn't meet our internal project requirements:

1. **Redesign `try_reverse_output` using `EquivalenceProperties`**
   - The previous implementation used a simple `is_reverse` method that could only handle basic reverse matching
   - Leveraging `EquivalenceProperties` now handles more complex scenarios, including constant-column elimination and monotonic functions
2. **Switch to the `ordering_satisfy` method for ordering matching**
   - This method internally:
     - Normalizes orderings (removes constant columns)
     - Checks monotonic functions (like `date_trunc`, `CAST`, `CEIL`)
     - Handles prefix matching
3. **Extend sort pushdown support to more operators**
   - Added `try_pushdown_sort` implementations for `ProjectionExec`, `FilterExec`, and `CooperativeExec`
   - These operators can now pass sort requests down to their children

## What changes are included in this PR?

### Core Changes

1. **`ParquetSource::try_reverse_output`** (`datasource-parquet/src/source.rs`)
   - Added an `eq_properties` parameter
   - Reverses all orderings in the equivalence properties
   - Uses `ordering_satisfy` to check whether the reversed ordering satisfies the request
   - Removed the `file_ordering` field and the `with_file_ordering_info` method
2. **`FileSource` trait** (`datasource/src/file.rs`)
   - Updated the `try_reverse_output` signature with an `eq_properties` parameter
   - Added detailed documentation explaining parameter usage, with examples
3. **`FileScanConfig::try_pushdown_sort`** (`datasource/src/file_scan_config.rs`)
   - Simplified the logic to directly call `file_source.try_reverse_output`
   - No longer needs to pre-check ordering satisfaction or set file ordering info
4. **New operator support**
   - `FilterExec::try_pushdown_sort` - pushes sorts below filters
   - `ProjectionExec::try_pushdown_sort` - pushes sorts below projections
   - `CooperativeExec::try_pushdown_sort` - supports sort pushdown in cooperative execution
5. **Removed obsolete methods**
   - Deleted `LexOrdering::is_reverse`, replaced by `ordering_satisfy`

### Feature Enhancements

Supported optimization scenarios:

1. **Constant-column elimination** (Test 7)

   ```sql
   -- File ordering: [timeframe ASC, period_end ASC]
   -- Query: WHERE timeframe = 'quarterly' ORDER BY period_end DESC
   -- Effect: once timeframe becomes constant, reverse scan is enabled
   ```

2. **Monotonic function support** (Test 8)

   ```sql
   -- File ordering: [ts ASC]
   -- Query: ORDER BY date_trunc('month', ts) DESC
   -- Effect: date_trunc is monotonic, so a reverse scan satisfies the request
   ```

## Are these changes tested?

Yes, comprehensive tests have been added:

- **Test 7 (237 lines)**: constant-column elimination scenarios
  - Single constant-column filter
  - Multi-value `IN` clauses (doesn't trigger the optimization)
  - Literal constants in sort expressions
  - Non-leading column filters (edge cases)
- **Test 8 (355 lines)**: monotonic function scenarios
  - `date_trunc` (date truncation)
  - `CAST` (type conversion)
  - `CEIL` (ceiling)
  - `ABS` (negative case - not monotonic over a mixed positive/negative range)

All tests verify:

- Presence of `reverse_row_groups=true` in physical plans
- Correctness of query results

## Are there any user-facing changes?

**API changes:**

- The `FileSource::try_reverse_output` signature changed (added an `eq_properties` parameter)
- Removed the `FileSource::with_file_ordering_info` method
- Removed the `LexOrdering::is_reverse` public method

**User-visible improvements:**

- More queries can leverage reverse row-group scanning for optimization
- Especially queries with `WHERE` clauses that make certain columns constant
- Queries using monotonic functions (like date functions and type conversions)

**Note:** This PR returns `Inexact` results because only the row-group order is reversed, not the row order within row groups.

Future enhancements could include:

- File reordering based on statistics (returning `Exact`)
- Partial sort pushdown for prefix matches

---------

Co-authored-by: Adrian Garcia Badaracco <[email protected]>
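The core reversal check can be illustrated with a minimal, self-contained sketch. These structs are simplified stand-ins for DataFusion's `PhysicalSortExpr`/`SortOptions`, and the prefix check stands in for `EquivalenceProperties::ordering_satisfy` (which additionally normalizes away constant columns and sees through monotonic functions):

```rust
// Simplified model, not DataFusion's actual types.
#[derive(Clone, Copy, PartialEq, Debug)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

#[derive(Clone, PartialEq, Debug)]
struct SortExpr {
    column: &'static str,
    options: SortOptions,
}

impl SortExpr {
    // Flip both the direction and the null placement, as reversing does
    fn reverse(&self) -> SortExpr {
        SortExpr {
            column: self.column,
            options: SortOptions {
                descending: !self.options.descending,
                nulls_first: !self.options.nulls_first,
            },
        }
    }
}

/// In this simplified model, a reversed file ordering satisfies the request
/// when the request is a prefix of it. The real `ordering_satisfy` also
/// drops constant columns and monotonic wrappers before this comparison.
fn reversed_satisfies(file_ordering: &[SortExpr], requested: &[SortExpr]) -> bool {
    let reversed: Vec<SortExpr> = file_ordering.iter().map(|e| e.reverse()).collect();
    requested.len() <= reversed.len()
        && reversed.iter().zip(requested).all(|(a, b)| a == b)
}

fn main() {
    let asc = SortOptions { descending: false, nulls_first: false };
    let desc = SortOptions { descending: true, nulls_first: true };
    // File ordering [ts ASC]; the query asks for ORDER BY ts DESC
    let file = [SortExpr { column: "ts", options: asc }];
    let request = [SortExpr { column: "ts", options: desc }];
    assert!(reversed_satisfies(&file, &request));
}
```

The same prefix rule is why a request shorter than the file ordering can still be satisfied after reversal.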
1 parent 4960284 commit 10db6b3

File tree: 9 files changed, +1646 −412 lines


datafusion/core/tests/physical_optimizer/pushdown_sort.rs

Lines changed: 374 additions & 6 deletions (large diff not rendered by default)

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 206 additions & 1 deletion
```diff
@@ -41,6 +41,7 @@ use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
 use datafusion_functions_aggregate::count::count_udaf;
+use datafusion_physical_expr::EquivalenceProperties;
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
 use datafusion_physical_expr::expressions::{self, col};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
@@ -54,6 +55,7 @@ use datafusion_physical_plan::aggregates::{
 };
 use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion_physical_plan::filter::FilterExec;
 use datafusion_physical_plan::joins::utils::{JoinFilter, JoinOn};
 use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
@@ -68,7 +70,7 @@ use datafusion_physical_plan::union::UnionExec;
 use datafusion_physical_plan::windows::{BoundedWindowAggExec, create_window_expr};
 use datafusion_physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, InputOrderMode, Partitioning,
-    PlanProperties, displayable,
+    PlanProperties, SortOrderPushdownResult, displayable,
 };

 /// Create a non sorted parquet exec
@@ -774,3 +776,206 @@ pub fn format_execution_plan(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
 fn format_lines(s: &str) -> Vec<String> {
     s.trim().split('\n').map(|s| s.to_string()).collect()
 }
+
+/// Create a simple ProjectionExec with column indices (simplified version)
+pub fn simple_projection_exec(
+    input: Arc<dyn ExecutionPlan>,
+    columns: Vec<usize>,
+) -> Arc<dyn ExecutionPlan> {
+    let schema = input.schema();
+    let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = columns
+        .iter()
+        .map(|&i| {
+            let field = schema.field(i);
+            (
+                Arc::new(expressions::Column::new(field.name(), i))
+                    as Arc<dyn PhysicalExpr>,
+                field.name().to_string(),
+            )
+        })
+        .collect();
+
+    projection_exec(exprs, input).unwrap()
+}
+
+/// Create a ProjectionExec with column aliases
+pub fn projection_exec_with_alias(
+    input: Arc<dyn ExecutionPlan>,
+    columns: Vec<(usize, &str)>,
+) -> Arc<dyn ExecutionPlan> {
+    let schema = input.schema();
+    let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = columns
+        .iter()
+        .map(|&(i, alias)| {
+            (
+                Arc::new(expressions::Column::new(schema.field(i).name(), i))
+                    as Arc<dyn PhysicalExpr>,
+                alias.to_string(),
+            )
+        })
+        .collect();
+
+    projection_exec(exprs, input).unwrap()
+}
+
+/// Create a sort expression with custom name and index
+pub fn sort_expr_named(name: &str, index: usize) -> PhysicalSortExpr {
+    PhysicalSortExpr {
+        expr: Arc::new(expressions::Column::new(name, index)),
+        options: SortOptions::default(),
+    }
+}
+
+/// A test data source that can display any requested ordering.
+/// This is useful for testing sort pushdown behavior.
+#[derive(Debug, Clone)]
+pub struct TestScan {
+    schema: SchemaRef,
+    output_ordering: Vec<LexOrdering>,
+    plan_properties: PlanProperties,
+    // Store the requested ordering for display
+    requested_ordering: Option<LexOrdering>,
+}
+
+impl TestScan {
+    /// Create a new TestScan with the given schema and output ordering
+    pub fn new(schema: SchemaRef, output_ordering: Vec<LexOrdering>) -> Self {
+        let eq_properties = if !output_ordering.is_empty() {
+            // Convert Vec<LexOrdering> to the format expected by new_with_orderings.
+            // We need to extract the inner Vec<PhysicalSortExpr> from each LexOrdering.
+            let orderings: Vec<Vec<PhysicalSortExpr>> = output_ordering
+                .iter()
+                .map(|lex_ordering| {
+                    // LexOrdering implements IntoIterator, so we can collect it
+                    lex_ordering.iter().cloned().collect()
+                })
+                .collect();
+
+            EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings)
+        } else {
+            EquivalenceProperties::new(Arc::clone(&schema))
+        };
+
+        let plan_properties = PlanProperties::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );

+        Self {
+            schema,
+            output_ordering,
+            plan_properties,
+            requested_ordering: None,
+        }
+    }
+
+    /// Create a TestScan with a single output ordering
+    pub fn with_ordering(schema: SchemaRef, ordering: LexOrdering) -> Self {
+        Self::new(schema, vec![ordering])
+    }
+}
+
+impl DisplayAs for TestScan {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "TestScan")?;
+                if !self.output_ordering.is_empty() {
+                    write!(f, ": output_ordering=[")?;
+                    // Format the ordering in a readable way
+                    for (i, sort_expr) in self.output_ordering[0].iter().enumerate() {
+                        if i > 0 {
+                            write!(f, ", ")?;
+                        }
+                        write!(f, "{sort_expr}")?;
+                    }
+                    write!(f, "]")?;
+                }
+                // This is the key part - show what ordering was requested
+                if let Some(ref req) = self.requested_ordering {
+                    write!(f, ", requested_ordering=[")?;
+                    for (i, sort_expr) in req.iter().enumerate() {
+                        if i > 0 {
+                            write!(f, ", ")?;
+                        }
+                        write!(f, "{sort_expr}")?;
+                    }
+                    write!(f, "]")?;
+                }
+                Ok(())
+            }
+            DisplayFormatType::TreeRender => {
+                write!(f, "TestScan")
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for TestScan {
+    fn name(&self) -> &str {
+        "TestScan"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            internal_err!("TestScan should have no children")
+        }
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        internal_err!("TestScan is for testing optimizer only, not for execution")
+    }
+
+    fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
+        Ok(Statistics::new_unknown(&self.schema))
+    }
+
+    // This is the key method - implement sort pushdown
+    fn try_pushdown_sort(
+        &self,
+        order: &[PhysicalSortExpr],
+    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
+        // For testing purposes, accept ANY ordering request
+        // and create a new TestScan that shows what was requested
+        let requested_ordering = LexOrdering::new(order.to_vec());
+
+        let mut new_scan = self.clone();
+        new_scan.requested_ordering = requested_ordering;
+
+        // Always return Inexact to keep the Sort node (like Phase 1 behavior)
+        Ok(SortOrderPushdownResult::Inexact {
+            inner: Arc::new(new_scan),
+        })
+    }
+}
+
+/// Helper function to create a TestScan with ordering
+pub fn test_scan_with_ordering(
+    schema: SchemaRef,
+    ordering: LexOrdering,
+) -> Arc<dyn ExecutionPlan> {
+    Arc::new(TestScan::with_ordering(schema, ordering))
+}
```
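The monotonic-function scenario exercised by Test 8 can be shown with a standalone sketch (not DataFusion code): when a file is sorted by `ts` ASC and the query asks for `ORDER BY date_trunc('month', ts) DESC`, a reverse scan already produces the truncated values in descending order. Here `month_trunc` is a hypothetical stand-in that truncates to 30-unit buckets:

```rust
// Hypothetical stand-in for date_trunc('month', ts): truncate to 30-unit buckets.
// Any non-decreasing function would behave the same way.
fn month_trunc(ts: i64) -> i64 {
    ts - ts.rem_euclid(30)
}

fn is_non_increasing(vals: &[i64]) -> bool {
    vals.windows(2).all(|w| w[0] >= w[1])
}

fn main() {
    // File rows sorted by ts ASC
    let ts_asc = [1i64, 5, 29, 30, 61, 90];
    // A reverse scan emits rows in ts DESC order; because month_trunc is
    // monotonic, month_trunc(ts) comes out non-increasing (i.e. DESC) too.
    let truncated: Vec<i64> = ts_asc.iter().rev().map(|&t| month_trunc(t)).collect();
    assert!(is_non_increasing(&truncated));
}
```

This is the property `ordering_satisfy` relies on when it sees through monotonic wrappers like `date_trunc`, `CAST`, and `CEIL`.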

datafusion/datasource-parquet/src/source.rs

Lines changed: 47 additions & 40 deletions
```diff
@@ -38,8 +38,8 @@ use datafusion_common::config::TableParquetOptions;
 use datafusion_datasource::TableSchema;
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
-use datafusion_physical_expr::conjunction;
 use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr::{EquivalenceProperties, conjunction};
 use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
@@ -54,7 +54,7 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

 #[cfg(feature = "parquet_encryption")]
 use datafusion_execution::parquet_encryption::EncryptionFactory;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
 use itertools::Itertools;
 use object_store::ObjectStore;
 #[cfg(feature = "parquet_encryption")]
@@ -288,9 +288,6 @@ pub struct ParquetSource {
     pub(crate) projection: ProjectionExprs,
     #[cfg(feature = "parquet_encryption")]
     pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
-    /// The ordering of data within the files
-    /// This is set by FileScanConfig when it knows the file ordering
-    file_ordering: Option<LexOrdering>,
     /// If true, read files in reverse order and reverse row groups within files.
     /// But it's not guaranteed that rows within row groups are in reverse order,
     /// so we still need to sort them after reading, so the reverse scan is inexact.
@@ -320,7 +317,6 @@ impl ParquetSource {
             metadata_size_hint: None,
             #[cfg(feature = "parquet_encryption")]
             encryption_factory: None,
-            file_ordering: None,
             reverse_row_groups: false,
         }
     }
@@ -397,12 +393,6 @@ impl ParquetSource {
         self
     }

-    /// If set, indicates the ordering of data within the files being read.
-    pub fn with_file_ordering(mut self, ordering: Option<LexOrdering>) -> Self {
-        self.file_ordering = ordering;
-        self
-    }
-
     /// Return the value described in [`Self::with_pushdown_filters`]
     pub(crate) fn pushdown_filters(&self) -> bool {
         self.table_parquet_options.global.pushdown_filters
@@ -769,44 +759,61 @@ impl FileSource for ParquetSource {
     fn try_reverse_output(
         &self,
         order: &[PhysicalSortExpr],
+        eq_properties: &EquivalenceProperties,
     ) -> datafusion_common::Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
-        // Check if we have file ordering information
-        let file_ordering = match &self.file_ordering {
-            Some(ordering) => ordering,
-            None => return Ok(SortOrderPushdownResult::Unsupported),
-        };
-
-        // Create a LexOrdering from the requested order to use the is_reverse method
-        let Some(requested_ordering) = LexOrdering::new(order.to_vec()) else {
-            // Empty ordering requested, cannot optimize
+        if order.is_empty() {
             return Ok(SortOrderPushdownResult::Unsupported);
-        };
+        }

-        // Check if reversing the file ordering would satisfy the requested ordering
-        if file_ordering.is_reverse(&requested_ordering) {
-            // Phase 1: Enable reverse row group scanning
-            let new_source = self.clone().with_reverse_row_groups(true);
+        // Build new equivalence properties with the reversed ordering.
+        // This allows us to check if the reversed ordering satisfies the request
+        // by leveraging:
+        // - Function monotonicity (e.g., extract_year_month preserves ordering)
+        // - Constant columns (from filters)
+        // - Other equivalence relationships
+        //
+        // Example flow:
+        // 1. File ordering: [extract_year_month(ws) DESC, ws DESC]
+        // 2. After reversal: [extract_year_month(ws) ASC, ws ASC]
+        // 3. Requested: [ws ASC]
+        // 4. Through extract_year_month's monotonicity property, the reversed
+        //    ordering satisfies [ws ASC] even though it has an additional prefix
+        let reversed_eq_properties = {
+            let mut new = eq_properties.clone();
+            new.clear_orderings();
+
+            // Reverse each ordering in the equivalence properties
+            let reversed_orderings = eq_properties
+                .oeq_class()
+                .iter()
+                .map(|ordering| {
+                    ordering
+                        .iter()
+                        .map(|expr| expr.reverse())
+                        .collect::<Vec<_>>()
+                })
+                .collect::<Vec<_>>();
+
+            new.add_orderings(reversed_orderings);
+            new
+        };

-            // Return Inexact because we're only reversing row group order,
-            // not guaranteeing perfect row-level ordering
-            return Ok(SortOrderPushdownResult::Inexact {
-                inner: Arc::new(new_source) as Arc<dyn FileSource>,
-            });
+        // Check if the reversed ordering satisfies the requested ordering
+        if !reversed_eq_properties.ordering_satisfy(order.iter().cloned())? {
+            return Ok(SortOrderPushdownResult::Unsupported);
         }

+        // Return Inexact because we're only reversing row group order,
+        // not guaranteeing perfect row-level ordering
+        let new_source = self.clone().with_reverse_row_groups(true);
+        Ok(SortOrderPushdownResult::Inexact {
+            inner: Arc::new(new_source) as Arc<dyn FileSource>,
+        })
+
         // TODO Phase 2: Add support for other optimizations:
         // - File reordering based on min/max statistics
         // - Detection of exact ordering (return Exact to remove Sort operator)
         // - Partial sort pushdown for prefix matches
-
-        Ok(SortOrderPushdownResult::Unsupported)
-    }
-
-    fn with_file_ordering_info(
-        &self,
-        ordering: Option<LexOrdering>,
-    ) -> datafusion_common::Result<Arc<dyn FileSource>> {
-        Ok(Arc::new(self.clone().with_file_ordering(ordering)))
     }
 }
```
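The constant-column case from Test 7 can also be shown with a standalone sketch (not DataFusion code): the "file" is sorted by `[timeframe ASC, period_end ASC]`, and once `WHERE timeframe = 'quarterly'` pins the leading column to a constant, the surviving rows are ordered by `period_end` alone, so a reverse scan yields `period_end DESC` without a sort:

```rust
// Model a reverse scan followed by a constant-column filter.
fn reverse_scan_filtered(rows: &[(&str, i32)], timeframe: &str) -> Vec<i32> {
    rows.iter()
        .rev() // reverse scan over the file
        .filter(|(t, _)| *t == timeframe)
        .map(|&(_, period_end)| period_end)
        .collect()
}

fn main() {
    // File rows sorted by (timeframe ASC, period_end ASC)
    let rows = [
        ("monthly", 1),
        ("monthly", 3),
        ("quarterly", 2),
        ("quarterly", 4),
        ("quarterly", 9),
        ("yearly", 5),
    ];
    // The quarterly rows come out in period_end DESC order: [9, 4, 2]
    assert_eq!(reverse_scan_filtered(&rows, "quarterly"), vec![9, 4, 2]);
}
```

This is why `ordering_satisfy`'s constant-column normalization lets the reversed `[timeframe, period_end]` ordering satisfy a request for `period_end DESC` alone.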