Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/131833.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 131833
summary: "ESQL: Remove redundant `TopN`"
area: ES|QL
type: enhancement
issues:
- 131233
158 changes: 158 additions & 0 deletions x-pack/plugin/esql/qa/testFixtures/src/main/resources/topN.csv-spec
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,161 @@ FROM employees
avg_worked_seconds:long | birth_date:date | emp_no:i | gender:k | height:d | height.float:d | height.half_float:d | height.scaled_float:d | hire_date:date | is_rehired:bool | job_positions:k | languages:i | languages.byte:i | languages.long:l | languages.short:short | salary:i | salary_change:d | salary_change.int:i | salary_change.keyword:k | salary_change.long:l | still_hired:bool | name:k | first_name:k | last_name:k
349086555 | 1961-09-01T00:00:00Z | 10056 | F | 1.57 | 1.5700000524520874 | 1.5703125 | 1.57 | 1990-02-01T00:00:00Z | [false, false, true] | [Senior Team Lead] | 2 | 2 | 2 | 2 | 33370 | [-5.17, 10.99] | [-5, 10] | [-5.17, 10.99] | [-5, 10] | true | Brendon | Bernini | Brendon
;


redundantTopNBasic
from employees
| sort emp_no
| limit 100
| sort emp_no
| limit 10
| keep emp_no, first_name
;

emp_no:integer|first_name:keyword
10001 |Georgi
10002 |Bezalel
10003 |Parto
10004 |Chirstian
10005 |Kyoichi
10006 |Anneke
10007 |Tzvetan
10008 |Saniya
10009 |Sumant
10010 |Duangkaew
;


redundantTopNWithFilter
from employees
| sort emp_no desc
| limit 50
| where gender == "M"
| sort emp_no desc
| limit 5
| keep emp_no, first_name, gender
;

emp_no:integer | first_name:keyword | gender:keyword
10097 | Remzi | M
10096 | Jayson | M
10095 | Hilari | M
10093 | Sailaja | M
10091 | Amabile | M
;


redundantTopNWithProject
from employees
| sort salary desc
| limit 20
| keep emp_no, first_name, salary
| sort salary desc
| limit 3
;

emp_no:integer | first_name:keyword | salary:integer
10029 | Otmar | 74999
10045 | Moss | 74970
10007 | Tzvetan | 74572
;


redundantTopNWithEval
from employees
| sort emp_no
| limit 15
| eval double_emp_no = emp_no * 2
| sort emp_no
| limit 5
| keep emp_no, first_name, double_emp_no
;

emp_no:integer|first_name:keyword|double_emp_no:integer
10001 |Georgi |20002
10002 |Bezalel |20004
10003 |Parto |20006
10004 |Chirstian |20008
10005 |Kyoichi |20010
;


redundantTopNWithRename
from employees
| sort first_name
| limit 20
| rename first_name as name
| sort name
| limit 8
| keep emp_no, name
;

emp_no:integer | name:keyword
10059 | Alejandro
10091 | Amabile
10006 | Anneke
10062 | Anoosh
10094 | Arumugam
10049 | Basil
10058 | Berhard
10014 | Berni
;


nonRedundantTopNDifferentOrder
from employees
| sort emp_no
| limit 10
| sort salary desc
| limit 5
| keep emp_no, first_name, salary
;

emp_no:integer | first_name:keyword | salary:integer
10007 | Tzvetan | 74572
10009 | Sumant | 66174
10005 | Kyoichi | 63528
10003 | Parto | 61805
10006 | Anneke | 60335
;


redundantTopNMultipleFields
from employees
| sort gender, emp_no
| limit 20
| where salary > 50000
| sort gender, emp_no
| limit 8
| keep emp_no, first_name, gender, salary
;

