Skip to content

Commit 3a41cc6

Browse files
zhuqi-lucasCopilotadriangbclaude
authored
Establish the high level API for sort pushdown and the optimizer rule and support reverse files and row groups (#19064)
## Which issue does this PR close? Establishes the high-level API for sort pushdown and the optimizer rule. This implementation only rearranges files and row groups and returns `Inexact`; the reverse-order case is now supported. Nothing needs to be cached, so there is no memory overhead. Combined with dynamic TopK pushdown, which can skip row groups, this yields a large performance improvement. Details: performance results on ClickBench sorted data: 13ms vs. a 300ms baseline (23x faster), close to the aggressive caching approach (9.8ms) but with much better memory stability [details](#18817 (comment)) - Closes #19059 - Closes #10433 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Copilot <[email protected]> Co-authored-by: Adrian Garcia Badaracco <[email protected]> Co-authored-by: Claude Opus 4.5 <[email protected]>
1 parent 50d20dd commit 3a41cc6

File tree

29 files changed

+2670
-29
lines changed

29 files changed

+2670
-29
lines changed

datafusion/common/src/config.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,6 +1079,21 @@ config_namespace! {
10791079
/// then the output will be coerced to a non-view.
10801080
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
10811081
pub expand_views_at_output: bool, default = false
1082+
1083+
/// Enable sort pushdown optimization.
1084+
/// When enabled, attempts to push sort requirements down to data sources
1085+
/// that can natively handle them (e.g., by reversing file/row group read order).
1086+
///
1087+
/// Returns **inexact ordering**: Sort operator is kept for correctness,
1088+
/// but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N),
1089+
/// providing significant speedup.
1090+
///
1091+
/// Memory: No additional overhead (only changes read order).
1092+
///
1093+
/// Future: Will add an option to detect perfectly sorted data and eliminate the Sort completely.
1094+
///
1095+
/// Default: true
1096+
pub enable_sort_pushdown: bool, default = true
10821097
}
10831098
}
10841099

datafusion/core/tests/physical_optimizer/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ mod limit_pushdown;
3232
mod limited_distinct_aggregation;
3333
mod partition_statistics;
3434
mod projection_pushdown;
35+
mod pushdown_sort;
3536
mod replace_with_order_preserving_variants;
3637
mod sanity_checker;
3738
#[expect(clippy::needless_pass_by_value)]

datafusion/core/tests/physical_optimizer/pushdown_sort.rs

Lines changed: 672 additions & 0 deletions
Large diffs are not rendered by default.

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! Test utilities for physical optimizer tests
1919
2020
use std::any::Any;
21-
use std::fmt::Formatter;
21+
use std::fmt::{Display, Formatter};
2222
use std::sync::{Arc, LazyLock};
2323

