Skip to content

Commit b71dfb0

Browse files
committed
Do not share Weight between Drivers
1 parent 42a6c13 commit b71dfb0

File tree

5 files changed

+89
-54
lines changed

5 files changed

+89
-54
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -162,36 +162,53 @@ public final void close() {
162162
protected void additionalClose() { /* Override this method to add any additional cleanup logic if needed */ }
163163

164164
LuceneScorer getCurrentOrLoadNextScorer() {
165-
while (currentScorer == null || currentScorer.isDone()) {
166-
if (currentSlice == null || sliceIndex >= currentSlice.numLeaves()) {
167-
sliceIndex = 0;
168-
currentSlice = sliceQueue.nextSlice(currentSlice);
169-
if (currentSlice == null) {
170-
doneCollecting = true;
165+
while (true) {
166+
while (currentScorer == null || currentScorer.isDone()) {
167+
var partialLeaf = nextPartialLeaf();
168+
if (partialLeaf == null) {
169+
assert doneCollecting;
171170
return null;
172171
}
173-
processedSlices++;
174-
processedShards.add(currentSlice.shardContext().shardIdentifier());
172+
logger.trace("Starting {}", partialLeaf);
173+
loadScorerForNewPartialLeaf(partialLeaf);
175174
}
176-
final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++);
177-
logger.trace("Starting {}", partialLeaf);
178-
final LeafReaderContext leaf = partialLeaf.leafReaderContext();
179-
if (currentScorer == null // First time
180-
|| currentScorer.leafReaderContext() != leaf // Moved to a new leaf
181-
|| currentScorer.weight != currentSlice.weight() // Moved to a new query
182-
) {
183-
final Weight weight = currentSlice.weight();
184-
processedQueries.add(weight.getQuery());
185-
currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.tags(), leaf);
175+
if (currentScorer.executingThread == Thread.currentThread()) {
176+
return currentScorer;
177+
} else {
178+
currentScorer.reinitialize();
186179
}
187-
assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
188-
currentScorer.maxPosition = partialLeaf.maxDoc();
189-
currentScorer.position = Math.max(currentScorer.position, partialLeaf.minDoc());
190180
}
191-
if (Thread.currentThread() != currentScorer.executingThread) {
192-
currentScorer.reinitialize();
181+
}
182+
183+
private PartialLeafReaderContext nextPartialLeaf() {
184+
if (currentSlice == null || sliceIndex >= currentSlice.numLeaves()) {
185+
sliceIndex = 0;
186+
currentSlice = sliceQueue.nextSlice(currentSlice);
187+
if (currentSlice == null) {
188+
doneCollecting = true;
189+
return null;
190+
}
191+
processedSlices++;
192+
processedShards.add(currentSlice.shardContext().shardIdentifier());
193193
}
194-
return currentScorer;
194+
return currentSlice.getLeaf(sliceIndex++);
195+
}
196+
197+
private void loadScorerForNewPartialLeaf(PartialLeafReaderContext partialLeaf) {
198+
final LeafReaderContext leaf = partialLeaf.leafReaderContext();
199+
// the current Weight can be reused with the next slice
200+
if (currentScorer != null && currentSlice.isWeightCompatible(currentScorer.weight)) {
201+
if (currentScorer.leafReaderContext != leaf) {
202+
currentScorer = new LuceneScorer(currentSlice.shardContext(), currentScorer.weight, currentSlice.tags(), leaf);
203+
}
204+
} else {
205+
final var weight = currentSlice.createWeight();
206+
processedQueries.add(weight.getQuery());
207+
currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.tags(), leaf);
208+
}
209+
assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
210+
currentScorer.maxPosition = partialLeaf.maxDoc();
211+
currentScorer.position = Math.max(currentScorer.position, partialLeaf.minDoc());
195212
}
196213

197214
/**

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,13 @@
77

88
package org.elasticsearch.compute.lucene;
99

10+
import org.apache.lucene.search.FilterWeight;
11+
import org.apache.lucene.search.Query;
12+
import org.apache.lucene.search.ScoreMode;
1013
import org.apache.lucene.search.Weight;
1114

15+
import java.io.IOException;
16+
import java.io.UncheckedIOException;
1217
import java.util.List;
1318

1419
/**
@@ -19,14 +24,36 @@ public record LuceneSlice(
1924
boolean queryHead,
2025
ShardContext shardContext,
2126
List<PartialLeafReaderContext> leaves,
22-
Weight weight,
27+
Query query,
28+
ScoreMode scoreMode,
2329
List<Object> tags
2430
) {
31+
2532
int numLeaves() {
2633
return leaves.size();
2734
}
2835

2936
PartialLeafReaderContext getLeaf(int index) {
3037
return leaves.get(index);
3138
}
39+
40+
Weight createWeight() {
41+
var searcher = shardContext.searcher();
42+
try {
43+
Weight w = searcher.createWeight(query, scoreMode, 1);
44+
return new OwningWeight(query, w);
45+
} catch (IOException e) {
46+
throw new UncheckedIOException(e);
47+
}
48+
}
49+
50+
private static class OwningWeight extends FilterWeight {
51+
protected OwningWeight(Query query, Weight weight) {
52+
super(query, weight);
53+
}
54+
}
55+
56+
boolean isWeightCompatible(Weight weight) {
57+
return weight instanceof OwningWeight ow && ow.getQuery() == query;
58+
}
3259
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.apache.lucene.search.IndexSearcher;
1313
import org.apache.lucene.search.Query;
1414
import org.apache.lucene.search.ScoreMode;
15-
import org.apache.lucene.search.Weight;
1615
import org.elasticsearch.common.io.stream.StreamInput;
1716
import org.elasticsearch.common.io.stream.StreamOutput;
1817
import org.elasticsearch.common.io.stream.Writeable;
@@ -221,12 +220,11 @@ public static LuceneSliceQueue create(
221220
PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
222221
partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
223222
List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
224-
Weight weight = weight(ctx, query, scoreMode);
225223
boolean queryHead = true;
226224
for (List<PartialLeafReaderContext> group : groups) {
227225
if (group.isEmpty() == false) {
228226
final int slicePosition = nextSliceId++;
229-
slices.add(new LuceneSlice(slicePosition, queryHead, ctx, group, weight, queryAndExtra.tags));
227+
slices.add(new LuceneSlice(slicePosition, queryHead, ctx, group, query, scoreMode, queryAndExtra.tags));
230228
queryHead = false;
231229
}
232230
}
@@ -328,16 +326,6 @@ private static PartitioningStrategy forAuto(Function<Query, PartitioningStrategy
328326
}
329327
}
330328

331-
static Weight weight(ShardContext ctx, Query query, ScoreMode scoreMode) {
332-
var searcher = ctx.searcher();
333-
try {
334-
Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
335-
return searcher.createWeight(actualQuery, scoreMode, 1);
336-
} catch (IOException e) {
337-
throw new UncheckedIOException(e);
338-
}
339-
}
340-
341329
static final class AdaptivePartitioner {
342330
final int desiredDocsPerSlice;
343331
final int maxDocsPerSlice;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ protected boolean lessThan(LeafIterator a, LeafIterator b) {
154154
return a.timeSeriesHash.compareTo(b.timeSeriesHash) < 0;
155155
}
156156
};
157-
Weight weight = luceneSlice.weight();
157+
Weight weight = luceneSlice.createWeight();
158158
processedQueries.add(weight.getQuery());
159159
int maxSegmentOrd = 0;
160160
for (var leafReaderContext : luceneSlice.leaves()) {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSliceQueueTests.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.lucene.index.TermVectors;
2525
import org.apache.lucene.index.Terms;
2626
import org.apache.lucene.search.KnnCollector;
27+
import org.apache.lucene.search.ScoreMode;
2728
import org.apache.lucene.util.Bits;
2829
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2930
import org.elasticsearch.test.ESTestCase;
@@ -50,27 +51,28 @@ public void testBasics() {
5051
LeafReaderContext leaf2 = new MockLeafReader(1000).getContext();
5152
LeafReaderContext leaf3 = new MockLeafReader(1000).getContext();
5253
LeafReaderContext leaf4 = new MockLeafReader(1000).getContext();
53-
List<Object> query1 = List.of("1");
54-
List<Object> query2 = List.of("q2");
54+
List<Object> t1 = List.of("1");
55+
List<Object> t2 = List.of("q2");
56+
var scoreMode = ScoreMode.COMPLETE_NO_SCORES;
5557
List<LuceneSlice> sliceList = List.of(
5658
// query1: new segment
57-
new LuceneSlice(0, true, null, List.of(new PartialLeafReaderContext(leaf1, 0, 10)), null, query1),
58-
new LuceneSlice(1, false, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), null, query1),
59-
new LuceneSlice(2, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), null, query1),
59+
new LuceneSlice(0, true, null, List.of(new PartialLeafReaderContext(leaf1, 0, 10)), null, scoreMode, t1),
60+
new LuceneSlice(1, false, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), null, scoreMode, t1),
61+
new LuceneSlice(2, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), null, scoreMode, t1),
6062
// query1: new segment
61-
new LuceneSlice(3, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), null, query1),
62-
new LuceneSlice(4, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), null, query1),
63-
new LuceneSlice(5, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), null, query1),
63+
new LuceneSlice(3, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), null, scoreMode, t1),
64+
new LuceneSlice(4, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), null, scoreMode, t1),
65+
new LuceneSlice(5, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), null, scoreMode, t1),
6466
// query1: new segment
65-
new LuceneSlice(6, false, null, List.of(new PartialLeafReaderContext(leaf4, 0, 10)), null, query1),
66-
new LuceneSlice(7, false, null, List.of(new PartialLeafReaderContext(leaf4, 10, 20)), null, query1),
67+
new LuceneSlice(6, false, null, List.of(new PartialLeafReaderContext(leaf4, 0, 10)), null, scoreMode, t1),
68+
new LuceneSlice(7, false, null, List.of(new PartialLeafReaderContext(leaf4, 10, 20)), null, scoreMode, t1),
6769
// query2: new segment
68-
new LuceneSlice(8, true, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), null, query2),
69-
new LuceneSlice(9, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), null, query2),
70+
new LuceneSlice(8, true, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), null, scoreMode, t2),
71+
new LuceneSlice(9, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), null, scoreMode, t2),
7072
// query2: new segment
71-
new LuceneSlice(10, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), null, query2),
72-
new LuceneSlice(11, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), null, query2),
73-
new LuceneSlice(12, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), null, query2)
73+
new LuceneSlice(10, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), null, scoreMode, t2),
74+
new LuceneSlice(11, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), null, scoreMode, t2),
75+
new LuceneSlice(12, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), null, scoreMode, t2)
7476
);
7577
// single driver
7678
{
@@ -140,6 +142,7 @@ public void testRandom() throws Exception {
140142
mock(ShardContext.class),
141143
List.of(new PartialLeafReaderContext(leafContext, minDoc, maxDoc)),
142144
null,
145+
ScoreMode.COMPLETE_NO_SCORES,
143146
null
144147
);
145148
sliceList.add(slice);

0 commit comments

Comments
 (0)