
Commit b284355 (1 parent: 6decace)

ESQL: Limit when we push topn to lucene (elastic#134497)

nik9000 authored and gmjehovich committed
Right now we push all topn operations to Lucene if possible. But Lucene was not written to handle a topn of 100,000: it's very fast, but it allocates more memory than we'd like. This change limits the size of the topns that we push to Lucene to 10,000, which is the default result window limit (index.max_result_window). For anything larger we run a regular Lucene scan with our own in-engine topn instead. That topn is designed to scan huge numbers of documents; it doesn't have the nice min_competitive optimization, but it tracks memory very well.
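In short, whether a TopN still pushes to Lucene now depends on how the query's limit compares to the new threshold. A minimal sketch of that decision, using illustrative values and made-up variable names rather than the actual planner API (the real check is canPushLimit in PushTopNToSource, below):

    // Illustrative values only; the real gate is PushTopNToSource#canPushLimit.
    int luceneTopNLimit = 10_000;  // default of esql.lucene_topn_limit, same as index.max_result_window
    int queryLimit = 100_000;      // e.g. FROM idx | SORT x | LIMIT 100000
    if (queryLimit <= luceneTopNLimit) {
        // Pushed to Lucene: the data driver uses LuceneTopNSourceOperator and keeps
        // the min_competitive optimization.
    } else {
        // Not pushed: a plain LuceneSourceOperator scan feeds ESQL's in-engine
        // TopNOperator, which accounts its memory against the circuit breaker.
    }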

19 files changed: +226, -31 lines

docs/changelog/134497.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 134497
+summary: Limit when we push topn to lucene
+area: ES|QL
+type: bug
+issues: []

test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

Lines changed: 1 addition & 0 deletions
@@ -92,6 +92,7 @@ public void skipOnAborted() {
      */
     public void testSortByManyLongsSuccess() throws IOException {
         initManyLongs(10);
+        // | SORT a, b, i0, i1, ...i500 | KEEP a, b | LIMIT 10000
         Map<String, Object> response = sortByManyLongs(500);
         ListMatcher columns = matchesList().item(matchesMap().entry("name", "a").entry("type", "long"))
             .item(matchesMap().entry("name", "b").entry("type", "long"));

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 13 additions & 9 deletions
@@ -55,6 +55,8 @@ public class TopNOperator implements Operator, Accountable {
     static final class Row implements Accountable, Releasable {
         private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Row.class);
 
+        private final CircuitBreaker breaker;
+
         /**
          * The sort key.
          */
@@ -81,17 +83,11 @@ static final class Row implements Accountable, Releasable {
         @Nullable
         RefCounted shardRefCounter;
 
-        void setShardRefCountersAndShard(RefCounted shardRefCounter) {
-            if (this.shardRefCounter != null) {
-                this.shardRefCounter.decRef();
-            }
-            this.shardRefCounter = shardRefCounter;
-            this.shardRefCounter.mustIncRef();
-        }
-
         Row(CircuitBreaker breaker, List<SortOrder> sortOrders, int preAllocatedKeysSize, int preAllocatedValueSize) {
+            this.breaker = breaker;
             boolean success = false;
             try {
+                breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "topn");
                 keys = new BreakingBytesRefBuilder(breaker, "topn", preAllocatedKeysSize);
                 values = new BreakingBytesRefBuilder(breaker, "topn", preAllocatedValueSize);
                 bytesOrder = new BytesOrder(sortOrders, breaker, "topn");
@@ -111,7 +107,7 @@ public long ramBytesUsed() {
         @Override
         public void close() {
             clearRefCounters();
-            Releasables.closeExpectNoException(keys, values, bytesOrder);
+            Releasables.closeExpectNoException(() -> breaker.addWithoutBreaking(-SHALLOW_SIZE), keys, values, bytesOrder);
         }
 
         public void clearRefCounters() {
@@ -120,6 +116,14 @@ public void clearRefCounters() {
             }
             shardRefCounter = null;
         }
+
+        void setShardRefCountersAndShard(RefCounted shardRefCounter) {
+            if (this.shardRefCounter != null) {
+                this.shardRefCounter.decRef();
+            }
+            this.shardRefCounter = shardRefCounter;
+            this.shardRefCounter.mustIncRef();
+        }
     }
 
     static final class BytesOrder implements Releasable, Accountable {
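The Row change above follows the usual Elasticsearch circuit-breaker accounting pattern: reserve the object's shallow size when it is built, return it on close. A stripped-down sketch of that pattern (not the real Row class, and it omits the success/rollback handling the real constructor keeps):

    import org.apache.lucene.util.RamUsageEstimator;
    import org.elasticsearch.common.breaker.CircuitBreaker;
    import org.elasticsearch.core.Releasable;

    final class BreakerTrackedRow implements Releasable {
        private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BreakerTrackedRow.class);
        private final CircuitBreaker breaker;

        BreakerTrackedRow(CircuitBreaker breaker) {
            // Throws CircuitBreakingException instead of silently allocating past the limit.
            breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "topn");
            this.breaker = breaker;
        }

        @Override
        public void close() {
            // Give the reserved bytes back; never throws.
            breaker.addWithoutBreaking(-SHALLOW_SIZE);
        }
    }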

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java

Lines changed: 2 additions & 2 deletions
@@ -1489,9 +1489,9 @@ public void testRowResizes() {
         block.decRef();
         op.addInput(new Page(blocks));
 
-        // 94 are from the collection process
+        // 105 are from the objects
         // 1 is for the min-heap itself
-        assertThat(breaker.getMemoryRequestCount(), is(95L));
+        assertThat(breaker.getMemoryRequestCount(), is(106L));
     }
 }

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

Lines changed: 98 additions & 2 deletions
@@ -59,6 +59,7 @@
 import static org.elasticsearch.test.MapMatcher.assertMap;
 import static org.elasticsearch.test.MapMatcher.matchesMap;
 import static org.elasticsearch.xpack.esql.core.type.DataType.isMillisOrNanos;
+import static org.elasticsearch.xpack.esql.planner.PhysicalSettings.LUCENE_TOPN_LIMIT;
 import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC;
 import static org.elasticsearch.xpack.esql.tools.ProfileParser.parseProfile;
 import static org.elasticsearch.xpack.esql.tools.ProfileParser.readProfileFromResponse;
@@ -188,6 +189,18 @@ private void setLoggingLevel(String level) throws IOException {
         client().performRequest(request);
     }
 
+    private void setTruncationWindowMax(Integer size) throws IOException {
+        Request request = new Request("PUT", "/_cluster/settings");
+        request.setJsonEntity("""
+            {
+                "persistent": {
+                    "esql.query.result_truncation_max_size": $SIZE$
+                }
+            }
+            """.replace("$SIZE$", size == null ? "null" : Integer.toString(size)));
+        client().performRequest(request);
+    }
+
     public void testIncompatibleMappingsErrors() throws IOException {
         // create first index
         Request request = new Request("PUT", "/index1");
@@ -538,6 +551,86 @@ public void testInlineStatsProfile() throws IOException {
         );
     }
 
+    public void testSmallTopNProfile() throws IOException {
+        testTopNProfile(false);
+    }
+
+    public void testGiantTopNProfile() throws IOException {
+        testTopNProfile(true);
+    }
+
+    private void testTopNProfile(boolean giant) throws IOException {
+        try {
+            setTruncationWindowMax(1000000);
+            indexTimestampData(1);
+
+            int size = between(1, LUCENE_TOPN_LIMIT.get(Settings.EMPTY).intValue() - 1);
+            if (giant) {
+                size += LUCENE_TOPN_LIMIT.get(Settings.EMPTY).intValue();
+            }
+            RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | KEEP value | SORT value ASC | LIMIT " + size);
+
+            builder.pragmas(Settings.builder().put("data_partitioning", "shard").build());
+            builder.profile(true);
+            builder.pragmasOk();
+
+            Map<String, Object> result = runEsql(builder);
+            ListMatcher values = matchesList();
+            for (int i = 0; i < Math.min(1000, size); i++) {
+                values = values.item(List.of(i));
+            }
+            assertResultMap(
+                result,
+                getResultMatcher(result).entry("profile", getProfileMatcher()),
+                matchesList().item(matchesMap().entry("name", "value").entry("type", "long")),
+                values
+            );
+
+            @SuppressWarnings("unchecked")
+            List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers");
+            for (Map<String, Object> p : profiles) {
+                fixTypesOnProfile(p);
+                assertThat(p, commonProfile());
+                List<String> sig = new ArrayList<>();
+                @SuppressWarnings("unchecked")
+                List<Map<String, Object>> operators = (List<Map<String, Object>>) p.get("operators");
+                for (Map<String, Object> o : operators) {
+                    sig.add(checkOperatorProfile(o));
+                }
+                String description = p.get("description").toString();
+                switch (description) {
+                    case "data" -> assertMap(
+                        sig,
+                        giant
+                            ? matchesList().item("LuceneSourceOperator")
+                                .item("ValuesSourceReaderOperator")
+                                .item("TopNOperator")
+                                .item("ProjectOperator")
+                                .item("ExchangeSinkOperator")
+                            : matchesList().item("LuceneTopNSourceOperator")
+                                .item("ValuesSourceReaderOperator")
+                                .item("ProjectOperator")
+                                .item("ExchangeSinkOperator")
+                    );
+                    case "node_reduce" -> assertThat(
+                        sig,
+                        // If the coordinating node and data node are the same node then we get this
+                        either(matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator"))
+                            // If the coordinating node and data node are *not* the same node we get this
+                            .or(matchesList().item("ExchangeSourceOperator").item("TopNOperator").item("ExchangeSinkOperator"))
+                    );
+                    case "final" -> assertMap(
+                        sig,
+                        matchesList().item("ExchangeSourceOperator").item("TopNOperator").item("ProjectOperator").item("OutputOperator")
+                    );
+                    default -> throw new IllegalArgumentException("can't match " + description);
+                }
+            }
+        } finally {
+            setTruncationWindowMax(null);
+        }
+    }
+
     public void testForceSleepsProfile() throws IOException {
         assumeTrue("requires pragmas", Build.current().isSnapshot());
 
@@ -940,7 +1033,9 @@ private String checkOperatorProfile(Map<String, Object> o) {
                 .entry("rows_received", greaterThan(0))
                 .entry("rows_emitted", greaterThan(0))
                 .entry("ram_used", instanceOf(String.class))
-                .entry("ram_bytes_used", greaterThan(0));
+                .entry("ram_bytes_used", greaterThan(0))
+                .entry("receive_nanos", greaterThan(0))
+                .entry("emit_nanos", greaterThan(0));
             case "LuceneTopNSourceOperator" -> matchesMap().entry("pages_emitted", greaterThan(0))
                 .entry("rows_emitted", greaterThan(0))
                 .entry("current", greaterThan(0))
@@ -951,7 +1046,8 @@ private String checkOperatorProfile(Map<String, Object> o) {
                 .entry("slice_min", 0)
                 .entry("process_nanos", greaterThan(0))
                 .entry("processed_queries", List.of("*:*"))
-                .entry("slice_index", 0);
+                .entry("slice_index", 0)
+                .entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD"));
             default -> throw new AssertionError("unexpected status: " + o);
         };
         MapMatcher expectedOp = matchesMap().entry("operator", startsWith(name));
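The new esql.lucene_topn_limit setting (declared in PhysicalSettings below) is dynamic and node-scoped, so it should be adjustable the same way the test adjusts the truncation window above. A hypothetical helper in the same style, sketch only:

    private void setLuceneTopNLimit(Integer limit) throws IOException {
        // Same pattern as setTruncationWindowMax above; null resets the setting to its default.
        Request request = new Request("PUT", "/_cluster/settings");
        request.setJsonEntity("""
            {
                "persistent": {
                    "esql.lucene_topn_limit": $LIMIT$
                }
            }
            """.replace("$LIMIT$", limit == null ? "null" : Integer.toString(limit)));
        client().performRequest(request);
    }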

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalOptimizerContext.java

Lines changed: 8 additions & 1 deletion
@@ -8,8 +8,15 @@
 package org.elasticsearch.xpack.esql.optimizer;
 
 import org.elasticsearch.xpack.esql.core.expression.FoldContext;
+import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
 import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
 import org.elasticsearch.xpack.esql.session.Configuration;
 import org.elasticsearch.xpack.esql.stats.SearchStats;
 
-public record LocalPhysicalOptimizerContext(EsqlFlags flags, Configuration configuration, FoldContext foldCtx, SearchStats searchStats) {}
+public record LocalPhysicalOptimizerContext(
+    PhysicalSettings physicalSettings,
+    EsqlFlags flags,
+    Configuration configuration,
+    FoldContext foldCtx,
+    SearchStats searchStats
+) {}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java

Lines changed: 24 additions & 4 deletions
@@ -18,6 +18,7 @@
 import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
 import org.elasticsearch.xpack.esql.core.expression.NameId;
 import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
+import org.elasticsearch.xpack.esql.expression.Foldables;
 import org.elasticsearch.xpack.esql.expression.Order;
 import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.BinarySpatialFunction;
 import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.SpatialRelatesUtils;
@@ -28,6 +29,7 @@
 import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
+import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -63,7 +65,12 @@ public class PushTopNToSource extends PhysicalOptimizerRules.ParameterizedOptimi
 
     @Override
     protected PhysicalPlan rule(TopNExec topNExec, LocalPhysicalOptimizerContext ctx) {
-        Pushable pushable = evaluatePushable(ctx.foldCtx(), topNExec, LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()));
+        Pushable pushable = evaluatePushable(
+            ctx.physicalSettings(),
+            ctx.foldCtx(),
+            topNExec,
+            LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags())
+        );
         return pushable.rewrite(topNExec);
     }
 
@@ -124,15 +131,24 @@ public PhysicalPlan rewrite(TopNExec topNExec) {
         }
     }
 
-    private static Pushable evaluatePushable(FoldContext ctx, TopNExec topNExec, LucenePushdownPredicates lucenePushdownPredicates) {
+    private static Pushable evaluatePushable(
+        PhysicalSettings physicalSettings,
+        FoldContext ctx,
+        TopNExec topNExec,
+        LucenePushdownPredicates lucenePushdownPredicates
+    ) {
         PhysicalPlan child = topNExec.child();
         if (child instanceof EsQueryExec queryExec
             && queryExec.canPushSorts()
-            && canPushDownOrders(topNExec.order(), lucenePushdownPredicates)) {
+            && canPushDownOrders(topNExec.order(), lucenePushdownPredicates)
+            && canPushLimit(topNExec, physicalSettings)) {
             // With the simplest case of `FROM index | SORT ...` we only allow pushing down if the sort is on a field
             return new PushableQueryExec(queryExec);
         }
-        if (child instanceof EvalExec evalExec && evalExec.child() instanceof EsQueryExec queryExec && queryExec.canPushSorts()) {
+        if (child instanceof EvalExec evalExec
+            && evalExec.child() instanceof EsQueryExec queryExec
+            && queryExec.canPushSorts()
+            && canPushLimit(topNExec, physicalSettings)) {
             // When we have an EVAL between the FROM and the SORT, we consider pushing down if the sort is on a field and/or
             // a distance function defined in the EVAL. We also move the EVAL to after the SORT.
             List<Order> orders = topNExec.order();
@@ -204,6 +220,10 @@ private static boolean canPushDownOrders(List<Order> orders, LucenePushdownPredi
         return orders.stream().allMatch(o -> isSortableAttribute.apply(o.child(), lucenePushdownPredicates));
     }
 
+    private static boolean canPushLimit(TopNExec topn, PhysicalSettings physicalSettings) {
+        return Foldables.limitValue(topn.limit(), topn.sourceText()) <= physicalSettings.luceneTopNLimit();
+    }
+
     private static List<EsQueryExec.Sort> buildFieldSorts(List<Order> orders) {
         List<EsQueryExec.Sort> sorts = new ArrayList<>(orders.size());
         for (Order o : orders) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 1 addition & 1 deletion
@@ -281,7 +281,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
         int rowEstimatedSize = esQueryExec.estimatedRowSize();
         int limit = esQueryExec.limit() != null ? (Integer) esQueryExec.limit().fold(context.foldCtx()) : NO_LIMIT;
         boolean scoring = esQueryExec.hasScoring();
-        if ((sorts != null && sorts.isEmpty() == false)) {
+        if (sorts != null && sorts.isEmpty() == false) {
             List<SortBuilder<?>> sortBuilders = new ArrayList<>(sorts.size());
             long estimatedPerRowSortSize = 0;
             for (Sort sort : sorts) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java

Lines changed: 32 additions & 1 deletion
@@ -9,9 +9,11 @@
 
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.MemorySizeValue;
 import org.elasticsearch.compute.lucene.DataPartitioning;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.monitor.jvm.JvmInfo;
 
 /**
@@ -35,23 +37,34 @@ public class PhysicalSettings {
         Setting.Property.Dynamic
     );
 
+    public static final Setting<Integer> LUCENE_TOPN_LIMIT = Setting.intSetting(
+        "esql.lucene_topn_limit",
+        IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(Settings.EMPTY),
+        -1,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
     private volatile DataPartitioning defaultDataPartitioning;
     private volatile ByteSizeValue valuesLoadingJumboSize;
+    private volatile int luceneTopNLimit;
 
     /**
     * Ctor for prod that listens for updates from the {@link ClusterService}.
     */
     public PhysicalSettings(ClusterService clusterService) {
         clusterService.getClusterSettings().initializeAndWatch(DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v);
         clusterService.getClusterSettings().initializeAndWatch(VALUES_LOADING_JUMBO_SIZE, v -> this.valuesLoadingJumboSize = v);
+        clusterService.getClusterSettings().initializeAndWatch(LUCENE_TOPN_LIMIT, v -> this.luceneTopNLimit = v);
     }
 
     /**
     * Ctor for testing.
     */
-    public PhysicalSettings(DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize) {
+    public PhysicalSettings(DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize, int luceneTopNLimit) {
         this.defaultDataPartitioning = defaultDataPartitioning;
         this.valuesLoadingJumboSize = valuesLoadingJumboSize;
+        this.luceneTopNLimit = luceneTopNLimit;
     }
 
     public DataPartitioning defaultDataPartitioning() {
@@ -61,4 +74,22 @@ public DataPartitioning defaultDataPartitioning() {
     public ByteSizeValue valuesLoadingJumboSize() {
         return valuesLoadingJumboSize;
     }
+
+    /**
+     * Maximum {@code LIMIT} that we're willing to push to Lucene's topn.
+     * <p>
+     * Lucene's topn code was designed for <strong>search</strong>
+     * which typically fetches 10 or 30 or 50 or 100 or 1000 documents.
+     * That's as many you want on a page, and that's what it's designed for.
+     * But if you go to, say, page 10, Lucene implements this as a search
+     * for {@code page_size * page_number} docs and then materializes only
+     * the last {@code page_size} documents. Traditionally, Elasticsearch
+     * limits that {@code page_size * page_number} which it calls the
+     * {@link IndexSettings#MAX_RESULT_WINDOW_SETTING "result window"}.
+     * So! ESQL defaults to the same default - {@code 10,000}.
+     * </p>
+     */
+    public int luceneTopNLimit() {
+        return luceneTopNLimit;
+    }
 }
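To make the javadoc's "result window" arithmetic concrete (the numbers are illustrative):

    // Fetching page 10 at 1,000 hits per page makes Lucene's topn keep
    // page_size * page_number competitive hits in memory.
    int pageSize = 1_000;
    int pageNumber = 10;
    int trackedHits = pageSize * pageNumber; // 10,000 - the default index.max_result_window
    // esql.lucene_topn_limit reuses that 10,000 default, so any larger LIMIT
    // stays in ESQL's own TopN operator instead of Lucene's.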
