Skip to content

Commit 320fc75

Browse files
authored
Fix TopN row size estimate (elastic#119476) (elastic#119544)
Currently, we only account for attributes added after the TopN operation when estimating its row size. This results in invalid estimates for the final TopN on the coordinator and for the node-level reduction TopN. This change corrects the estimates and removes the workaround, paving the way for enabling node-level reduction in ES|QL. Closes elastic#106956
1 parent b48beb3 commit 320fc75

File tree

5 files changed

+45
-25
lines changed

5 files changed

+45
-25
lines changed

docs/changelog/119476.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 119476
2+
summary: Fix TopN row size estimate
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 106956

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EstimatesRowSize.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,6 @@ public String toString() {
103103

104104
static int estimateSize(DataType dataType) {
105105
ElementType elementType = PlannerUtils.toElementType(dataType);
106-
if (elementType == ElementType.DOC) {
107-
throw new EsqlIllegalArgumentException("can't load a [doc] with field extraction");
108-
}
109106
if (elementType == ElementType.UNKNOWN) {
110107
throw new EsqlIllegalArgumentException("[unknown] can't be the result of field extraction");
111108
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TopNExec.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1314
import org.elasticsearch.xpack.esql.core.expression.Expression;
1415
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1516
import org.elasticsearch.xpack.esql.core.tree.Source;
17+
import org.elasticsearch.xpack.esql.core.type.DataType;
1618
import org.elasticsearch.xpack.esql.expression.Order;
1719
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
1820

@@ -95,6 +97,9 @@ public Integer estimatedRowSize() {
9597

9698
@Override
9799
public PhysicalPlan estimateRowSize(State state) {
100+
final List<Attribute> output = output();
101+
final boolean needsSortedDocIds = output.stream().anyMatch(a -> a.dataType() == DataType.DOC_DATA_TYPE);
102+
state.add(needsSortedDocIds, output);
98103
int size = state.consumeAllFields(true);
99104
return Objects.equals(this.estimatedRowSize, size) ? this : new TopNExec(source(), child(), order, limit, size);
100105
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,8 @@ private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource,
345345
}
346346

347347
private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerContext context) {
348+
final Integer rowSize = topNExec.estimatedRowSize();
349+
assert rowSize != null && rowSize > 0 : "estimated row size [" + rowSize + "] wasn't set";
348350
PhysicalOperation source = plan(topNExec.child(), context);
349351

350352
ElementType[] elementTypes = new ElementType[source.layout.numberOfChannels()];
@@ -385,24 +387,8 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte
385387
} else {
386388
throw new EsqlIllegalArgumentException("limit only supported with literal values");
387389
}
388-
389-
// TODO Replace page size with passing estimatedRowSize down
390-
/*
391-
* The 2000 below is a hack to account for incoming size and to make
392-
* sure the estimated row size is never 0 which'd cause a divide by 0.
393-
* But we should replace this with passing the estimate into the real
394-
* topn and letting it actually measure the size of rows it produces.
395-
* That'll be more accurate. And we don't have a path for estimating
396-
* incoming rows. And we don't need one because we can estimate.
397-
*/
398390
return source.with(
399-
new TopNOperatorFactory(
400-
limit,
401-
asList(elementTypes),
402-
asList(encoders),
403-
orders,
404-
context.pageSize(2000 + topNExec.estimatedRowSize())
405-
),
391+
new TopNOperatorFactory(limit, asList(elementTypes), asList(encoders), orders, context.pageSize(rowSize)),
406392
source.layout
407393
);
408394
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@
143143
import java.util.HashMap;
144144
import java.util.HashSet;
145145
import java.util.List;
146+
import java.util.Locale;
146147
import java.util.Map;
147148
import java.util.Objects;
148149
import java.util.Set;
@@ -920,16 +921,16 @@ public void testQueryWithNull() {
920921

921922
var optimized = optimizedPlan(plan);
922923
var topN = as(optimized, TopNExec.class);
923-
// no fields are added after the top n - so 0 here
924-
assertThat(topN.estimatedRowSize(), equalTo(0));
924+
// all fields + nullsum are loaded in the final TopN
925+
assertThat(topN.estimatedRowSize(), equalTo(allFieldRowSize + Integer.BYTES));
925926

926927
var exchange = asRemoteExchange(topN.child());
927928
var project = as(exchange.child(), ProjectExec.class);
928929
var extract = as(project.child(), FieldExtractExec.class);
929930
var eval = as(extract.child(), EvalExec.class);
930931
var source = source(eval.child());
931932
// All fields loaded
932-
assertThat(source.estimatedRowSize(), equalTo(allFieldRowSize + 3 * Integer.BYTES + Long.BYTES));
933+
assertThat(source.estimatedRowSize(), equalTo(allFieldRowSize + 3 * Integer.BYTES + 2 * Integer.BYTES));
933934
}
934935

935936
public void testPushAndInequalitiesFilter() {
@@ -1141,8 +1142,8 @@ public void testExtractorForEvalWithoutProject() throws Exception {
11411142
var project = as(exchange.child(), ProjectExec.class);
11421143
var extract = as(project.child(), FieldExtractExec.class);
11431144
var topNLocal = as(extract.child(), TopNExec.class);
1144-
// two extra ints for forwards and backwards map
1145-
assertThat(topNLocal.estimatedRowSize(), equalTo(allFieldRowSize + Integer.BYTES * 2));
1145+
// all fields plus nullsum and shards, segments, docs and two extra ints for forwards and backwards map
1146+
assertThat(topNLocal.estimatedRowSize(), equalTo(allFieldRowSize + Integer.BYTES + Integer.BYTES * 2 + Integer.BYTES * 3));
11461147

11471148
var eval = as(topNLocal.child(), EvalExec.class);
11481149
var source = source(eval.child());
@@ -7553,6 +7554,31 @@ public void testScoreTopN() {
75537554
assertTrue(esRelation.output().stream().anyMatch(a -> a.name().equals(MetadataAttribute.SCORE) && a instanceof MetadataAttribute));
75547555
}
75557556

7557+
public void testReductionPlanForTopN() {
7558+
int limit = between(1, 100);
7559+
var plan = physicalPlan(String.format(Locale.ROOT, """
7560+
FROM test
7561+
| sort emp_no
7562+
| LIMIT %d
7563+
""", limit));
7564+
Tuple<PhysicalPlan, PhysicalPlan> plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(plan, config);
7565+
PhysicalPlan reduction = PlannerUtils.reductionPlan(plans.v2());
7566+
TopNExec reductionTopN = as(reduction, TopNExec.class);
7567+
assertThat(reductionTopN.estimatedRowSize(), equalTo(allFieldRowSize));
7568+
assertThat(reductionTopN.limit().fold(), equalTo(limit));
7569+
}
7570+
7571+
public void testReductionPlanForAggs() {
7572+
var plan = physicalPlan("""
7573+
FROM test
7574+
| stats x = sum(salary) BY first_name
7575+
""");
7576+
Tuple<PhysicalPlan, PhysicalPlan> plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(plan, config);
7577+
PhysicalPlan reduction = PlannerUtils.reductionPlan(plans.v2());
7578+
AggregateExec reductionAggs = as(reduction, AggregateExec.class);
7579+
assertThat(reductionAggs.estimatedRowSize(), equalTo(58)); // double and keyword
7580+
}
7581+
75567582
@SuppressWarnings("SameParameterValue")
75577583
private static void assertFilterCondition(
75587584
Filter filter,

0 commit comments

Comments
 (0)