@@ -3,15 +3,13 @@ use std::{collections::HashMap, sync::Arc};
33use datafusion:: {
44 error:: { DataFusionError , Result } ,
55 logical_plan:: {
6- plan:: {
7- Aggregate , CrossJoin , Distinct , Join , Limit , Projection , Sort , Subquery , Union , Window ,
8- } ,
6+ plan:: { Aggregate , Distinct , Limit , Projection , Sort , Subquery , Union , Window } ,
97 Column , DFSchema , Expr , Filter , LogicalPlan ,
108 } ,
119 optimizer:: optimizer:: { OptimizerConfig , OptimizerRule } ,
1210} ;
1311
14- use super :: utils:: { get_schema_columns , is_column_expr, plan_has_projections, rewrite} ;
12+ use super :: utils:: { is_column_expr, plan_has_projections, rewrite} ;
1513
1614/// Sort Push Down optimizer rule pushes ORDER BY clauses consisting of specific,
1715/// mostly simple, expressions down the plan, all the way to the Projection
@@ -167,97 +165,6 @@ fn sort_push_down(
167165 optimizer_config,
168166 )
169167 }
170- LogicalPlan :: Join ( Join {
171- left,
172- right,
173- on,
174- join_type,
175- join_constraint,
176- schema,
177- null_equals_null,
178- } ) => {
179- // DataFusion preserves the sorting of the joined plans, prioritizing left side.
180- // Taking this into account, we can push Sort down the left plan if Sort references
181- // columns just from the left side.
182- // TODO: check if this is still the case with multiple target partitions
183- if let Some ( some_sort_expr) = & sort_expr {
184- let left_columns = get_schema_columns ( left. schema ( ) ) ;
185- if some_sort_expr. iter ( ) . all ( |expr| {
186- if let Expr :: Sort { expr, .. } = expr {
187- if let Expr :: Column ( column) = expr. as_ref ( ) {
188- return left_columns. contains ( column) ;
189- }
190- }
191- false
192- } ) {
193- return Ok ( LogicalPlan :: Join ( Join {
194- left : Arc :: new ( sort_push_down (
195- optimizer,
196- left,
197- sort_expr,
198- optimizer_config,
199- ) ?) ,
200- right : Arc :: new ( sort_push_down ( optimizer, right, None , optimizer_config) ?) ,
201- on : on. clone ( ) ,
202- join_type : * join_type,
203- join_constraint : * join_constraint,
204- schema : schema. clone ( ) ,
205- null_equals_null : * null_equals_null,
206- } ) ) ;
207- }
208- }
209-
210- issue_sort (
211- sort_expr,
212- LogicalPlan :: Join ( Join {
213- left : Arc :: new ( sort_push_down ( optimizer, left, None , optimizer_config) ?) ,
214- right : Arc :: new ( sort_push_down ( optimizer, right, None , optimizer_config) ?) ,
215- on : on. clone ( ) ,
216- join_type : * join_type,
217- join_constraint : * join_constraint,
218- schema : schema. clone ( ) ,
219- null_equals_null : * null_equals_null,
220- } ) ,
221- )
222- }
223- LogicalPlan :: CrossJoin ( CrossJoin {
224- left,
225- right,
226- schema,
227- } ) => {
228- // See `LogicalPlan::Join` notes above.
229- if let Some ( some_sort_expr) = & sort_expr {
230- let left_columns = get_schema_columns ( left. schema ( ) ) ;
231- if some_sort_expr. iter ( ) . all ( |expr| {
232- if let Expr :: Sort { expr, .. } = expr {
233- if let Expr :: Column ( column) = expr. as_ref ( ) {
234- return left_columns. contains ( column) ;
235- }
236- }
237- false
238- } ) {
239- return Ok ( LogicalPlan :: CrossJoin ( CrossJoin {
240- left : Arc :: new ( sort_push_down (
241- optimizer,
242- left,
243- sort_expr,
244- optimizer_config,
245- ) ?) ,
246- right : Arc :: new ( sort_push_down ( optimizer, right, None , optimizer_config) ?) ,
247- schema : schema. clone ( ) ,
248- } ) ) ;
249- }
250- }
251-
252- issue_sort (
253- sort_expr,
254- LogicalPlan :: CrossJoin ( CrossJoin {
255- left : Arc :: new ( sort_push_down ( optimizer, left, None , optimizer_config) ?) ,
256- right : Arc :: new ( sort_push_down ( optimizer, right, None , optimizer_config) ?) ,
257- schema : schema. clone ( ) ,
258- } ) ,
259- )
260- }
261168 LogicalPlan :: Union ( Union {
262169 inputs,
263170 schema,
0 commit comments