Skip to content

Commit bbef844

Browse files
committed
Share limiter between limit or lucene source operators
1 parent a792334 commit bbef844

File tree

6 files changed

+253
-79
lines changed

6 files changed

+253
-79
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
import org.elasticsearch.compute.data.DocBlock;
1717
import org.elasticsearch.compute.data.DocVector;
1818
import org.elasticsearch.compute.data.DoubleVector;
19-
import org.elasticsearch.compute.data.IntBlock;
2019
import org.elasticsearch.compute.data.IntVector;
2120
import org.elasticsearch.compute.data.Page;
2221
import org.elasticsearch.compute.operator.DriverContext;
22+
import org.elasticsearch.compute.operator.Limiter;
2323
import org.elasticsearch.compute.operator.SourceOperator;
2424
import org.elasticsearch.core.Releasables;
2525

@@ -37,6 +37,7 @@ public class LuceneSourceOperator extends LuceneOperator {
3737

3838
private int currentPagePos = 0;
3939
private int remainingDocs;
40+
private final Limiter limiter;
4041

4142
private IntVector.Builder docsBuilder;
4243
private DoubleVector.Builder scoreBuilder;
@@ -46,6 +47,7 @@ public class LuceneSourceOperator extends LuceneOperator {
4647
public static class Factory extends LuceneOperator.Factory {
4748

4849
private final int maxPageSize;
50+
private final Limiter limiter;
4951

5052
public Factory(
5153
List<? extends ShardContext> contexts,
@@ -58,11 +60,13 @@ public Factory(
5860
) {
5961
super(contexts, queryFunction, dataPartitioning, taskConcurrency, limit, scoring ? COMPLETE : COMPLETE_NO_SCORES);
6062
this.maxPageSize = maxPageSize;
63+
// TODO: use a single limiter for multiple stage execution
64+
this.limiter = limit == NO_LIMIT ? Limiter.NO_LIMIT : new Limiter(limit);
6165
}
6266

6367
@Override
6468
public SourceOperator get(DriverContext driverContext) {
65-
return new LuceneSourceOperator(driverContext.blockFactory(), maxPageSize, sliceQueue, limit, scoreMode);
69+
return new LuceneSourceOperator(driverContext.blockFactory(), maxPageSize, sliceQueue, limit, limiter, scoreMode);
6670
}
6771

6872
public int maxPageSize() {
@@ -84,10 +88,18 @@ public String describe() {
8488
}
8589

8690
@SuppressWarnings("this-escape")
87-
public LuceneSourceOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue, int limit, ScoreMode scoreMode) {
91+
public LuceneSourceOperator(
92+
BlockFactory blockFactory,
93+
int maxPageSize,
94+
LuceneSliceQueue sliceQueue,
95+
int limit,
96+
Limiter limiter,
97+
ScoreMode scoreMode
98+
) {
8899
super(blockFactory, maxPageSize, sliceQueue);
89100
this.minPageSize = Math.max(1, maxPageSize / 2);
90101
this.remainingDocs = limit;
102+
this.limiter = limiter;
91103
int estimatedSize = Math.min(limit, maxPageSize);
92104
boolean success = false;
93105
try {
@@ -140,7 +152,7 @@ public void collect(int doc) throws IOException {
140152

141153
@Override
142154
public boolean isFinished() {
143-
return doneCollecting || remainingDocs <= 0;
155+
return doneCollecting || limiter.remaining() == 0;
144156
}
145157

146158
@Override
@@ -160,6 +172,7 @@ public Page getCheckedOutput() throws IOException {
160172
if (scorer == null) {
161173
return null;
162174
}
175+
final int remainingDocsStart = remainingDocs = limiter.remaining();
163176
try {
164177
scorer.scoreNextRange(
165178
leafCollector,
@@ -171,28 +184,32 @@ public Page getCheckedOutput() throws IOException {
171184
);
172185
} catch (CollectionTerminatedException ex) {
173186
// The leaf collector terminated the execution
187+
doneCollecting = true;
174188
scorer.markAsDone();
175189
}
190+
final int collectedDocs = remainingDocsStart - remainingDocs;
191+
final int discardedDocs = collectedDocs - limiter.tryAccumulateHits(collectedDocs);
176192
Page page = null;
177-
if (currentPagePos >= minPageSize || remainingDocs <= 0 || scorer.isDone()) {
178-
IntBlock shard = null;
179-
IntBlock leaf = null;
193+
if (currentPagePos >= minPageSize || scorer.isDone() || (remainingDocs = limiter.remaining()) == 0) {
194+
IntVector shard = null;
195+
IntVector leaf = null;
180196
IntVector docs = null;
181197
DoubleVector scores = null;
182198
DocBlock docBlock = null;
199+
currentPagePos -= discardedDocs;
183200
try {
184-
shard = blockFactory.newConstantIntBlockWith(scorer.shardContext().index(), currentPagePos);
185-
leaf = blockFactory.newConstantIntBlockWith(scorer.leafReaderContext().ord, currentPagePos);
186-
docs = docsBuilder.build();
201+
shard = blockFactory.newConstantIntVector(scorer.shardContext().index(), currentPagePos);
202+
leaf = blockFactory.newConstantIntVector(scorer.leafReaderContext().ord, currentPagePos);
203+
docs = buildDocsVector(currentPagePos);
187204
docsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
188-
docBlock = new DocVector(shard.asVector(), leaf.asVector(), docs, true).asBlock();
205+
docBlock = new DocVector(shard, leaf, docs, true).asBlock();
189206
shard = null;
190207
leaf = null;
191208
docs = null;
192209
if (scoreBuilder == null) {
193210
page = new Page(currentPagePos, docBlock);
194211
} else {
195-
scores = scoreBuilder.build();
212+
scores = buildScoresVector(currentPagePos);
196213
scoreBuilder = blockFactory.newDoubleVectorBuilder(Math.min(remainingDocs, maxPageSize));
197214
page = new Page(currentPagePos, docBlock, scores.asBlock());
198215
}
@@ -209,6 +226,36 @@ public Page getCheckedOutput() throws IOException {
209226
}
210227
}
211228

229+
private IntVector buildDocsVector(int upToPositions) {
230+
final IntVector docs = docsBuilder.build();
231+
assert docs.getPositionCount() >= upToPositions : docs.getPositionCount() + " < " + upToPositions;
232+
if (docs.getPositionCount() == upToPositions) {
233+
return docs;
234+
}
235+
try (var slice = blockFactory.newIntVectorFixedBuilder(upToPositions)) {
236+
for (int i = 0; i < upToPositions; i++) {
237+
slice.appendInt(docs.getInt(i));
238+
}
239+
docs.close();
240+
return slice.build();
241+
}
242+
}
243+
244+
private DoubleVector buildScoresVector(int upToPositions) {
245+
final DoubleVector scores = scoreBuilder.build();
246+
assert scores.getPositionCount() >= upToPositions : scores.getPositionCount() + " < " + upToPositions;
247+
if (scores.getPositionCount() == upToPositions) {
248+
return scores;
249+
}
250+
try (var slice = blockFactory.newDoubleVectorBuilder(upToPositions)) {
251+
for (int i = 0; i < upToPositions; i++) {
252+
slice.appendDouble(scores.getDouble(i));
253+
}
254+
scores.close();
255+
return slice.build();
256+
}
257+
}
258+
212259
@Override
213260
public void close() {
214261
Releasables.close(docsBuilder, scoreBuilder);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java

Lines changed: 48 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,6 @@
2222
import java.util.Objects;
2323

2424
public class LimitOperator implements Operator {
25-
/**
26-
* Total number of position that are emitted by this operator.
27-
*/
28-
private final int limit;
29-
30-
/**
31-
* Remaining number of positions that will be emitted by this operator.
32-
*/
33-
private int limitRemaining;
3425

3526
/**
3627
* Count of pages that have been processed by this operator.
@@ -49,35 +40,49 @@ public class LimitOperator implements Operator {
4940

5041
private Page lastInput;
5142

43+
private final Limiter limiter;
5244
private boolean finished;
5345

54-
public LimitOperator(int limit) {
55-
this.limit = this.limitRemaining = limit;
46+
public LimitOperator(Limiter limiter) {
47+
this.limiter = limiter;
5648
}
5749

58-
public record Factory(int limit) implements OperatorFactory {
50+
public static final class Factory implements OperatorFactory {
51+
private final Limiter limiter;
52+
53+
public Factory(int limit) {
54+
this.limiter = new Limiter(limit);
55+
}
5956

6057
@Override
6158
public LimitOperator get(DriverContext driverContext) {
62-
return new LimitOperator(limit);
59+
return new LimitOperator(limiter);
6360
}
6461

6562
@Override
6663
public String describe() {
67-
return "LimitOperator[limit = " + limit + "]";
64+
return "LimitOperator[limit = " + limiter.limit() + "]";
6865
}
6966
}
7067

7168
@Override
7269
public boolean needsInput() {
73-
return finished == false && lastInput == null;
70+
return finished == false && lastInput == null && limiter.remaining() > 0;
7471
}
7572

7673
@Override
7774
public void addInput(Page page) {
7875
assert lastInput == null : "has pending input page";
79-
lastInput = page;
80-
rowsReceived += page.getPositionCount();
76+
final int acceptedRows = limiter.tryAccumulateHits(page.getPositionCount());
77+
if (acceptedRows == 0) {
78+
page.releaseBlocks();
79+
assert isFinished();
80+
} else if (acceptedRows < page.getPositionCount()) {
81+
lastInput = truncatePage(page, acceptedRows);
82+
} else {
83+
lastInput = page;
84+
}
85+
rowsReceived += acceptedRows;
8186
}
8287

8388
@Override
@@ -87,55 +92,46 @@ public void finish() {
8792

8893
@Override
8994
public boolean isFinished() {
90-
return finished && lastInput == null;
95+
return lastInput == null && (finished || limiter.remaining() == 0);
9196
}
9297

9398
@Override
9499
public Page getOutput() {
95100
if (lastInput == null) {
96101
return null;
97102
}
98-
99-
Page result;
100-
if (lastInput.getPositionCount() <= limitRemaining) {
101-
result = lastInput;
102-
limitRemaining -= lastInput.getPositionCount();
103-
} else {
104-
int[] filter = new int[limitRemaining];
105-
for (int i = 0; i < limitRemaining; i++) {
106-
filter[i] = i;
107-
}
108-
Block[] blocks = new Block[lastInput.getBlockCount()];
109-
boolean success = false;
110-
try {
111-
for (int b = 0; b < blocks.length; b++) {
112-
blocks[b] = lastInput.getBlock(b).filter(filter);
113-
}
114-
success = true;
115-
} finally {
116-
if (success == false) {
117-
Releasables.closeExpectNoException(lastInput::releaseBlocks, Releasables.wrap(blocks));
118-
} else {
119-
lastInput.releaseBlocks();
120-
}
121-
lastInput = null;
122-
}
123-
result = new Page(blocks);
124-
limitRemaining = 0;
125-
}
126-
if (limitRemaining == 0) {
127-
finished = true;
128-
}
103+
final Page result = lastInput;
129104
lastInput = null;
130105
pagesProcessed++;
131106
rowsEmitted += result.getPositionCount();
107+
return result;
108+
}
132109

110+
private static Page truncatePage(Page page, int upTo) {
111+
int[] filter = new int[upTo];
112+
for (int i = 0; i < upTo; i++) {
113+
filter[i] = i;
114+
}
115+
final Block[] blocks = new Block[page.getBlockCount()];
116+
Page result = null;
117+
try {
118+
for (int b = 0; b < blocks.length; b++) {
119+
blocks[b] = page.getBlock(b).filter(filter);
120+
}
121+
result = new Page(blocks);
122+
} finally {
123+
if (result == null) {
124+
Releasables.closeExpectNoException(page::releaseBlocks, Releasables.wrap(blocks));
125+
} else {
126+
page.releaseBlocks();
127+
}
128+
}
133129
return result;
134130
}
135131

136132
@Override
137133
public Status status() {
138-
return new Status(limit, limitRemaining, pagesProcessed, rowsReceived, rowsEmitted);
134+
return new Status(limiter.limit(), limiter.remaining(), pagesProcessed, rowsReceived, rowsEmitted);
139135
}
140136

141137
@Override
@@ -147,6 +143,8 @@ public void close() {
147143

148144
@Override
149145
public String toString() {
146+
final int limitRemaining = limiter.remaining();
147+
final int limit = limiter.limit();
150148
return "LimitOperator[limit = " + limitRemaining + "/" + limit + "]";
151149
}
152150

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import java.util.concurrent.atomic.AtomicInteger;
11+
12+
/**
 * A shared limiter used by multiple drivers to collect hits in parallel without exceeding the output limit.
 * For example, if the query `FROM test-1,test-2 | LIMIT 100` is run with two drivers, and one driver (e.g., querying `test-1`)
 * has collected 60 hits, then the other driver querying `test-2` should collect at most 40 hits.
 * <p>
 * The number of accepted hits is tracked in an {@link AtomicInteger} and updated with compare-and-set,
 * so concurrent callers never accept more than {@code limit} hits in total.
 */
public class Limiter {
    private final int limit;
    private final AtomicInteger collected = new AtomicInteger();

    /**
     * A limiter that accepts every hit and always reports {@link Integer#MAX_VALUE} remaining.
     * Declared {@code final} so this shared instance cannot be reassigned.
     */
    public static final Limiter NO_LIMIT = new Limiter(Integer.MAX_VALUE) {
        @Override
        public int tryAccumulateHits(int numHits) {
            return numHits;
        }

        @Override
        public int remaining() {
            return Integer.MAX_VALUE;
        }
    };

    /**
     * Creates a limiter that accepts at most {@code limit} hits in total.
     *
     * @param limit the maximum number of hits that may be accepted
     */
    public Limiter(int limit) {
        this.limit = limit;
    }

    /**
     * Returns the remaining number of hits that can be collected.
     */
    public int remaining() {
        final int remaining = limit - collected.get();
        assert remaining >= 0 : remaining;
        return remaining;
    }

    /**
     * Returns the limit of this limiter.
     */
    public int limit() {
        return limit;
    }

    /**
     * Tries to accumulate hits and returns the number of hits that have been accepted.
     *
     * @param numHits the number of hits to try to accumulate; non-positive values accept nothing
     * @return the accepted number of hits. If the returned number is less than {@code numHits},
     * it means the limit has been reached and the difference can be discarded.
     */
    public int tryAccumulateHits(int numHits) {
        if (numHits <= 0) {
            // A negative request would decrement the shared counter and let later callers exceed the limit.
            return 0;
        }
        while (true) {
            final int curVal = collected.get();
            if (curVal >= limit) {
                return 0;
            }
            final int toAccept = Math.min(limit - curVal, numHits);
            // Retry when another thread updated the counter between the read and the write.
            if (collected.compareAndSet(curVal, curVal + toAccept)) {
                return toAccept;
            }
        }
    }
}

0 commit comments

Comments
 (0)