Skip to content

Commit b9303ee

Browse files
committed
fix(cubesql): Fix SortPushDown pushing sort over joins
LogicalPlan::Join and CrossJoin do not preserve the ordering semantically When planned as HashJoin it will output batches in same order as they are coming from right stream But both Join and CrossJoin will have same partitioning as right input (even when repartition_joins disabled), and these partitions can be collected in arbitrary order by CoalescePartitions Also, Substrait says that for both Join and Cross Product > Orderedness is empty post operation See https://substrait.io/relations/logical_relations/#join-operation
1 parent abd4724 commit b9303ee

File tree

3 files changed

+10
-103
lines changed

3 files changed

+10
-103
lines changed

rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__sort_push_down__tests__sort_down_cross_join_sort_left.snap

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ source: cubesql/src/compile/engine/df/optimizers/sort_push_down.rs
33
expression: optimize(&plan)
44
---
55
Projection: #j1.c1, #j2.c2
6-
CrossJoin:
7-
Sort: #j1.c1 ASC NULLS LAST
6+
Sort: #j1.c1 ASC NULLS LAST
7+
CrossJoin:
88
Projection: #j1.key, #j1.c1
99
TableScan: j1 projection=None
10-
Projection: #j2.key, #j2.c2
11-
TableScan: j2 projection=None
10+
Projection: #j2.key, #j2.c2
11+
TableScan: j2 projection=None

rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__sort_push_down__tests__sort_down_join_sort_left.snap

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ source: cubesql/src/compile/engine/df/optimizers/sort_push_down.rs
33
expression: optimize(&plan)
44
---
55
Projection: #j1.c1, #j2.c2
6-
Inner Join: #j1.key = #j2.key
7-
Sort: #j1.c1 ASC NULLS LAST
6+
Sort: #j1.c1 ASC NULLS LAST
7+
Inner Join: #j1.key = #j2.key
88
Projection: #j1.key, #j1.c1
99
TableScan: j1 projection=None
10-
Projection: #j2.key, #j2.c2
11-
TableScan: j2 projection=None
10+
Projection: #j2.key, #j2.c2
11+
TableScan: j2 projection=None

rust/cubesql/cubesql/src/compile/engine/df/optimizers/sort_push_down.rs

Lines changed: 2 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,13 @@ use std::{collections::HashMap, sync::Arc};
33
use datafusion::{
44
error::{DataFusionError, Result},
55
logical_plan::{
6-
plan::{
7-
Aggregate, CrossJoin, Distinct, Join, Limit, Projection, Sort, Subquery, Union, Window,
8-
},
6+
plan::{Aggregate, Distinct, Limit, Projection, Sort, Subquery, Union, Window},
97
Column, DFSchema, Expr, Filter, LogicalPlan,
108
},
119
optimizer::optimizer::{OptimizerConfig, OptimizerRule},
1210
};
1311

14-
use super::utils::{get_schema_columns, is_column_expr, plan_has_projections, rewrite};
12+
use super::utils::{is_column_expr, plan_has_projections, rewrite};
1513

1614
/// Sort Push Down optimizer rule pushes ORDER BY clauses consisting of specific,
1715
/// mostly simple, expressions down the plan, all the way to the Projection
@@ -167,97 +165,6 @@ fn sort_push_down(
167165
optimizer_config,
168166
)
169167
}
170-
LogicalPlan::Join(Join {
171-
left,
172-
right,
173-
on,
174-
join_type,
175-
join_constraint,
176-
schema,
177-
null_equals_null,
178-
}) => {
179-
// DataFusion preserves the sorting of the joined plans, prioritizing left side.
180-
// Taking this into account, we can push Sort down the left plan if Sort references
181-
// columns just from the left side.
182-
// TODO: check if this is still the case with multiple target partitions
183-
if let Some(some_sort_expr) = &sort_expr {
184-
let left_columns = get_schema_columns(left.schema());
185-
if some_sort_expr.iter().all(|expr| {
186-
if let Expr::Sort { expr, .. } = expr {
187-
if let Expr::Column(column) = expr.as_ref() {
188-
return left_columns.contains(column);
189-
}
190-
}
191-
false
192-
}) {
193-
return Ok(LogicalPlan::Join(Join {
194-
left: Arc::new(sort_push_down(
195-
optimizer,
196-
left,
197-
sort_expr,
198-
optimizer_config,
199-
)?),
200-
right: Arc::new(sort_push_down(optimizer, right, None, optimizer_config)?),
201-
on: on.clone(),
202-
join_type: *join_type,
203-
join_constraint: *join_constraint,
204-
schema: schema.clone(),
205-
null_equals_null: *null_equals_null,
206-
}));
207-
}
208-
}
209-
210-
issue_sort(
211-
sort_expr,
212-
LogicalPlan::Join(Join {
213-
left: Arc::new(sort_push_down(optimizer, left, None, optimizer_config)?),
214-
right: Arc::new(sort_push_down(optimizer, right, None, optimizer_config)?),
215-
on: on.clone(),
216-
join_type: *join_type,
217-
join_constraint: *join_constraint,
218-
schema: schema.clone(),
219-
null_equals_null: *null_equals_null,
220-
}),
221-
)
222-
}
223-
LogicalPlan::CrossJoin(CrossJoin {
224-
left,
225-
right,
226-
schema,
227-
}) => {
228-
// See `LogicalPlan::Join` notes above.
229-
if let Some(some_sort_expr) = &sort_expr {
230-
let left_columns = get_schema_columns(left.schema());
231-
if some_sort_expr.iter().all(|expr| {
232-
if let Expr::Sort { expr, .. } = expr {
233-
if let Expr::Column(column) = expr.as_ref() {
234-
return left_columns.contains(column);
235-
}
236-
}
237-
false
238-
}) {
239-
return Ok(LogicalPlan::CrossJoin(CrossJoin {
240-
left: Arc::new(sort_push_down(
241-
optimizer,
242-
left,
243-
sort_expr,
244-
optimizer_config,
245-
)?),
246-
right: Arc::new(sort_push_down(optimizer, right, None, optimizer_config)?),
247-
schema: schema.clone(),
248-
}));
249-
}
250-
}
251-
252-
issue_sort(
253-
sort_expr,
254-
LogicalPlan::CrossJoin(CrossJoin {
255-
left: Arc::new(sort_push_down(optimizer, left, None, optimizer_config)?),
256-
right: Arc::new(sort_push_down(optimizer, right, None, optimizer_config)?),
257-
schema: schema.clone(),
258-
}),
259-
)
260-
}
261168
LogicalPlan::Union(Union {
262169
inputs,
263170
schema,

0 commit comments

Comments
 (0)