diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 4bbb9d7ada7e..7cf291e0744b 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -41,8 +41,9 @@ pub use plan::{ projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, ExplainFormat, ExplainOption, Extension, FetchType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, - PlanType, Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, - Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, + PlanType, Projection, RecursiveQuery, Repartition, ScanOrdering, SkipType, Sort, + StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, + Unnest, Values, Window, }; pub use statement::{ Deallocate, Execute, Prepare, SetVariable, Statement, TransactionAccessMode, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index b4f2902cc43e..4f35d25887a0 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2537,6 +2537,43 @@ impl PartialOrd for Window { } } +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Default)] +pub struct ScanOrdering { + /// Optional preferred ordering for the scan that matches the output order of upstream query nodes. + /// It is optional / best effort for the scan to produce this ordering. + /// If the scan produces this exact ordering and sets it's properties to reflect this upstream sorts may be optimized away. + /// Otherwise the sorts may remain in place but partial ordering may be exploited e.g. to do early stopping or reduce complexity of the sort. + /// Thus it is recommended for the scan to also do a best effort to produce partially sorted data if possible. + pub preferred_ordering: Option>, +} + +impl ScanOrdering { + /// Create a new ScanOrdering + pub fn with_preferred_ordering(mut self, preferred_ordering: Vec) -> Self { + self.preferred_ordering = Some(preferred_ordering); + self + } +} + +impl Debug for ScanOrdering { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + let ordering_display = self + .preferred_ordering + .as_ref() + .map(|ordering| { + ordering + .iter() + .map(|e| e.to_string()) + .collect::>() + .join(", ") + }) + .unwrap_or_else(|| "None".to_string()); + f.debug_struct("ScanOrdering") + .field("preferred_ordering", &ordering_display) + .finish_non_exhaustive() + } +} + /// Produces rows from a table provider by reference or from the context #[derive(Clone)] pub struct TableScan { @@ -2552,6 +2589,8 @@ pub struct TableScan { pub filters: Vec, /// Optional number of rows to read pub fetch: Option, + /// Ordering for the scan + pub ordering: Option, } impl Debug for TableScan { @@ -2563,6 +2602,7 @@ impl Debug for TableScan { .field("projected_schema", &self.projected_schema) .field("filters", &self.filters) .field("fetch", &self.fetch) + .field("ordering", &self.ordering) .finish_non_exhaustive() } } @@ -2574,6 +2614,7 @@ impl PartialEq for TableScan { && self.projected_schema == other.projected_schema && self.filters == other.filters && self.fetch == other.fetch + && self.ordering == other.ordering } } @@ -2593,18 +2634,22 @@ impl PartialOrd for TableScan { pub filters: &'a Vec, /// Optional number of rows to read pub fetch: &'a Option, + /// Optional preferred ordering for the scan + pub ordering: &'a Option, } let comparable_self = ComparableTableScan { table_name: &self.table_name, projection: &self.projection, filters: &self.filters, fetch: &self.fetch, + ordering: &self.ordering, }; let comparable_other = ComparableTableScan { table_name: &other.table_name, projection: &other.projection, filters: &other.filters, fetch: &other.fetch, + ordering: &other.ordering, }; comparable_self.partial_cmp(&comparable_other) } @@ -2617,6 +2662,7 @@ impl Hash for TableScan { self.projected_schema.hash(state); self.filters.hash(state); self.fetch.hash(state); + self.ordering.hash(state); } } @@ -2670,8 +2716,65 @@ impl TableScan { projected_schema, filters, fetch, + ordering: None, }) } + + /// Sets the preferred ordering for this table scan using the builder pattern. + /// + /// The preferred ordering serves as a hint to table providers about the desired + /// sort order for the data. Table providers can use this information to optimize + /// data access patterns, choose appropriate indexes, or leverage existing sort + /// orders in the underlying storage. + /// + /// # Parameters + /// + /// * `preferred_ordering` - An optional vector of sort expressions representing + /// the desired ordering. `None` indicates no specific ordering preference. + /// + /// # Returns + /// + /// Returns `self` to enable method chaining in the builder pattern. + /// + /// # Examples + /// + /// ```rust + /// use datafusion_expr::{col, SortExpr}; + /// # use datafusion_expr::logical_plan::{TableScan, builder::table_source}; + /// # use std::sync::Arc; + /// # use datafusion_common::{TableReference, DFSchema}; + /// # use arrow::datatypes::{Schema, Field, DataType}; + /// + /// // Create a table scan with preferred ordering by column 'a' ascending + /// # let table_name = TableReference::bare("test"); + /// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + /// # let source = table_source(&schema); + /// # let projection = None; + /// # let projected_schema = Arc::new(datafusion_common::DFSchema::empty()); + /// # let filters = vec![]; + /// # let fetch = None; + /// let table_scan = TableScan { + /// table_name, + /// source, + /// projection, + /// projected_schema, + /// filters, + /// fetch, + /// preferred_ordering: None, + /// }.with_preferred_ordering(Some(vec![ + /// SortExpr::new(col("a"), true, false) // ASC NULLS LAST + /// ])); + /// ``` + /// + /// # Notes + /// + /// This is purely an optimization hint. The table provider may choose to ignore + /// the preferred ordering if it cannot be efficiently satisfied, and the query + /// execution engine should not rely on the data being returned in this order. + pub fn with_ordering(mut self, ordering: ScanOrdering) -> Self { + self.ordering = Some(ordering); + self + } } // Repartition the plan based on a partitioning scheme. @@ -4896,6 +4999,7 @@ mod tests { projected_schema: Arc::clone(&schema), filters: vec![], fetch: None, + ordering: None, })); let col = schema.field_names()[0].clone(); @@ -4926,6 +5030,7 @@ mod tests { projected_schema: Arc::clone(&unique_schema), filters: vec![], fetch: None, + ordering: None, })); let col = schema.field_names()[0].clone(); diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 47088370a1d9..dfc216db403f 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -599,6 +599,7 @@ impl LogicalPlan { projected_schema, filters, fetch, + ordering, }) => filters.map_elements(f)?.update_data(|filters| { LogicalPlan::TableScan(TableScan { table_name, @@ -607,6 +608,7 @@ impl LogicalPlan { projected_schema, filters, fetch, + ordering, }) }), LogicalPlan::Distinct(Distinct::On(DistinctOn { diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 280010e3d92c..8d6088cc9d91 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -58,6 +58,7 @@ pub mod optimizer; pub mod propagate_empty_relation; pub mod push_down_filter; pub mod push_down_limit; +pub mod push_down_sort; pub mod replace_distinct_aggregate; pub mod scalar_subquery_to_join; pub mod simplify_expressions; diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 97402c990b83..6b038d897498 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -242,6 +242,7 @@ fn optimize_projections( filters, fetch, projected_schema: _, + ordering, } = table_scan; // Get indices referred to in the original (schema with all fields) @@ -257,6 +258,10 @@ fn optimize_projections( filters, fetch, ) + .map(|s| match ordering { + Some(ordering) => s.with_ordering(ordering), + None => s, + }) .map(LogicalPlan::TableScan) .map(Transformed::yes); } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 49806d6db344..302e6f343f8a 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -51,6 +51,7 @@ use crate::plan_signature::LogicalPlanSignature; use crate::propagate_empty_relation::PropagateEmptyRelation; use crate::push_down_filter::PushDownFilter; use crate::push_down_limit::PushDownLimit; +use crate::push_down_sort::PushDownSort; use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate; use crate::scalar_subquery_to_join::ScalarSubqueryToJoin; use crate::simplify_expressions::SimplifyExpressions; @@ -242,6 +243,8 @@ impl Optimizer { Arc::new(EliminateOuterJoin::new()), // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit Arc::new(PushDownLimit::new()), + // Sort pushdown should happen before filter pushdown to maximize optimization opportunities + Arc::new(PushDownSort::new()), Arc::new(PushDownFilter::new()), Arc::new(SingleDistinctToGroupBy::new()), // The previous optimizations added expressions and projections, diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 27c2499c8a26..f4230154544a 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -3055,6 +3055,7 @@ mod tests { projection, source: Arc::new(test_provider), fetch: None, + ordering: None, }); Ok(LogicalPlanBuilder::from(table_scan)) diff --git a/datafusion/optimizer/src/push_down_sort.rs b/datafusion/optimizer/src/push_down_sort.rs new file mode 100644 index 000000000000..1916e21e2e72 --- /dev/null +++ b/datafusion/optimizer/src/push_down_sort.rs @@ -0,0 +1,568 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`PushDownSort`] pushes sort expressions into table scans to enable +//! sort pushdown optimizations by table providers + +use std::sync::Arc; + +use crate::optimizer::ApplyOrder; +use crate::{OptimizerConfig, OptimizerRule}; + +use datafusion_common::tree_node::Transformed; +use datafusion_common::Result; +use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::{Expr, ScanOrdering, SortExpr}; + +/// Optimization rule that pushes sort expressions down to table scans +/// when the sort can potentially be optimized by the table provider. +/// +/// This rule looks for `Sort -> TableScan` patterns and moves the sort +/// expressions into the `TableScan.preferred_ordering` field, allowing +/// table providers to potentially optimize the scan based on sort requirements. +/// +/// # Behavior +/// +/// The optimizer preserves the original `Sort` node as a fallback while passing +/// the ordering preference to the `TableScan` as an optimization hint. This ensures +/// correctness even if the table provider cannot satisfy the requested ordering. +/// +/// # Supported Sort Expressions +/// +/// Currently, only simple column references are supported for pushdown because +/// table providers typically cannot optimize complex expressions in sort operations. +/// Complex expressions like `col("a") + col("b")` or function calls are not pushed down. +/// +/// # Examples +/// +/// ```text +/// Before optimization: +/// Sort: test.a ASC NULLS LAST +/// TableScan: test +/// +/// After optimization: +/// Sort: test.a ASC NULLS LAST -- Preserved as fallback +/// TableScan: test -- Now includes preferred_ordering hint +/// ``` +#[derive(Default, Debug)] +pub struct PushDownSort {} + +impl PushDownSort { + /// Creates a new instance of the `PushDownSort` optimizer rule. + /// + /// # Returns + /// + /// A new `PushDownSort` optimizer rule that can be added to the optimization pipeline. + /// + /// # Examples + /// + /// ```rust + /// use datafusion_optimizer::push_down_sort::PushDownSort; + /// + /// let rule = PushDownSort::new(); + /// ``` + pub fn new() -> Self { + Self {} + } + + /// Checks if a sort expression can be pushed down to a table scan. + /// + /// Currently, we only support pushing down simple column references + /// because table providers typically can't optimize complex expressions + /// in sort pushdown. + fn can_pushdown_sort_expr(expr: &SortExpr) -> bool { + // Only push down simple column references + matches!(expr.expr, Expr::Column(_)) + } + + /// Checks if all sort expressions in a list can be pushed down. + fn can_pushdown_sort_exprs(sort_exprs: &[SortExpr]) -> bool { + sort_exprs.iter().all(Self::can_pushdown_sort_expr) + } +} + +impl OptimizerRule for PushDownSort { + fn supports_rewrite(&self) -> bool { + true + } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + // Look for Sort -> TableScan pattern + let LogicalPlan::Sort(sort) = &plan else { + return Ok(Transformed::no(plan)); + }; + + let LogicalPlan::TableScan(table_scan) = sort.input.as_ref() else { + return Ok(Transformed::no(plan)); + }; + + // Check if we can push down the sort expressions + if !Self::can_pushdown_sort_exprs(&sort.expr) { + return Ok(Transformed::no(plan)); + } + + // Create new TableScan with preferred ordering + let new_table_scan = table_scan.clone().with_ordering( + ScanOrdering::default().with_preferred_ordering(sort.expr.clone()), + ); + + // Preserve the Sort node as a fallback while passing the ordering + // preference to the TableScan as an optimization hint + let new_sort = datafusion_expr::logical_plan::Sort { + expr: sort.expr.clone(), + input: Arc::new(LogicalPlan::TableScan(new_table_scan)), + fetch: sort.fetch, + }; + let new_plan = LogicalPlan::Sort(new_sort); + + Ok(Transformed::yes(new_plan)) + } + + fn name(&self) -> &str { + "push_down_sort" + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test::test_table_scan; + use crate::{assert_optimized_plan_eq_snapshot, OptimizerContext}; + use datafusion_common::{Column, Result}; + use datafusion_expr::{col, lit, Expr, JoinType, LogicalPlanBuilder, SortExpr}; + use std::sync::Arc; + + macro_rules! assert_optimized_plan_equal { + ( + $plan:expr, + @ $expected:literal $(,)? + ) => {{ + let optimizer_ctx = OptimizerContext::new().with_max_passes(1); + let rules: Vec> = vec![Arc::new(PushDownSort::new())]; + assert_optimized_plan_eq_snapshot!( + optimizer_ctx, + rules, + $plan, + @ $expected, + ) + }}; + } + + #[test] + fn test_can_pushdown_sort_expr() { + // Simple column reference should be pushable + let sort_expr = SortExpr::new(col("a"), true, false); + assert!(PushDownSort::can_pushdown_sort_expr(&sort_expr)); + + // Complex expression should not be pushable + let sort_expr = SortExpr::new(col("a") + col("b"), true, false); + assert!(!PushDownSort::can_pushdown_sort_expr(&sort_expr)); + + // Function call should not be pushable + let sort_expr = SortExpr::new(col("c").like(lit("test%")), true, false); + assert!(!PushDownSort::can_pushdown_sort_expr(&sort_expr)); + + // Literal should not be pushable + let sort_expr = SortExpr::new(lit(42), true, false); + assert!(!PushDownSort::can_pushdown_sort_expr(&sort_expr)); + } + + #[test] + fn test_can_pushdown_sort_exprs() { + // All simple columns should be pushable + let sort_exprs = vec![ + SortExpr::new(col("a"), true, false), + SortExpr::new(col("b"), false, true), + ]; + assert!(PushDownSort::can_pushdown_sort_exprs(&sort_exprs)); + + // Mix of simple and complex should not be pushable + let sort_exprs = vec![ + SortExpr::new(col("a"), true, false), + SortExpr::new(col("a") + col("b"), false, true), + ]; + assert!(!PushDownSort::can_pushdown_sort_exprs(&sort_exprs)); + + // Empty list should be pushable + let sort_exprs = vec![]; + assert!(PushDownSort::can_pushdown_sort_exprs(&sort_exprs)); + } + + #[test] + fn test_basic_sort_pushdown_to_table_scan() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Sort node is preserved with preferred_ordering passed to TableScan + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + TableScan: test + " + ) + } + + #[test] + fn test_multiple_column_sort_pushdown() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![ + SortExpr::new(col("a"), true, false), + SortExpr::new(col("b"), false, true), + ])? + .build()?; + + // Multi-column sort is preserved with preferred_ordering passed to TableScan + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST, test.b DESC NULLS FIRST + TableScan: test + " + ) + } + + #[test] + fn test_sort_node_preserved_with_preferred_ordering() -> Result<()> { + let rule = PushDownSort::new(); + let table_scan = test_table_scan()?; + let sort_plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + let config = &OptimizerContext::new(); + let result = rule.rewrite(sort_plan, config)?; + + // Verify Sort node is preserved + match &result.data { + LogicalPlan::Sort(sort) => { + // Check that TableScan has preferred_ordering + if let LogicalPlan::TableScan(ts) = sort.input.as_ref() { + assert!(ts.ordering.is_some()); + } else { + panic!("Expected TableScan input"); + } + } + _ => panic!("Expected Sort node to be preserved"), + } + + Ok(()) + } + + #[test] + fn test_no_pushdown_with_complex_expressions() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![ + SortExpr::new(col("a"), true, false), + SortExpr::new(col("a") + col("b"), false, true), // Complex expression + ])? + .build()?; + + // Sort should remain unchanged + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST, test.a + test.b DESC NULLS FIRST + TableScan: test + " + ) + } + + #[test] + fn test_no_pushdown_through_projection() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("b")])? + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Sort should remain above projection + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + Projection: test.a, test.b + TableScan: test + " + ) + } + + #[test] + fn test_no_pushdown_through_filter() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(col("a").gt(lit(10)))? + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Sort should remain above filter + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + Filter: test.a > Int32(10) + TableScan: test + " + ) + } + + #[test] + fn test_no_pushdown_through_aggregate() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("a")], Vec::::new())? + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Sort should remain above aggregate + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + Aggregate: groupBy=[[test.a]], aggr=[[]] + TableScan: test + " + ) + } + + #[test] + fn test_no_pushdown_through_join() -> Result<()> { + let left_table = crate::test::test_table_scan_with_name("t1")?; + let right_table = crate::test::test_table_scan_with_name("t2")?; + + let plan = LogicalPlanBuilder::from(left_table) + .join( + right_table, + JoinType::Inner, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .sort(vec![SortExpr::new( + Expr::Column(Column::new(Some("t1"), "a")), + true, + false, + )])? + .build()?; + + // Sort should remain above join + assert_optimized_plan_equal!( + plan, + @ r" + Sort: t1.a ASC NULLS LAST + Inner Join: t1.a = t2.a + TableScan: t1 + TableScan: t2 + " + ) + } + + #[test] + fn test_no_pushdown_through_limit() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .limit(0, Some(10))? + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Sort should remain above limit + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + Limit: skip=0, fetch=10 + TableScan: test + " + ) + } + + #[test] + fn test_no_pushdown_through_distinct() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .distinct()? + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Sort should remain above distinct + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + Distinct: + TableScan: test + " + ) + } + + #[test] + fn test_no_pushdown_on_non_sort_nodes() -> Result<()> { + let table_scan = test_table_scan()?; + + // TableScan should remain unchanged + assert_optimized_plan_equal!( + table_scan, + @ r"TableScan: test" + ) + } + + // Tests for node types that currently block sort pushdown + + #[test] + fn test_potential_pushdown_through_subquery_alias() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .alias("aliased_table")? + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Sort remains above SubqueryAlias + assert_optimized_plan_equal!( + plan, + @ r" + Sort: aliased_table.a ASC NULLS LAST + SubqueryAlias: aliased_table + TableScan: test + " + ) + } + + #[test] + fn test_potential_pushdown_through_order_preserving_projection() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("b"), col("c")])? // Identity projection - doesn't change column order + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Sort remains above Projection (conservative approach) + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + Projection: test.a, test.b, test.c + TableScan: test + " + ) + } + + #[test] + fn test_potential_pushdown_through_order_preserving_filter() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(col("b").gt(lit(0)))? // Filter on different column than sort + .sort(vec![SortExpr::new(col("a"), true, false)])? + .build()?; + + // Currently: Sort remains above Filter (conservative approach) + // Future enhancement: Could push through filters that don't affect sort column relationships + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + Filter: test.b > Int32(0) + TableScan: test + " + ) + } + + #[test] + fn test_edge_case_empty_sort_expressions() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(Vec::::new())? // Empty sort + .build()?; + + // Empty sort is preserved + assert_optimized_plan_equal!( + plan, + @ r" + Sort: + TableScan: test + " + ) + } + + #[test] + fn test_sort_with_nulls_first_last_variants() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![ + SortExpr::new(col("a"), true, false), // ASC NULLS LAST + SortExpr::new(col("b"), true, true), // ASC NULLS FIRST + SortExpr::new(col("c"), false, false), // DESC NULLS LAST + ])? + .build()?; + + // All variants of nulls ordering should be pushable for simple columns + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST, test.b ASC NULLS FIRST, test.c DESC NULLS LAST + TableScan: test + " + ) + } + + #[test] + fn test_mixed_simple_and_qualified_columns() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![ + SortExpr::new(col("a"), true, false), // Simple column + SortExpr::new(Expr::Column(Column::new(Some("test"), "b")), false, true), // Qualified column + ])? + .build()?; + + // Both simple and qualified column references should be pushable + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST, test.b DESC NULLS FIRST + TableScan: test + " + ) + } + + #[test] + fn test_case_sensitive_column_references() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![SortExpr::new(col("A"), true, false)])? // Capital A + .build()?; + + // Column reference case sensitivity should be handled by the schema + assert_optimized_plan_equal!( + plan, + @ r" + Sort: test.a ASC NULLS LAST + TableScan: test + " + ) + } +} diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index cc3e805ed1df..960a3a65c3a2 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -271,6 +271,7 @@ fn from_table_source( projected_schema, filters: vec![], fetch: None, + ordering: None, }); LogicalPlanNode::try_from_logical_plan(&r, extension_codec) diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 06965ebef0f7..a686795df129 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -194,6 +194,7 @@ logical_plan after eliminate_one_union SAME TEXT AS ABOVE logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE +logical_plan after push_down_sort SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE @@ -216,6 +217,7 @@ logical_plan after eliminate_one_union SAME TEXT AS ABOVE logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE +logical_plan after push_down_sort SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE