Skip to content

Commit d44f2a0

Browse files
committed
perf(cube): logical plan optimization: combine Limit(fetch)>Sort into Sort(fetch) in one pass
1 parent 20d3495 commit d44f2a0

File tree

2 files changed

+52
-30
lines changed

2 files changed

+52
-30
lines changed

datafusion/core/tests/user_defined/user_defined_plan.rs

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,7 @@ use datafusion::{
7979
runtime_env::RuntimeEnv,
8080
},
8181
logical_expr::{
82-
Expr, Extension, LogicalPlan, Sort, UserDefinedLogicalNode,
83-
UserDefinedLogicalNodeCore,
82+
Expr, Extension, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
8483
},
8584
optimizer::{OptimizerConfig, OptimizerRule},
8685
physical_expr::EquivalenceProperties,
@@ -519,30 +518,49 @@ impl OptimizerRule for TopKOptimizerRule {
519518
// Note: this code simply looks for the pattern of a Limit followed by a
520519
// Sort and replaces it by a TopK node. It does not handle many
521520
// edge cases (e.g. multiple sort columns, sort ASC / DESC, etc.).
522-
let LogicalPlan::Limit(ref limit) = plan else {
523-
return Ok(Transformed::no(plan));
524-
};
525-
let FetchType::Literal(Some(fetch)) = limit.get_fetch_type()? else {
526-
return Ok(Transformed::no(plan));
527-
};
528-
529-
if let LogicalPlan::Sort(Sort {
530-
ref expr,
531-
ref input,
532-
..
533-
}) = limit.input.as_ref()
534-
{
535-
if expr.len() == 1 {
536-
// we found a sort with a single sort expr, replace with a a TopK
537-
return Ok(Transformed::yes(LogicalPlan::Extension(Extension {
538-
node: Arc::new(TopKPlanNode {
539-
k: fetch,
540-
input: input.as_ref().clone(),
541-
expr: expr[0].clone(),
542-
invariant_mock: self.invariant_mock.clone(),
543-
}),
544-
})));
521+
//
522+
// Because Limit > Sort can get optimized into Sort(fetch) in the first pass, we
523+
// also support bare Sort (as we are running in ApplyOrder::TopDown mode).
524+
let expr: &Vec<SortExpr>;
525+
let input: &Arc<LogicalPlan>;
526+
let fetch: usize;
527+
match plan {
528+
LogicalPlan::Limit(ref limit) => {
529+
let FetchType::Literal(Some(limit_fetch)) = limit.get_fetch_type()?
530+
else {
531+
return Ok(Transformed::no(plan));
532+
};
533+
let LogicalPlan::Sort(ref sort) = limit.input.as_ref() else {
534+
return Ok(Transformed::no(plan));
535+
};
536+
expr = &sort.expr;
537+
input = &sort.input;
538+
fetch = limit_fetch;
539+
}
540+
LogicalPlan::Sort(ref sort) => {
541+
let Some(sort_fetch) = sort.fetch else {
542+
return Ok(Transformed::no(plan));
543+
};
544+
545+
expr = &sort.expr;
546+
input = &sort.input;
547+
fetch = sort_fetch;
545548
}
549+
_ => {
550+
return Ok(Transformed::no(plan));
551+
}
552+
}
553+
554+
if expr.len() == 1 {
555+
// we found a sort with a single sort expr, replace it with a TopK
556+
return Ok(Transformed::yes(LogicalPlan::Extension(Extension {
557+
node: Arc::new(TopKPlanNode {
558+
k: fetch,
559+
input: input.as_ref().clone(),
560+
expr: expr[0].clone(),
561+
invariant_mock: self.invariant_mock.clone(),
562+
}),
563+
})));
546564
}
547565

548566
Ok(Transformed::no(plan))

datafusion/optimizer/src/push_down_limit.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,13 @@ impl OptimizerRule for PushDownLimit {
136136
}
137137
} else {
138138
sort.fetch = new_fetch;
139-
limit.input = Arc::new(LogicalPlan::Sort(sort));
140-
Ok(Transformed::yes(LogicalPlan::Limit(limit)))
139+
let new_plan = if skip > 0 {
140+
limit.input = Arc::new(LogicalPlan::Sort(sort));
141+
LogicalPlan::Limit(limit)
142+
} else {
143+
LogicalPlan::Sort(sort)
144+
};
145+
Ok(Transformed::yes(new_plan))
141146
}
142147
}
143148
LogicalPlan::Projection(mut proj) => {
@@ -591,9 +596,8 @@ mod test {
591596
.build()?;
592597

593598
// Should push down limit to sort
594-
let expected = "Limit: skip=0, fetch=10\
595-
\n Sort: test.a ASC NULLS LAST, fetch=10\
596-
\n TableScan: test";
599+
let expected = "Sort: test.a ASC NULLS LAST, fetch=10\
600+
\n TableScan: test";
597601

598602
assert_optimized_plan_equal(plan, expected)
599603
}

0 commit comments

Comments
 (0)