
Commit 8259f89

Merge branch 'main' into excetpion_funcscorequery
2 parents 3778bc6 + 35ee644

File tree: 18 files changed (+501, -145 lines)

docs/changelog/133245.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 133245
+summary: Add query heads priority to `SliceQueue`
+area: ES|QL
+type: enhancement
+issues: []

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluator.java

Lines changed: 30 additions & 22 deletions
@@ -24,7 +24,6 @@
 import org.elasticsearch.compute.data.DocVector;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;

@@ -44,12 +43,12 @@
  * It's much faster to push queries to the {@link LuceneSourceOperator} or the like, but sometimes this isn't possible. So
  * this class is here to save the day.
  */
-public abstract class LuceneQueryEvaluator<T extends Vector.Builder> implements Releasable {
+public abstract class LuceneQueryEvaluator<T extends Block.Builder> implements Releasable {

     public record ShardConfig(Query query, IndexSearcher searcher) {}

     private final BlockFactory blockFactory;
-    private final ShardConfig[] shards;
+    protected final ShardConfig[] shards;

     private final List<ShardState> perShardState;

@@ -67,9 +66,9 @@ public Block executeQuery(Page page) {
         DocVector docs = (DocVector) block.asVector();
         try {
             if (docs.singleSegmentNonDecreasing()) {
-                return evalSingleSegmentNonDecreasing(docs).asBlock();
+                return evalSingleSegmentNonDecreasing(docs);
             } else {
-                return evalSlow(docs).asBlock();
+                return evalSlow(docs);
             }
         } catch (IOException e) {
             throw new UncheckedIOException(e);

@@ -106,15 +105,15 @@ public Block executeQuery(Page page) {
      * common.
      * </p>
      */
-    private Vector evalSingleSegmentNonDecreasing(DocVector docs) throws IOException {
+    private Block evalSingleSegmentNonDecreasing(DocVector docs) throws IOException {
         ShardState shardState = shardState(docs.shards().getInt(0));
         SegmentState segmentState = shardState.segmentState(docs.segments().getInt(0));
         int min = docs.docs().getInt(0);
         int max = docs.docs().getInt(docs.getPositionCount() - 1);
         int length = max - min + 1;
-        try (T scoreBuilder = createVectorBuilder(blockFactory, docs.getPositionCount())) {
+        try (T scoreBuilder = createBlockBuilder(blockFactory, docs.getPositionCount())) {
             if (length == docs.getPositionCount() && length > 1) {
-                return segmentState.scoreDense(scoreBuilder, min, max);
+                return segmentState.scoreDense(scoreBuilder, min, max, docs.getPositionCount());
             }
             return segmentState.scoreSparse(scoreBuilder, docs.docs());
         }

@@ -134,13 +133,13 @@ private Vector evalSingleSegmentNonDecreasing(DocVector docs) throws IOException
      * the order that the {@link DocVector} came in.
      * </p>
      */
-    private Vector evalSlow(DocVector docs) throws IOException {
+    private Block evalSlow(DocVector docs) throws IOException {
         int[] map = docs.shardSegmentDocMapForwards();
         // Clear any state flags from the previous run
         int prevShard = -1;
         int prevSegment = -1;
         SegmentState segmentState = null;
-        try (T scoreBuilder = createVectorBuilder(blockFactory, docs.getPositionCount())) {
+        try (T scoreBuilder = createBlockBuilder(blockFactory, docs.getPositionCount())) {
             for (int i = 0; i < docs.getPositionCount(); i++) {
                 int shard = docs.shards().getInt(docs.shards().getInt(map[i]));
                 int segment = docs.segments().getInt(map[i]);

@@ -156,7 +155,7 @@ private Vector evalSlow(DocVector docs) throws IOException {
                     segmentState.scoreSingleDocWithScorer(scoreBuilder, docs.docs().getInt(map[i]));
                 }
             }
-            try (Vector outOfOrder = scoreBuilder.build()) {
+            try (Block outOfOrder = scoreBuilder.build()) {
                 return outOfOrder.filter(docs.shardSegmentDocMapBackwards());
             }
         }

@@ -247,9 +246,9 @@ private SegmentState(Weight weight, LeafReaderContext ctx) {
          * Score a range using the {@link BulkScorer}. This should be faster
          * than using {@link #scoreSparse} for dense doc ids.
          */
-        Vector scoreDense(T scoreBuilder, int min, int max) throws IOException {
+        Block scoreDense(T scoreBuilder, int min, int max, int positionCount) throws IOException {
             if (noMatch) {
-                return createNoMatchVector(blockFactory, max - min + 1);
+                return createNoMatchBlock(blockFactory, max - min + 1);
             }
             if (bulkScorer == null || // The bulkScorer wasn't initialized
                 Thread.currentThread() != bulkScorerThread // The bulkScorer was initialized on a different thread

@@ -258,19 +257,22 @@ Vector scoreDense(T scoreBuilder, int min, int max) throws IOException {
                 bulkScorer = weight.bulkScorer(ctx);
                 if (bulkScorer == null) {
                     noMatch = true;
-                    return createNoMatchVector(blockFactory, max - min + 1);
+                    return createNoMatchBlock(blockFactory, positionCount);
                 }
             }
             try (
                 DenseCollector<T> collector = new DenseCollector<>(
                     min,
                     max,
                     scoreBuilder,
+                    ctx,
                     LuceneQueryEvaluator.this::appendNoMatch,
-                    LuceneQueryEvaluator.this::appendMatch
+                    LuceneQueryEvaluator.this::appendMatch,
+                    weight.getQuery()
                 )
             ) {
                 bulkScorer.score(collector, ctx.reader().getLiveDocs(), min, max + 1);
+                collector.finish();
                 return collector.build();
             }
         }

@@ -279,10 +281,10 @@ Vector scoreDense(T scoreBuilder, int min, int max) throws IOException {
          * Score a vector of doc ids using {@link Scorer}. If you have a dense range of
          * doc ids it'd be faster to use {@link #scoreDense}.
          */
-        Vector scoreSparse(T scoreBuilder, IntVector docs) throws IOException {
+        Block scoreSparse(T scoreBuilder, IntVector docs) throws IOException {
             initScorer(docs.getInt(0));
             if (noMatch) {
-                return createNoMatchVector(blockFactory, docs.getPositionCount());
+                return createNoMatchBlock(blockFactory, docs.getPositionCount());
             }
             for (int i = 0; i < docs.getPositionCount(); i++) {
                 scoreSingleDocWithScorer(scoreBuilder, docs.getInt(i));

@@ -326,11 +328,13 @@ private void scoreSingleDocWithScorer(T builder, int doc) throws IOException {
      * doc ids are sent to {@link LeafCollector#collect(int)} in ascending order
      * which isn't documented, but @jpountz swears is true.
      */
-    static class DenseCollector<U extends Vector.Builder> implements LeafCollector, Releasable {
+    static class DenseCollector<U extends Block.Builder> implements LeafCollector, Releasable {
         private final U scoreBuilder;
         private final int max;
+        private final LeafReaderContext leafReaderContext;
         private final Consumer<U> appendNoMatch;
         private final CheckedBiConsumer<U, Scorable, IOException> appendMatch;
+        private final Query query;

         private Scorable scorer;
         int next;

@@ -339,14 +343,18 @@ static class DenseCollector<U extends Vector.Builder> implements LeafCollector,
             int min,
             int max,
             U scoreBuilder,
+            LeafReaderContext leafReaderContext,
             Consumer<U> appendNoMatch,
-            CheckedBiConsumer<U, Scorable, IOException> appendMatch
+            CheckedBiConsumer<U, Scorable, IOException> appendMatch,
+            Query query
         ) {
             this.scoreBuilder = scoreBuilder;
             this.max = max;
             next = min;
+            this.leafReaderContext = leafReaderContext;
             this.appendNoMatch = appendNoMatch;
             this.appendMatch = appendMatch;
+            this.query = query;
         }

         @Override

@@ -362,7 +370,7 @@ public void collect(int doc) throws IOException {
             appendMatch.accept(scoreBuilder, scorer);
         }

-        public Vector build() {
+        public Block build() {
             return scoreBuilder.build();
         }

@@ -387,12 +395,12 @@ public void close() {
     /**
      * Creates a vector where all positions correspond to elements that don't match the query
      */
-    protected abstract Vector createNoMatchVector(BlockFactory blockFactory, int size);
+    protected abstract Block createNoMatchBlock(BlockFactory blockFactory, int size);

     /**
      * Creates the corresponding vector builder to store the results of evaluating the query
      */
-    protected abstract T createVectorBuilder(BlockFactory blockFactory, int size);
+    protected abstract T createBlockBuilder(BlockFactory blockFactory, int size);

     /**
      * Appends a matching result to a builder created by @link createVectorBuilder}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneQueryExpressionEvaluator.java

Lines changed: 8 additions & 10 deletions
@@ -12,9 +12,9 @@
 import org.apache.lucene.search.ScoreMode;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BooleanBlock;
 import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.EvalOperator;

@@ -26,9 +26,7 @@
  * a {@link BooleanVector}.
  * @see LuceneQueryScoreEvaluator
  */
-public class LuceneQueryExpressionEvaluator extends LuceneQueryEvaluator<BooleanVector.Builder>
-    implements
-        EvalOperator.ExpressionEvaluator {
+public class LuceneQueryExpressionEvaluator extends LuceneQueryEvaluator<BooleanBlock.Builder> implements EvalOperator.ExpressionEvaluator {

     LuceneQueryExpressionEvaluator(BlockFactory blockFactory, ShardConfig[] shards) {
         super(blockFactory, shards);

@@ -45,22 +43,22 @@ protected ScoreMode scoreMode() {
     }

     @Override
-    protected Vector createNoMatchVector(BlockFactory blockFactory, int size) {
-        return blockFactory.newConstantBooleanVector(false, size);
+    protected Block createNoMatchBlock(BlockFactory blockFactory, int size) {
+        return blockFactory.newConstantBooleanBlockWith(false, size);
     }

     @Override
-    protected BooleanVector.Builder createVectorBuilder(BlockFactory blockFactory, int size) {
-        return blockFactory.newBooleanVectorFixedBuilder(size);
+    protected BooleanBlock.Builder createBlockBuilder(BlockFactory blockFactory, int size) {
+        return blockFactory.newBooleanBlockBuilder(size);
     }

     @Override
-    protected void appendNoMatch(BooleanVector.Builder builder) {
+    protected void appendNoMatch(BooleanBlock.Builder builder) {
         builder.appendBoolean(false);
     }

     @Override
-    protected void appendMatch(BooleanVector.Builder builder, Scorable scorer) throws IOException {
+    protected void appendMatch(BooleanBlock.Builder builder, Scorable scorer) throws IOException {
         builder.appendBoolean(true);
     }
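
Taken together, these two files replace the Vector.Builder contract with Block.Builder: the evaluator now returns a Block directly, and the no-match and builder factories produce blocks. A minimal usage sketch follows. It is not part of this commit; the blockFactory, query, searcher, and page values (and the package-private constructor being reachable) are assumptions for illustration.

// Hypothetical usage sketch (assumed setup; not from this diff).
// After this change the evaluator yields a Block rather than a Vector.
LuceneQueryEvaluator.ShardConfig[] shards = new LuceneQueryEvaluator.ShardConfig[] {
    new LuceneQueryEvaluator.ShardConfig(query, searcher) // one entry per shard
};
try (LuceneQueryExpressionEvaluator evaluator = new LuceneQueryExpressionEvaluator(blockFactory, shards)) {
    Block result = evaluator.eval(page); // block 0 of the page must be a DocVector
    result.close();                      // the caller releases the result block
}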

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneQueryScoreEvaluator.java

Lines changed: 7 additions & 8 deletions
@@ -14,7 +14,6 @@
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.DoubleVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.ScoreOperator;

@@ -27,7 +26,7 @@
  * Elements that don't match will have a score of {@link #NO_MATCH_SCORE}.
  * @see LuceneQueryScoreEvaluator
  */
-public class LuceneQueryScoreEvaluator extends LuceneQueryEvaluator<DoubleVector.Builder> implements ScoreOperator.ExpressionScorer {
+public class LuceneQueryScoreEvaluator extends LuceneQueryEvaluator<DoubleBlock.Builder> implements ScoreOperator.ExpressionScorer {

     public static final double NO_MATCH_SCORE = 0.0;

@@ -46,22 +45,22 @@ protected ScoreMode scoreMode() {
     }

     @Override
-    protected Vector createNoMatchVector(BlockFactory blockFactory, int size) {
-        return blockFactory.newConstantDoubleVector(NO_MATCH_SCORE, size);
+    protected DoubleBlock createNoMatchBlock(BlockFactory blockFactory, int size) {
+        return blockFactory.newConstantDoubleBlockWith(NO_MATCH_SCORE, size);
     }

     @Override
-    protected DoubleVector.Builder createVectorBuilder(BlockFactory blockFactory, int size) {
-        return blockFactory.newDoubleVectorFixedBuilder(size);
+    protected DoubleBlock.Builder createBlockBuilder(BlockFactory blockFactory, int size) {
+        return blockFactory.newDoubleBlockBuilder(size);
     }

     @Override
-    protected void appendNoMatch(DoubleVector.Builder builder) {
+    protected void appendNoMatch(DoubleBlock.Builder builder) {
         builder.appendDouble(NO_MATCH_SCORE);
     }

     @Override
-    protected void appendMatch(DoubleVector.Builder builder, Scorable scorer) throws IOException {
+    protected void appendMatch(DoubleBlock.Builder builder, Scorable scorer) throws IOException {
         builder.appendDouble(scorer.score());
     }

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java

Lines changed: 1 addition & 0 deletions
@@ -16,6 +16,7 @@
  */
 public record LuceneSlice(
     int slicePosition,
+    boolean queryHead,
     ShardContext shardContext,
     List<PartialLeafReaderContext> leaves,
     Weight weight,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java

Lines changed: 27 additions & 11 deletions
@@ -83,13 +83,21 @@ public record QueryAndTags(Query query, List<Object> tags) {}
     private final Map<String, PartitioningStrategy> partitioningStrategies;

     private final AtomicReferenceArray<LuceneSlice> slices;
+    /**
+     * Queue of slice IDs that are the primary entry point for a new query.
+     * A driver should prioritize polling from this queue after failing to get a sequential
+     * slice (the query/segment affinity). This ensures that threads start work on a fresh,
+     * independent query before stealing segments from other queries.
+     */
+    private final Queue<Integer> queryHeads;
+
     /**
      * Queue of slice IDs that are the primary entry point for a new group of segments.
      * A driver should prioritize polling from this queue after failing to get a sequential
      * slice (the segment affinity). This ensures that threads start work on fresh,
      * independent segment groups before resorting to work stealing.
      */
-    private final Queue<Integer> sliceHeads;
+    private final Queue<Integer> segmentHeads;

     /**
      * Queue of slice IDs that are not the primary entry point for a segment group.

@@ -106,11 +114,14 @@ public record QueryAndTags(Query query, List<Object> tags) {}
             slices.set(i, sliceList.get(i));
         }
         this.partitioningStrategies = partitioningStrategies;
-        this.sliceHeads = ConcurrentCollections.newQueue();
+        this.queryHeads = ConcurrentCollections.newQueue();
+        this.segmentHeads = ConcurrentCollections.newQueue();
         this.stealableSlices = ConcurrentCollections.newQueue();
         for (LuceneSlice slice : sliceList) {
-            if (slice.getLeaf(0).minDoc() == 0) {
-                sliceHeads.add(slice.slicePosition());
+            if (slice.queryHead()) {
+                queryHeads.add(slice.slicePosition());
+            } else if (slice.getLeaf(0).minDoc() == 0) {
+                segmentHeads.add(slice.slicePosition());
             } else {
                 stealableSlices.add(slice.slicePosition());
             }

@@ -120,12 +131,14 @@ public record QueryAndTags(Query query, List<Object> tags) {}
     /**
      * Retrieves the next available {@link LuceneSlice} for processing.
      * <p>
-     * This method implements a three-tiered strategy to minimize the overhead of switching between segments:
+     * This method implements a four-tiered strategy to minimize the overhead of switching between queries/segments:
      * 1. If a previous slice is provided, it first attempts to return the next sequential slice.
-     * This keeps a thread working on the same segments, minimizing the overhead of segment switching.
-     * 2. If affinity fails, it returns a slice from the {@link #sliceHeads} queue, which is an entry point for
-     * a new, independent group of segments, allowing the calling Driver to work on a fresh set of segments.
-     * 3. If the {@link #sliceHeads} queue is exhausted, it "steals" a slice
+     * This keeps a thread working on the same query and same segment, minimizing the overhead of query/segment switching.
+     * 2. If affinity fails, it returns a slice from the {@link #queryHeads} queue, which is an entry point for
+     * a new query, allowing the calling Driver to work on a fresh query with a new set of segments.
+     * 3. If the {@link #queryHeads} queue is exhausted, it returns a slice from the {@link #segmentHeads} queue of other queries,
+     * which is an entry point for a new, independent group of segments, allowing the calling Driver to work on a fresh set of segments.
+     * 4. If the {@link #segmentHeads} queue is exhausted, it "steals" a slice
      * from the {@link #stealableSlices} queue. This fallback ensures all threads remain utilized.
      *
      * @param prev the previously returned {@link LuceneSlice}, or {@code null} if starting

@@ -142,7 +155,7 @@ public LuceneSlice nextSlice(LuceneSlice prev) {
                 }
             }
         }
-        for (var ids : List.of(sliceHeads, stealableSlices)) {
+        for (var ids : List.of(queryHeads, segmentHeads, stealableSlices)) {
             Integer nextId;
             while ((nextId = ids.poll()) != null) {
                 var slice = slices.getAndSet(nextId, null);

@@ -209,9 +222,12 @@ public static LuceneSliceQueue create(
             partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
             List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
             Weight weight = weight(ctx, query, scoreMode);
+            boolean queryHead = true;
             for (List<PartialLeafReaderContext> group : groups) {
                 if (group.isEmpty() == false) {
-                    slices.add(new LuceneSlice(nextSliceId++, ctx, group, weight, queryAndExtra.tags));
+                    final int slicePosition = nextSliceId++;
+                    slices.add(new LuceneSlice(slicePosition, queryHead, ctx, group, weight, queryAndExtra.tags));
+                    queryHead = false;
                 }
             }
         }
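
In effect, nextSlice now polls in four tiers. Below is a condensed sketch of the order described in the javadoc above; it is not the verbatim implementation, and tryTake is a hypothetical helper standing in for the sequential-affinity lookup.

// Condensed sketch of the four-tier polling order (assumed helper names).
LuceneSlice nextSlice(LuceneSlice prev) {
    if (prev != null) {
        // 1. Sequential affinity: stay on the same query and adjacent segments.
        LuceneSlice sequential = tryTake(prev.slicePosition() + 1); // hypothetical helper
        if (sequential != null) {
            return sequential;
        }
    }
    // 2. Fresh query heads, 3. segment-group heads of other queries, 4. work stealing.
    for (var tier : List.of(queryHeads, segmentHeads, stealableSlices)) {
        Integer id;
        while ((id = tier.poll()) != null) {
            LuceneSlice slice = slices.getAndSet(id, null); // claim atomically; null if already taken
            if (slice != null) {
                return slice;
            }
        }
    }
    return null; // the queue is exhausted
}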
