Skip to content

Commit bb82985

Browse files
committed
Avoid over collecting in Limit or Lucene Operator (elastic#123296)
Currently, we rely on signal propagation for early termination. For example, FROM index | LIMIT 10 can be executed by multiple Drivers: several Drivers to read document IDs and extract fields, and the final Driver to select at most 10 rows. In this scenario, each Lucene Driver can independently collect up to 10 rows until the final Driver has enough rows and signals them to stop collecting. In most cases, this model works fine, but when extracting fields from indices in the warm/cold tier, it can impact performance. This change introduces a Limiter used between LimitOperator and LuceneSourceOperator to avoid over-collecting. We will also need a follow-up to ensure that we do not over-collect between multiple stages of query execution.
1 parent ed2b641 commit bb82985

File tree

7 files changed

+258
-79
lines changed

7 files changed

+258
-79
lines changed

docs/changelog/123296.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 123296
2+
summary: Avoid over collecting in Limit or Lucene Operator
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
import org.elasticsearch.compute.data.DocBlock;
1717
import org.elasticsearch.compute.data.DocVector;
1818
import org.elasticsearch.compute.data.DoubleVector;
19-
import org.elasticsearch.compute.data.IntBlock;
2019
import org.elasticsearch.compute.data.IntVector;
2120
import org.elasticsearch.compute.data.Page;
2221
import org.elasticsearch.compute.operator.DriverContext;
22+
import org.elasticsearch.compute.operator.Limiter;
2323
import org.elasticsearch.compute.operator.SourceOperator;
2424
import org.elasticsearch.core.Releasables;
2525

@@ -37,6 +37,7 @@ public class LuceneSourceOperator extends LuceneOperator {
3737

3838
private int currentPagePos = 0;
3939
private int remainingDocs;
40+
private final Limiter limiter;
4041

4142
private IntVector.Builder docsBuilder;
4243
private DoubleVector.Builder scoreBuilder;
@@ -46,6 +47,7 @@ public class LuceneSourceOperator extends LuceneOperator {
4647
public static class Factory extends LuceneOperator.Factory {
4748

4849
private final int maxPageSize;
50+
private final Limiter limiter;
4951

5052
public Factory(
5153
List<? extends ShardContext> contexts,
@@ -58,11 +60,13 @@ public Factory(
5860
) {
5961
super(contexts, queryFunction, dataPartitioning, taskConcurrency, limit, scoring ? COMPLETE : COMPLETE_NO_SCORES);
6062
this.maxPageSize = maxPageSize;
63+
// TODO: use a single limiter for multiple stage execution
64+
this.limiter = limit == NO_LIMIT ? Limiter.NO_LIMIT : new Limiter(limit);
6165
}
6266

6367
@Override
6468
public SourceOperator get(DriverContext driverContext) {
65-
return new LuceneSourceOperator(driverContext.blockFactory(), maxPageSize, sliceQueue, limit, scoreMode);
69+
return new LuceneSourceOperator(driverContext.blockFactory(), maxPageSize, sliceQueue, limit, limiter, scoreMode);
6670
}
6771

6872
public int maxPageSize() {
@@ -84,10 +88,18 @@ public String describe() {
8488
}
8589

8690
@SuppressWarnings("this-escape")
87-
public LuceneSourceOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue, int limit, ScoreMode scoreMode) {
91+
public LuceneSourceOperator(
92+
BlockFactory blockFactory,
93+
int maxPageSize,
94+
LuceneSliceQueue sliceQueue,
95+
int limit,
96+
Limiter limiter,
97+
ScoreMode scoreMode
98+
) {
8899
super(blockFactory, maxPageSize, sliceQueue);
89100
this.minPageSize = Math.max(1, maxPageSize / 2);
90101
this.remainingDocs = limit;
102+
this.limiter = limiter;
91103
int estimatedSize = Math.min(limit, maxPageSize);
92104
boolean success = false;
93105
try {
@@ -140,7 +152,7 @@ public void collect(int doc) throws IOException {
140152

141153
@Override
142154
public boolean isFinished() {
143-
return doneCollecting || remainingDocs <= 0;
155+
return doneCollecting || limiter.remaining() == 0;
144156
}
145157

146158
@Override
@@ -160,6 +172,7 @@ public Page getCheckedOutput() throws IOException {
160172
if (scorer == null) {
161173
return null;
162174
}
175+
final int remainingDocsStart = remainingDocs = limiter.remaining();
163176
try {
164177
scorer.scoreNextRange(
165178
leafCollector,
@@ -171,28 +184,32 @@ public Page getCheckedOutput() throws IOException {
171184
);
172185
} catch (CollectionTerminatedException ex) {
173186
// The leaf collector terminated the execution
187+
doneCollecting = true;
174188
scorer.markAsDone();
175189
}
190+
final int collectedDocs = remainingDocsStart - remainingDocs;
191+
final int discardedDocs = collectedDocs - limiter.tryAccumulateHits(collectedDocs);
176192
Page page = null;
177-
if (currentPagePos >= minPageSize || remainingDocs <= 0 || scorer.isDone()) {
178-
IntBlock shard = null;
179-
IntBlock leaf = null;
193+
if (currentPagePos >= minPageSize || scorer.isDone() || (remainingDocs = limiter.remaining()) == 0) {
194+
IntVector shard = null;
195+
IntVector leaf = null;
180196
IntVector docs = null;
181197
DoubleVector scores = null;
182198
DocBlock docBlock = null;
199+
currentPagePos -= discardedDocs;
183200
try {
184-
shard = blockFactory.newConstantIntBlockWith(scorer.shardContext().index(), currentPagePos);
185-
leaf = blockFactory.newConstantIntBlockWith(scorer.leafReaderContext().ord, currentPagePos);
186-
docs = docsBuilder.build();
201+
shard = blockFactory.newConstantIntVector(scorer.shardContext().index(), currentPagePos);
202+
leaf = blockFactory.newConstantIntVector(scorer.leafReaderContext().ord, currentPagePos);
203+
docs = buildDocsVector(currentPagePos);
187204
docsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
188-
docBlock = new DocVector(shard.asVector(), leaf.asVector(), docs, true).asBlock();
205+
docBlock = new DocVector(shard, leaf, docs, true).asBlock();
189206
shard = null;
190207
leaf = null;
191208
docs = null;
192209
if (scoreBuilder == null) {
193210
page = new Page(currentPagePos, docBlock);
194211
} else {
195-
scores = scoreBuilder.build();
212+
scores = buildScoresVector(currentPagePos);
196213
scoreBuilder = blockFactory.newDoubleVectorBuilder(Math.min(remainingDocs, maxPageSize));
197214
page = new Page(currentPagePos, docBlock, scores.asBlock());
198215
}
@@ -209,6 +226,36 @@ public Page getCheckedOutput() throws IOException {
209226
}
210227
}
211228

229+
private IntVector buildDocsVector(int upToPositions) {
230+
final IntVector docs = docsBuilder.build();
231+
assert docs.getPositionCount() >= upToPositions : docs.getPositionCount() + " < " + upToPositions;
232+
if (docs.getPositionCount() == upToPositions) {
233+
return docs;
234+
}
235+
try (var slice = blockFactory.newIntVectorFixedBuilder(upToPositions)) {
236+
for (int i = 0; i < upToPositions; i++) {
237+
slice.appendInt(docs.getInt(i));
238+
}
239+
docs.close();
240+
return slice.build();
241+
}
242+
}
243+
244+
private DoubleVector buildScoresVector(int upToPositions) {
245+
final DoubleVector scores = scoreBuilder.build();
246+
assert scores.getPositionCount() >= upToPositions : scores.getPositionCount() + " < " + upToPositions;
247+
if (scores.getPositionCount() == upToPositions) {
248+
return scores;
249+
}
250+
try (var slice = blockFactory.newDoubleVectorBuilder(upToPositions)) {
251+
for (int i = 0; i < upToPositions; i++) {
252+
slice.appendDouble(scores.getDouble(i));
253+
}
254+
scores.close();
255+
return slice.build();
256+
}
257+
}
258+
212259
@Override
213260
public void close() {
214261
Releasables.close(docsBuilder, scoreBuilder);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java

Lines changed: 48 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,6 @@
2222
import java.util.Objects;
2323

2424
public class LimitOperator implements Operator {
25-
/**
26-
* Total number of position that are emitted by this operator.
27-
*/
28-
private final int limit;
29-
30-
/**
31-
* Remaining number of positions that will be emitted by this operator.
32-
*/
33-
private int limitRemaining;
3425

3526
/**
3627
* Count of pages that have been processed by this operator.
@@ -49,35 +40,49 @@ public class LimitOperator implements Operator {
4940

5041
private Page lastInput;
5142

43+
private final Limiter limiter;
5244
private boolean finished;
5345

54-
public LimitOperator(int limit) {
55-
this.limit = this.limitRemaining = limit;
46+
public LimitOperator(Limiter limiter) {
47+
this.limiter = limiter;
5648
}
5749

58-
public record Factory(int limit) implements OperatorFactory {
50+
public static final class Factory implements OperatorFactory {
51+
private final Limiter limiter;
52+
53+
public Factory(int limit) {
54+
this.limiter = new Limiter(limit);
55+
}
5956

6057
@Override
6158
public LimitOperator get(DriverContext driverContext) {
62-
return new LimitOperator(limit);
59+
return new LimitOperator(limiter);
6360
}
6461

6562
@Override
6663
public String describe() {
67-
return "LimitOperator[limit = " + limit + "]";
64+
return "LimitOperator[limit = " + limiter.limit() + "]";
6865
}
6966
}
7067

7168
@Override
7269
public boolean needsInput() {
73-
return finished == false && lastInput == null;
70+
return finished == false && lastInput == null && limiter.remaining() > 0;
7471
}
7572

7673
@Override
7774
public void addInput(Page page) {
7875
assert lastInput == null : "has pending input page";
79-
lastInput = page;
80-
rowsReceived += page.getPositionCount();
76+
final int acceptedRows = limiter.tryAccumulateHits(page.getPositionCount());
77+
if (acceptedRows == 0) {
78+
page.releaseBlocks();
79+
assert isFinished();
80+
} else if (acceptedRows < page.getPositionCount()) {
81+
lastInput = truncatePage(page, acceptedRows);
82+
} else {
83+
lastInput = page;
84+
}
85+
rowsReceived += acceptedRows;
8186
}
8287

8388
@Override
@@ -87,55 +92,46 @@ public void finish() {
8792

8893
@Override
8994
public boolean isFinished() {
90-
return finished && lastInput == null;
95+
return lastInput == null && (finished || limiter.remaining() == 0);
9196
}
9297

9398
@Override
9499
public Page getOutput() {
95100
if (lastInput == null) {
96101
return null;
97102
}
98-
99-
Page result;
100-
if (lastInput.getPositionCount() <= limitRemaining) {
101-
result = lastInput;
102-
limitRemaining -= lastInput.getPositionCount();
103-
} else {
104-
int[] filter = new int[limitRemaining];
105-
for (int i = 0; i < limitRemaining; i++) {
106-
filter[i] = i;
107-
}
108-
Block[] blocks = new Block[lastInput.getBlockCount()];
109-
boolean success = false;
110-
try {
111-
for (int b = 0; b < blocks.length; b++) {
112-
blocks[b] = lastInput.getBlock(b).filter(filter);
113-
}
114-
success = true;
115-
} finally {
116-
if (success == false) {
117-
Releasables.closeExpectNoException(lastInput::releaseBlocks, Releasables.wrap(blocks));
118-
} else {
119-
lastInput.releaseBlocks();
120-
}
121-
lastInput = null;
122-
}
123-
result = new Page(blocks);
124-
limitRemaining = 0;
125-
}
126-
if (limitRemaining == 0) {
127-
finished = true;
128-
}
103+
final Page result = lastInput;
129104
lastInput = null;
130105
pagesProcessed++;
131106
rowsEmitted += result.getPositionCount();
107+
return result;
108+
}
132109

110+
private static Page truncatePage(Page page, int upTo) {
111+
int[] filter = new int[upTo];
112+
for (int i = 0; i < upTo; i++) {
113+
filter[i] = i;
114+
}
115+
final Block[] blocks = new Block[page.getBlockCount()];
116+
Page result = null;
117+
try {
118+
for (int b = 0; b < blocks.length; b++) {
119+
blocks[b] = page.getBlock(b).filter(filter);
120+
}
121+
result = new Page(blocks);
122+
} finally {
123+
if (result == null) {
124+
Releasables.closeExpectNoException(page::releaseBlocks, Releasables.wrap(blocks));
125+
} else {
126+
page.releaseBlocks();
127+
}
128+
}
133129
return result;
134130
}
135131

136132
@Override
137133
public Status status() {
138-
return new Status(limit, limitRemaining, pagesProcessed, rowsReceived, rowsEmitted);
134+
return new Status(limiter.limit(), limiter.remaining(), pagesProcessed, rowsReceived, rowsEmitted);
139135
}
140136

141137
@Override
@@ -147,6 +143,8 @@ public void close() {
147143

148144
@Override
149145
public String toString() {
146+
final int limitRemaining = limiter.remaining();
147+
final int limit = limiter.limit();
150148
return "LimitOperator[limit = " + limitRemaining + "/" + limit + "]";
151149
}
152150

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import java.util.concurrent.atomic.AtomicInteger;
11+
12+
/**
 * A shared limiter used by multiple drivers to collect hits in parallel without exceeding the output limit.
 * For example, if the query `FROM test-1,test-2 | LIMIT 100` is run with two drivers, and one driver (e.g., querying `test-1`)
 * has collected 60 hits, then the other driver querying `test-2` should collect at most 40 hits.
 * <p>
 * The number of accepted hits is kept in an {@link AtomicInteger} and updated with a compare-and-set loop,
 * so concurrent callers never accept more than {@code limit} hits in total.
 */
public class Limiter {
    private final int limit;
    private final AtomicInteger collected = new AtomicInteger();

    /**
     * A limiter that accepts every hit and always reports {@link Integer#MAX_VALUE} remaining.
     * Declared {@code final} so the shared sentinel cannot be reassigned.
     */
    public static final Limiter NO_LIMIT = new Limiter(Integer.MAX_VALUE) {
        @Override
        public int tryAccumulateHits(int numHits) {
            return numHits;
        }

        @Override
        public int remaining() {
            return Integer.MAX_VALUE;
        }
    };

    /**
     * @param limit the maximum total number of hits this limiter accepts
     */
    public Limiter(int limit) {
        this.limit = limit;
    }

    /**
     * Returns the remaining number of hits that can be collected.
     */
    public int remaining() {
        final int remaining = limit - collected.get();
        assert remaining >= 0 : remaining;
        return remaining;
    }

    /**
     * Returns the limit of this limiter.
     */
    public int limit() {
        return limit;
    }

    /**
     * Tries to accumulate hits and returns the number of hits that have been accepted.
     *
     * @param numHits the number of hits to try to accumulate; zero or negative values accept nothing
     * @return the accepted number of hits. If the returned number is less than the numHits,
     * it means the limit has been reached and the difference can be discarded.
     */
    public int tryAccumulateHits(int numHits) {
        if (numHits <= 0) {
            // A negative count would decrement the shared counter and let the drivers
            // collectively exceed the limit; zero never changes the counter anyway.
            return 0;
        }
        while (true) {
            int curVal = collected.get();
            if (curVal >= limit) {
                return 0;
            }
            final int toAccept = Math.min(limit - curVal, numHits);
            // Retry if another thread updated the counter between the read and the update.
            if (collected.compareAndSet(curVal, curVal + toAccept)) {
                return toAccept;
            }
        }
    }
}

0 commit comments

Comments
 (0)