Skip to content
6 changes: 6 additions & 0 deletions docs/changelog/131833.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 131833
summary: "ESQL: Remove redundant `TopN`"
area: ES|QL
type: enhancement
issues:
- 131233
158 changes: 158 additions & 0 deletions x-pack/plugin/esql/qa/testFixtures/src/main/resources/topN.csv-spec
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,161 @@ FROM employees
avg_worked_seconds:long | birth_date:date | emp_no:i | gender:k | height:d | height.float:d | height.half_float:d | height.scaled_float:d | hire_date:date | is_rehired:bool | job_positions:k | languages:i | languages.byte:i | languages.long:l | languages.short:short | salary:i | salary_change:d | salary_change.int:i | salary_change.keyword:k | salary_change.long:l | still_hired:bool | name:k | first_name:k | last_name:k
349086555 | 1961-09-01T00:00:00Z | 10056 | F | 1.57 | 1.5700000524520874 | 1.5703125 | 1.57 | 1990-02-01T00:00:00Z | [false, false, true] | [Senior Team Lead] | 2 | 2 | 2 | 2 | 33370 | [-5.17, 10.99] | [-5, 10] | [-5.17, 10.99] | [-5, 10] | true | Brendon | Bernini | Brendon
;


redundantTopNBasic
from employees
| sort emp_no
| limit 100
| sort emp_no
| limit 10
| keep emp_no, first_name
;

emp_no:integer|first_name:keyword
10001 |Georgi
10002 |Bezalel
10003 |Parto
10004 |Chirstian
10005 |Kyoichi
10006 |Anneke
10007 |Tzvetan
10008 |Saniya
10009 |Sumant
10010 |Duangkaew
;


redundantTopNWithFilter
from employees
| sort emp_no desc
| limit 50
| where gender == "M"
| sort emp_no desc
| limit 5
| keep emp_no, first_name, gender
;

emp_no:integer | first_name:keyword | gender:keyword
10097 | Remzi | M
10096 | Jayson | M
10095 | Hilari | M
10093 | Sailaja | M
10091 | Amabile | M
;


redundantTopNWithProject
from employees
| sort salary desc
| limit 20
| keep emp_no, first_name, salary
| sort salary desc
| limit 3
;

emp_no:integer | first_name:keyword | salary:integer
10029 | Otmar | 74999
10045 | Moss | 74970
10007 | Tzvetan | 74572
;


redundantTopNWithEval
from employees
| sort emp_no
| limit 15
| eval double_emp_no = emp_no * 2
| sort emp_no
| limit 5
| keep emp_no, first_name, double_emp_no
;

emp_no:integer|first_name:keyword|double_emp_no:integer
10001 |Georgi |20002
10002 |Bezalel |20004
10003 |Parto |20006
10004 |Chirstian |20008
10005 |Kyoichi |20010
;


redundantTopNWithRename
from employees
| sort first_name
| limit 20
| rename first_name as name
| sort name
| limit 8
| keep emp_no, name
;

emp_no:integer | name:keyword
10059 | Alejandro
10091 | Amabile
10006 | Anneke
10062 | Anoosh
10094 | Arumugam
10049 | Basil
10058 | Berhard
10014 | Berni
;


nonRedundantTopNDifferentOrder
from employees
| sort emp_no
| limit 10
| sort salary desc
| limit 5
| keep emp_no, first_name, salary
;

emp_no:integer | first_name:keyword | salary:integer
10007 | Tzvetan | 74572
10009 | Sumant | 66174
10005 | Kyoichi | 63528
10003 | Parto | 61805
10006 | Anneke | 60335
;


redundantTopNMultipleFields
from employees
| sort gender, emp_no
| limit 20
| where salary > 50000
| sort gender, emp_no
| limit 8
| keep emp_no, first_name, gender, salary
;

emp_no:integer | first_name:keyword | gender:keyword | salary:integer
10002 | Bezalel | F | 56371
10006 | Anneke | F | 60335
10007 | Tzvetan | F | 74572
10009 | Sumant | F | 66174
10024 | Suzette | F | 64675
10027 | Divier | F | 73851
10032 | null | F | 62233
10041 | Uri | F | 56415
;


redundantTopNWithStats
from employees
| sort gender
| limit 10
| stats cnt = count(*) by emp_no, gender
| sort gender
| limit 5
| keep emp_no, gender, cnt
;