emp_no:integer | first_name:keyword | gender:keyword | salary:integer
10002 | Bezalel | F | 56371
10006 | Anneke | F | 60335
10007 | Tzvetan | F | 74572
10009 | Sumant | F | 66174
10024 | Suzette | F | 64675
10027 | Divier | F | 73851
10032 | null | F | 62233
10041 | Uri | F | 56415
;


redundantTopNWithStats
from employees
| sort gender
| limit 10
| stats cnt = count(*) by emp_no, gender
| sort gender
| limit 5
| keep emp_no, gender, cnt
;

emp_no:integer | gender:keyword | cnt:long
10023 | F | 1
10027 | F | 1
10006 | F | 1
10007 | F | 1
10040 | F | 1
;
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneLiteralsInOrderBy;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneRedundantOrderBy;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneRedundantSortClauses;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneRedundantTopN;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneUnusedIndexMode;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineFilters;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineLimits;
Expand Down Expand Up @@ -221,6 +222,7 @@ protected static Batch<LogicalPlan> cleanup() {
return new Batch<>(
"Clean Up",
new ReplaceLimitAndSortAsTopN(),
new PruneRedundantTopN(),
new HoistRemoteEnrichTopN(),
new ReplaceRowAsLocalRelation(),
new PropgateUnmappedFields(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer.rules.logical;

import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.plan.logical.Drop;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.Insist;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.Rename;
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.logical.inference.Completion;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.Set;

/**
* Removes redundant TopN operations from the logical plan to improve execution efficiency.
* <p>
* Multiple TopN nodes may appear in a query plan—particularly after optimization passes—
* and some of them can be safely removed if they share the same sort order and are not separated
* by operations that disrupt sorting semantics.
* <p>
* For instance:
* <pre>
* from test | sort x | limit 100 | sort x | limit 10
* </pre>
* Both <code>sort x | limit 100</code> and <code>sort x | limit 10</code> will be transformed into TopN nodes.
* Since they sort by the same key and the latter applies a stricter (or equal) limit,
* the first TopN becomes redundant and can be pruned.
*/
public class PruneRedundantTopN extends OptimizerRules.ParameterizedOptimizerRule<TopN, LogicalOptimizerContext> {

public PruneRedundantTopN() {
super(OptimizerRules.TransformDirection.DOWN);
}

@Override
protected LogicalPlan rule(TopN plan, LogicalOptimizerContext ctx) {
Set<TopN> redundant = findRedundantTopN(plan, ctx);
if (redundant.isEmpty()) {
return plan;
}
return plan.transformDown(TopN.class, topN -> redundant.contains(topN) ? topN.child() : topN);
}

/**
* breadth-first recursion to find redundant TopNs in the children tree.
* Returns an identity set (we need to compare and prune the exact instances)
*/
private Set<TopN> findRedundantTopN(TopN parentTopN, LogicalOptimizerContext ctx) {
Set<TopN> result = Collections.newSetFromMap(new IdentityHashMap<>());

Deque<LogicalPlan> toCheck = new ArrayDeque<>();
toCheck.push(parentTopN.child());

while (toCheck.isEmpty() == false) {
LogicalPlan p = toCheck.pop();
if (p instanceof TopN childTopN) {
// Check if a child TopN is redundant compared to a parent TopN.
// A child TopN is redundant if it matches the parent's sort order and has a greater or equal limit.
if (childTopN.order().equals(parentTopN.order())
// Although `PushDownAndCombineLimits` is expected to have propagated the stricter (lower) limit,
// we still compare limit values here to ensure correctness and avoid relying solely on prior optimizations.
// This limit check should always pass, but we validate it explicitly for robustness.
&& (int) parentTopN.limit().fold(ctx.foldCtx()) <= (int) childTopN.limit().fold(ctx.foldCtx())) {
result.add(childTopN);
toCheck.push(childTopN.child());
}
} else if (canRemoveRedundantChildTopN(p)) {
for (LogicalPlan child : p.children()) {
toCheck.push(child);
}
}
}
return result;
}

private boolean canRemoveRedundantChildTopN(LogicalPlan p) {
return p instanceof Completion
|| p instanceof Drop
|| p instanceof Eval
|| p instanceof Rename
|| p instanceof Filter
|| p instanceof Insist
|| p instanceof Limit
Copy link
Contributor

@alex-spies alex-spies Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing past limits and filters is incorrect. There is probably a class of nodes where TOP N | COMMAND | TOP M can be simplified to COMMAND | TOP min(N,M), but these cannot be included:

SORT a | LIMIT 10 | WHERE b > 10 | SORT c | LIMIT 1

vs.

LIMIT 10 | WHERE b > 10 | SORT c | LIMIT 1

The first takes the top 10 as, removes those with b <= 10 and then gives the top c from this subset.

The second takes random 10 rows, then takes those with b > 10, and finally gives the top c from this subset. But the subsets are different.

|| p instanceof OrderBy
|| p instanceof Project;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8718,6 +8718,80 @@ public void testPruneRedundantOrderBy() {
as(mvExpand2.child(), Row.class);
}

/**
* <pre>{@code
* EsqlProject[[first_name{f}#6]]
* \_TopN[[Order[first_name{f}#6,ASC,LAST]],10[INTEGER]]
* \_EsRelation[test][_meta_field{f}#11, emp_no{f}#5, first_name{f}#6, ge..]
* }</pre>
*/
public void testPruneRedundantTopN() {
var plan = optimizedPlan("""
FROM test
| KEEP first_name
| SORT first_name
| LIMIT 100
| SORT first_name
| LIMIT 10
""");
var project = as(plan, Project.class);
var topN = as(project.child(), TopN.class);
as(topN.child(), EsRelation.class);
}

/**
* <pre>{@code
* EsqlProject[[first_name{f}#9, a{r}#6]]
* \_TopN[[Order[first_name{f}#9,ASC,LAST]],10[INTEGER]]
* \_Eval[[12[INTEGER] AS a#6]]
* \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..]
* }</pre>
*/
public void testPruneRedundantTopNWithNodesInBetween() {
var plan = optimizedPlan("""
FROM test
| SORT first_name
| LIMIT 10
| DROP last_name
| KEEP first_name
| EVAL a = 12
| SORT first_name
| LIMIT 100
""");
var project = as(plan, Project.class);
var topN = as(project.child(), TopN.class);
var eval = as(topN.child(), Eval.class);
as(eval.child(), EsRelation.class);
}

/**
* <pre>{@code
* Project[[avg{r}#6, first_name{f}#9]]
* \_TopN[[Order[first_name{f}#9,ASC,LAST]],100[INTEGER]]
* \_Eval[[$$SUM$avg$0{r$}#19 / $$COUNT$avg$1{r$}#20 AS avg#6]]
* \_Aggregate[[first_name{f}#9],[SUM(salary{f}#13,true[BOOLEAN],compensated[KEYWORD]) AS $$SUM$avg$0#19, COUNT(salary{f}#13,t
* rue[BOOLEAN]) AS $$COUNT$avg$1#20, first_name{f}#9]]
* \_TopN[[Order[first_name{f}#9,ASC,LAST]],10[INTEGER]]
* \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..]
* }</pre>
*/
public void testCanNotPruneRedundantTopNWithNodesInBetween() {
var plan = optimizedPlan("""
FROM test
| SORT first_name
| LIMIT 10
| STATS avg = AVG(salary) BY first_name
| SORT first_name
| LIMIT 100
""");
var project = as(plan, Project.class);
var topN = as(project.child(), TopN.class);
var eval = as(topN.child(), Eval.class);
var agg = as(eval.child(), Aggregate.class);
var topN2 = as(agg.child(), TopN.class);
as(topN2.child(), EsRelation.class);
}

/**
* <pre>{@code
* Eval[[1[INTEGER] AS irrelevant1, 2[INTEGER] AS irrelevant2]]
Expand Down