2424
use arrow::array::Int32Array;
@@ -33,7 +33,9 @@ use datafusion_common::config::ConfigOptions;
3333
use datafusion_common::stats::Precision;
3434
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
3535
use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
36-
use datafusion_common::{ColumnStatistics, JoinType, NullEquality, Result, Statistics};
36+
use datafusion_common::{
37+
internal_err, ColumnStatistics, JoinType, NullEquality, Result, Statistics,
38+
};
3739
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
3840
use datafusion_execution::object_store::ObjectStoreUrl;
3941
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -700,3 +702,75 @@ impl TestAggregate {
700702
}
701703
}
702704
}
705+
706+
/// A harness for testing physical optimizers.
///
/// Holds the textual form of a plan before optimization together with the
/// textual form of the plan (or the error message) produced by running a
/// single optimizer rule on it. Both parts are rendered by the `Display`
/// implementation.
707+
#[derive(Debug)]
708+
pub struct OptimizationTest {
709+
/// Formatted lines of the plan handed to the optimizer rule.
input: Vec<String>,
710+
/// Formatted lines of the optimized plan on success, or the stringified
/// error if optimization (or the schema check) failed.
output: Result<Vec<String>, String>,
711+
}
712+
713+
impl OptimizationTest {
714+
/// Runs `opt` once over `input_plan` and records the before/after plan text.
///
/// * `input_plan` - the plan to optimize; it is formatted first and then
///   moved into `opt.optimize`.
/// * `opt` - the optimizer rule under test.
/// * `enable_sort_pushdown` - value written to
///   `config.optimizer.enable_sort_pushdown` on a freshly created
///   `ConfigOptions` before the rule runs.
///
/// This constructor does not return an error itself: a failure from the
/// rule, or a schema mismatch detected afterwards, is converted to a
/// `String` and stored in the `output` field.
pub fn new<O>(
715+
input_plan: Arc<dyn ExecutionPlan>,
716+
opt: O,
717+
enable_sort_pushdown: bool,
718+
) -> Self
719+
where
720+
O: PhysicalOptimizerRule,
721+
{
722+
// Capture the formatted plan and its schema before `input_plan` is
// moved into `optimize` below.
let input = format_execution_plan(&input_plan);
723+
let input_schema = input_plan.schema();
724+
725+
let mut config = ConfigOptions::new();
726+
config.optimizer.enable_sort_pushdown = enable_sort_pushdown;
727+
let output_result = opt.optimize(input_plan, &config);
728+
let output = output_result
729+
.and_then(|plan| {
730+
// The schema comparison is only performed when the rule opts in
// via `schema_check()`; a mismatch is reported as an internal error.
if opt.schema_check() && (plan.schema() != input_schema) {
731+
internal_err!(
732+
"Schema mismatch:\n\nBefore:\n{:?}\n\nAfter:\n{:?}",
733+
input_schema,
734+
plan.schema()
735+
)
736+
} else {
737+
Ok(plan)
738+
}
739+
})
740+
// Success path: keep only the formatted lines of the optimized plan.
.map(|plan| format_execution_plan(&plan))
741+
// Failure path: keep only the error's `to_string()` text.
.map_err(|e| e.to_string());
742+
743+
Self { input, output }
744+
}
745+
}
746+
747+
/// Renders the captured input and output as a line-oriented report.
///
/// The report starts with an `OptimizationTest:` header, followed by an
/// `input:` section listing each input line prefixed with `- `, then an
/// `output:` section that contains either an `Ok:` list of output lines or a
/// single `Err: <message>` line.
impl Display for OptimizationTest {
748+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
749+
writeln!(f, "OptimizationTest:")?;
750+
writeln!(f, " input:")?;
751+
// One list entry per formatted line of the input plan.
for line in &self.input {
752+
writeln!(f, " - {line}")?;
753+
}
754+
writeln!(f, " output:")?;
755+
match &self.output {
756+
// Optimization succeeded: list the optimized plan line by line.
Ok(output) => {
757+
writeln!(f, " Ok:")?;
758+
for line in output {
759+
writeln!(f, " - {line}")?;
760+
}
761+
}
762+
// Optimization failed: emit the stored error text on one entry.
Err(err) => {
763+
writeln!(f, " Err: {err}")?;
764+
}
765+
}
766+
Ok(())
767+
}
768+
}
769+
770+
/// Converts an execution plan into a list of display lines.
///
/// The plan is rendered with `displayable(..).indent(false)`, turned into a
/// `String`, and then split into lines by `format_lines`.
// NOTE(review): the meaning of the `false` argument to `indent` is not
// visible here — presumably it selects the non-verbose form; confirm against
// the `displayable` API.
pub fn format_execution_plan(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
771+
format_lines(&displayable(plan.as_ref()).indent(false).to_string())
772+
}
773+
774+
/// Splits `s` into owned lines.
///
/// Leading and trailing whitespace of the whole string is trimmed first, so
/// any indentation on the first line and any trailing newline are dropped.
/// The remainder is split on `'\n'` only; an empty or all-whitespace input
/// therefore yields a single empty string rather than an empty vector.
fn format_lines(s: &str) -> Vec<String> {
775+
s.trim().split('\n').map(|s| s.to_string()).collect()
776+
}

datafusion/datasource-parquet/src/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ mod page_filter;
3030
mod reader;
3131
mod row_filter;
3232
mod row_group_filter;
33+
mod sort;
3334
pub mod source;
3435
mod writer;
3536

0 commit comments

Comments
 (0)