emp_no:integer | gender:keyword | cnt:long
10023 | F | 1
10027 | F | 1
10006 | F | 1
10007 | F | 1
10040 | F | 1
;
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneLiteralsInOrderBy;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneRedundantOrderBy;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneRedundantSortClauses;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneRedundantTopN;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneUnusedIndexMode;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineFilters;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineLimits;
Expand Down Expand Up @@ -219,6 +220,7 @@ protected static Batch<LogicalPlan> cleanup() {
return new Batch<>(
"Clean Up",
new ReplaceLimitAndSortAsTopN(),
new PruneRedundantTopN(),
new HoistRemoteEnrichTopN(),
new ReplaceRowAsLocalRelation(),
new PropgateUnmappedFields(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer.rules.logical;

import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.plan.logical.Drop;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.Insist;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.Rename;
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.logical.inference.Completion;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.Set;

/**
 * Removes redundant TopN operations from the logical plan to improve execution efficiency.
 * <p>
 * Multiple TopN nodes may appear in a query plan—particularly after optimization passes—
 * and some of them can be safely removed if they share the same sort order and are only separated
 * by operations that neither drop rows nor change which rows the sort would select.
 * <p>
 * For instance:
 * <pre>
 * from test | sort x | limit 100 | sort x | limit 10
 * </pre>
 * Both <code>sort x | limit 100</code> and <code>sort x | limit 10</code> will be transformed into TopN nodes.
 * Since they sort by the same key and the latter applies a stricter (or equal) limit,
 * the first TopN becomes redundant and can be pruned.
 * <p>
 * Counter-example (must NOT be pruned):
 * <pre>
 * from test | sort x | limit 10 | where x &gt; 5 | sort x | limit 10
 * </pre>
 * Here the filter only sees the 10 rows kept by the first TopN. Removing that TopN would let the filter
 * see every row of the index and the query could return more (and different) rows.
 */
public class PruneRedundantTopN extends OptimizerRules.ParameterizedOptimizerRule<TopN, LogicalOptimizerContext> {

    public PruneRedundantTopN() {
        super(OptimizerRules.TransformDirection.DOWN);
    }

    /**
     * Replaces every TopN below {@code plan} that was identified as redundant with its own child.
     * Returns {@code plan} unchanged when nothing can be pruned.
     */
    @Override
    protected LogicalPlan rule(TopN plan, LogicalOptimizerContext ctx) {
        Set<TopN> redundant = findRedundantTopN(plan, ctx);
        if (redundant.isEmpty()) {
            return plan;
        }
        // The set is identity-based, so only the exact instances collected above are replaced.
        return plan.transformDown(TopN.class, topN -> redundant.contains(topN) ? topN.child() : topN);
    }

    /**
     * Iterative depth-first walk (explicit LIFO stack) over the sub-tree below {@code parentTopN},
     * collecting the TopN nodes that are redundant with respect to it.
     * <p>
     * The walk only descends through nodes accepted by {@link #canRemoveRedundantChildTopN(LogicalPlan)}
     * and through child TopNs that were themselves found redundant; any other node ends the search on that branch.
     * <p>
     * Returns an identity set (we need to compare and prune the exact instances).
     */
    private Set<TopN> findRedundantTopN(TopN parentTopN, LogicalOptimizerContext ctx) {
        Set<TopN> result = Collections.newSetFromMap(new IdentityHashMap<>());

        Deque<LogicalPlan> toCheck = new ArrayDeque<>();
        toCheck.push(parentTopN.child());

        while (toCheck.isEmpty() == false) {
            LogicalPlan p = toCheck.pop();
            if (p instanceof TopN childTopN) {
                if (isRedundant(parentTopN, childTopN, ctx)) {
                    result.add(childTopN);
                    // keep looking below the pruned node: there may be a chain of redundant TopNs
                    toCheck.push(childTopN.child());
                }
                // a non-matching TopN is a barrier: nothing below it is compared against this parent
            } else if (canRemoveRedundantChildTopN(p)) {
                for (LogicalPlan child : p.children()) {
                    toCheck.push(child);
                }
            }
        }
        return result;
    }

    /**
     * A child TopN is redundant if it matches the parent's sort order and has a greater or equal limit.
     * <p>
     * Although {@code PushDownAndCombineLimits} is expected to have propagated the stricter (lower) limit,
     * the limit values are still compared here to avoid relying solely on prior optimizations.
     */
    private static boolean isRedundant(TopN parentTopN, TopN childTopN, LogicalOptimizerContext ctx) {
        if (childTopN.order().equals(parentTopN.order()) == false) {
            return false;
        }
        int parentLimit = (int) parentTopN.limit().fold(ctx.foldCtx());
        int childLimit = (int) childTopN.limit().fold(ctx.foldCtx());
        return parentLimit <= childLimit;
    }

    /**
     * Tells whether the search for a redundant child TopN may continue below {@code p}.
     * <p>
     * Only nodes that keep every incoming row are accepted. {@link Filter} and {@link Limit} are rejected explicitly:
     * both drop rows, so what they output depends on the rows the child TopN selected. Pruning a TopN underneath
     * them would change the query result (a filter would be applied to the whole input instead of the top rows,
     * a limit would keep an arbitrary prefix instead of the first rows in sort order).
     */
    private boolean canRemoveRedundantChildTopN(LogicalPlan p) {
        if (p instanceof Filter || p instanceof Limit) {
            return false;
        }
        return p instanceof Completion
            || p instanceof Drop
            || p instanceof Eval
            || p instanceof Rename
            || p instanceof Insist
            || p instanceof OrderBy
            || p instanceof Project;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8530,6 +8530,80 @@ public void testPruneRedundantOrderBy() {
as(mvExpand2.child(), Row.class);
}

/**
 * Two consecutive {@code SORT first_name | LIMIT n} pairs are expected to end up as a single TopN
 * sitting directly on top of the relation.
 * <p>
 * Expected plan:
 * <pre>{@code
 * EsqlProject[[first_name{f}#6]]
 * \_TopN[[Order[first_name{f}#6,ASC,LAST]],10[INTEGER]]
 *   \_EsRelation[test][_meta_field{f}#11, emp_no{f}#5, first_name{f}#6, ge..]
 * }</pre>
 */
public void testPruneRedundantTopN() {
    var optimized = optimizedPlan("""
        FROM test
        | KEEP first_name
        | SORT first_name
        | LIMIT 100
        | SORT first_name
        | LIMIT 10
        """);

    // Project -> TopN -> EsRelation: only one TopN is left in the plan
    Project projection = as(optimized, Project.class);
    TopN remainingTopN = as(projection.child(), TopN.class);
    as(remainingTopN.child(), EsRelation.class);
}

/**
 * The two TopNs are separated by DROP, KEEP and EVAL; the optimized plan is still expected
 * to contain a single TopN.
 * <p>
 * Expected plan:
 * <pre>{@code
 * EsqlProject[[first_name{f}#9, a{r}#6]]
 * \_TopN[[Order[first_name{f}#9,ASC,LAST]],10[INTEGER]]
 *   \_Eval[[12[INTEGER] AS a#6]]
 *     \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..]
 * }</pre>
 */
public void testPruneRedundantTopNWithNodesInBetween() {
    var optimized = optimizedPlan("""
        FROM test
        | SORT first_name
        | LIMIT 10
        | DROP last_name
        | KEEP first_name
        | EVAL a = 12
        | SORT first_name
        | LIMIT 100
        """);

    // Project -> TopN -> Eval -> EsRelation: no second TopN below the Eval
    Project projection = as(optimized, Project.class);
    TopN remainingTopN = as(projection.child(), TopN.class);
    Eval evalNode = as(remainingTopN.child(), Eval.class);
    as(evalNode.child(), EsRelation.class);
}

/**
 * A STATS command sits between the two TopNs; both TopN nodes are expected to remain in the optimized plan.
 * <p>
 * Expected plan:
 * <pre>{@code
 * Project[[avg{r}#6, first_name{f}#9]]
 * \_TopN[[Order[first_name{f}#9,ASC,LAST]],100[INTEGER]]
 *   \_Eval[[$$SUM$avg$0{r$}#19 / $$COUNT$avg$1{r$}#20 AS avg#6]]
 *     \_Aggregate[[first_name{f}#9],[SUM(salary{f}#13,true[BOOLEAN],compensated[KEYWORD]) AS $$SUM$avg$0#19, COUNT(salary{f}#13,t
 * rue[BOOLEAN]) AS $$COUNT$avg$1#20, first_name{f}#9]]
 *       \_TopN[[Order[first_name{f}#9,ASC,LAST]],10[INTEGER]]
 *         \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..]
 * }</pre>
 */
public void testCanNotPruneRedundantTopNWithNodesInBetween() {
    var optimized = optimizedPlan("""
        FROM test
        | SORT first_name
        | LIMIT 10
        | STATS avg = AVG(salary) BY first_name
        | SORT first_name
        | LIMIT 100
        """);

    // Project -> TopN -> Eval -> Aggregate -> TopN -> EsRelation: the inner TopN is still there
    Project projection = as(optimized, Project.class);
    TopN outerTopN = as(projection.child(), TopN.class);
    Eval evalNode = as(outerTopN.child(), Eval.class);
    Aggregate aggregate = as(evalNode.child(), Aggregate.class);
    TopN innerTopN = as(aggregate.child(), TopN.class);
    as(innerTopN.child(), EsRelation.class);
}

/**
* <pre>{@code
* Eval[[1[INTEGER] AS irrelevant1, 2[INTEGER] AS irrelevant2]]
Expand Down