Skip to content

Commit 2f10065

Browse files
authored
Do not share Weight between Drivers (#133446)
We have encountered the following error in serverless: ``` java.lang.NullPointerException: Cannot invoke "org.apache.lucene.search.BulkScorer.score(org.apache.lucene.search.LeafCollector, org.apache.lucene.util.Bits, int, int)" because "this.bulkScorer" is null at org.elasticsearch.compute.lucene.LuceneOperator$LuceneScorer.scoreNextRange(LuceneOperator.java:233) at org.elasticsearch.compute.lucene.LuceneSourceOperator.getCheckedOutput(LuceneSourceOperator.java:307) at org.elasticsearch.compute.lucene.LuceneOperator.getOutput(LuceneOperator.java:143) at org.elasticsearch.compute.operator.Driver.runSingleLoopIteration(Driver.java:272) at org.elasticsearch.compute.operator.Driver.run(Driver.java:186) at org.elasticsearch.compute.operator.Driver$1.doRun(Driver.java:420) ``` I spent considerable time trying to reproduce this issue but was unsuccessful, although I understand how it could occur. Weight should not be shared between threads. Most Weight implementations are safe to share, but those for term queries (e.g., TermQuery, multi-term queries) are not, as they contain mutable state. This change proposes to stop sharing Weight between Drivers.
1 parent f7e38b7 commit 2f10065

File tree

9 files changed

+120
-73
lines changed

9 files changed

+120
-73
lines changed

docs/changelog/133446.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 133446
2+
summary: Do not share Weight between Drivers
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 62 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -165,40 +165,61 @@ public final void close() {
165165
protected void additionalClose() { /* Override this method to add any additional cleanup logic if needed */ }
166166

167167
LuceneScorer getCurrentOrLoadNextScorer() {
168-
while (currentScorer == null || currentScorer.isDone()) {
169-
if (currentSlice == null || sliceIndex >= currentSlice.numLeaves()) {
170-
sliceIndex = 0;
171-
currentSlice = sliceQueue.nextSlice(currentSlice);
172-
if (currentSlice == null) {
173-
doneCollecting = true;
168+
while (true) {
169+
while (currentScorer == null || currentScorer.isDone()) {
170+
var partialLeaf = nextPartialLeaf();
171+
if (partialLeaf == null) {
172+
assert doneCollecting;
174173
return null;
175174
}
176-
processedSlices++;
177-
processedShards.add(currentSlice.shardContext().shardIdentifier());
178-
int shardId = currentSlice.shardContext().index();
179-
if (currentScorerShardRefCounted == null || currentScorerShardRefCounted.index() != shardId) {
180-
currentScorerShardRefCounted = new ShardRefCounted.Single(shardId, shardContextCounters.get(shardId));
181-
}
175+
logger.trace("Starting {}", partialLeaf);
176+
loadScorerForNewPartialLeaf(partialLeaf);
182177
}
183-
final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++);
184-
logger.trace("Starting {}", partialLeaf);
185-
final LeafReaderContext leaf = partialLeaf.leafReaderContext();
186-
if (currentScorer == null // First time
187-
|| currentScorer.leafReaderContext() != leaf // Moved to a new leaf
188-
|| currentScorer.weight != currentSlice.weight() // Moved to a new query
189-
) {
190-
final Weight weight = currentSlice.weight();
191-
processedQueries.add(weight.getQuery());
192-
currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.tags(), leaf);
178+
// Has the executing thread changed? If so, we need to reinitialize the scorer. The reinitialized bulkScorer
179+
// can be null even if it was non-null previously, due to lazy initialization in Weight#bulkScorer.
180+
// Hence, we need to check the previous condition again.
181+
if (currentScorer.executingThread == Thread.currentThread()) {
182+
return currentScorer;
183+
} else {
184+
currentScorer.reinitialize();
193185
}
194-
assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
195-
currentScorer.maxPosition = partialLeaf.maxDoc();
196-
currentScorer.position = Math.max(currentScorer.position, partialLeaf.minDoc());
197186
}
198-
if (Thread.currentThread() != currentScorer.executingThread) {
199-
currentScorer.reinitialize();
187+
}
188+
189+
private PartialLeafReaderContext nextPartialLeaf() {
190+
if (currentSlice == null || sliceIndex >= currentSlice.numLeaves()) {
191+
sliceIndex = 0;
192+
currentSlice = sliceQueue.nextSlice(currentSlice);
193+
if (currentSlice == null) {
194+
doneCollecting = true;
195+
return null;
196+
}
197+
processedSlices++;
198+
int shardId = currentSlice.shardContext().index();
199+
if (currentScorerShardRefCounted == null || currentScorerShardRefCounted.index() != shardId) {
200+
currentScorerShardRefCounted = new ShardRefCounted.Single(shardId, shardContextCounters.get(shardId));
201+
}
202+
processedShards.add(currentSlice.shardContext().shardIdentifier());
200203
}
201-
return currentScorer;
204+
return currentSlice.getLeaf(sliceIndex++);
205+
}
206+
207+
private void loadScorerForNewPartialLeaf(PartialLeafReaderContext partialLeaf) {
208+
final LeafReaderContext leaf = partialLeaf.leafReaderContext();
209+
if (currentScorer != null
210+
&& currentScorer.query() == currentSlice.query()
211+
&& currentScorer.shardContext == currentSlice.shardContext()) {
212+
if (currentScorer.leafReaderContext != leaf) {
213+
currentScorer = new LuceneScorer(currentSlice.shardContext(), currentScorer.weight, currentSlice.queryAndTags(), leaf);
214+
}
215+
} else {
216+
final var weight = currentSlice.createWeight();
217+
currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.queryAndTags(), leaf);
218+
processedQueries.add(currentScorer.query());
219+
}
220+
assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
221+
currentScorer.maxPosition = partialLeaf.maxDoc();
222+
currentScorer.position = Math.max(currentScorer.position, partialLeaf.minDoc());
202223
}
203224

204225
/**
@@ -214,18 +235,23 @@ ShardRefCounted currentScorerShardRefCounted() {
214235
static final class LuceneScorer {
215236
private final ShardContext shardContext;
216237
private final Weight weight;
238+
private final LuceneSliceQueue.QueryAndTags queryAndTags;
217239
private final LeafReaderContext leafReaderContext;
218-
private final List<Object> tags;
219240

220241
private BulkScorer bulkScorer;
221242
private int position;
222243
private int maxPosition;
223244
private Thread executingThread;
224245

225-
LuceneScorer(ShardContext shardContext, Weight weight, List<Object> tags, LeafReaderContext leafReaderContext) {
246+
LuceneScorer(
247+
ShardContext shardContext,
248+
Weight weight,
249+
LuceneSliceQueue.QueryAndTags queryAndTags,
250+
LeafReaderContext leafReaderContext
251+
) {
226252
this.shardContext = shardContext;
227253
this.weight = weight;
228-
this.tags = tags;
254+
this.queryAndTags = queryAndTags;
229255
this.leafReaderContext = leafReaderContext;
230256
reinitialize();
231257
}
@@ -275,7 +301,11 @@ int position() {
275301
* Tags to add to the data returned by this query.
276302
*/
277303
List<Object> tags() {
278-
return tags;
304+
return queryAndTags.tags();
305+
}
306+
307+
Query query() {
308+
return queryAndTags.query();
279309
}
280310
}
281311

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,12 @@
77

88
package org.elasticsearch.compute.lucene;
99

10+
import org.apache.lucene.search.Query;
11+
import org.apache.lucene.search.ScoreMode;
1012
import org.apache.lucene.search.Weight;
1113

14+
import java.io.IOException;
15+
import java.io.UncheckedIOException;
1216
import java.util.List;
1317

1418
/**
@@ -19,14 +23,32 @@ public record LuceneSlice(
1923
boolean queryHead,
2024
ShardContext shardContext,
2125
List<PartialLeafReaderContext> leaves,
22-
Weight weight,
23-
List<Object> tags
26+
ScoreMode scoreMode,
27+
LuceneSliceQueue.QueryAndTags queryAndTags
2428
) {
29+
2530
int numLeaves() {
2631
return leaves.size();
2732
}
2833

2934
PartialLeafReaderContext getLeaf(int index) {
3035
return leaves.get(index);
3136
}
37+
38+
Query query() {
39+
return queryAndTags.query();
40+
}
41+
42+
List<Object> tags() {
43+
return queryAndTags.tags();
44+
}
45+
46+
Weight createWeight() {
47+
var searcher = shardContext.searcher();
48+
try {
49+
return searcher.createWeight(queryAndTags.query(), scoreMode, 1);
50+
} catch (IOException e) {
51+
throw new UncheckedIOException(e);
52+
}
53+
}
3254
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.apache.lucene.search.IndexSearcher;
1313
import org.apache.lucene.search.Query;
1414
import org.apache.lucene.search.ScoreMode;
15-
import org.apache.lucene.search.Weight;
1615
import org.elasticsearch.common.io.stream.StreamInput;
1716
import org.elasticsearch.common.io.stream.StreamOutput;
1817
import org.elasticsearch.common.io.stream.Writeable;
@@ -209,12 +208,12 @@ public static LuceneSliceQueue create(
209208
PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
210209
partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
211210
List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
212-
Weight weight = weight(ctx, query, scoreMode);
211+
var rewrittenQueryAndTag = new QueryAndTags(query, queryAndExtra.tags);
213212
boolean queryHead = true;
214213
for (List<PartialLeafReaderContext> group : groups) {
215214
if (group.isEmpty() == false) {
216215
final int slicePosition = nextSliceId++;
217-
slices.add(new LuceneSlice(slicePosition, queryHead, ctx, group, weight, queryAndExtra.tags));
216+
slices.add(new LuceneSlice(slicePosition, queryHead, ctx, group, scoreMode, rewrittenQueryAndTag));
218217
queryHead = false;
219218
}
220219
}
@@ -316,16 +315,6 @@ private static PartitioningStrategy forAuto(Function<Query, PartitioningStrategy
316315
}
317316
}
318317

319-
static Weight weight(ShardContext ctx, Query query, ScoreMode scoreMode) {
320-
var searcher = ctx.searcher();
321-
try {
322-
Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
323-
return searcher.createWeight(actualQuery, scoreMode, 1);
324-
} catch (IOException e) {
325-
throw new UncheckedIOException(e);
326-
}
327-
}
328-
329318
static final class AdaptivePartitioner {
330319
final int desiredDocsPerSlice;
331320
final int maxDocsPerSlice;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ protected boolean lessThan(LeafIterator a, LeafIterator b) {
154154
return a.timeSeriesHash.compareTo(b.timeSeriesHash) < 0;
155155
}
156156
};
157-
Weight weight = luceneSlice.weight();
157+
Weight weight = luceneSlice.createWeight();
158158
processedQueries.add(weight.getQuery());
159159
int maxSegmentOrd = 0;
160160
for (var leafReaderContext : luceneSlice.leaves()) {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSliceQueueTests.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.lucene.index.TermVectors;
2525
import org.apache.lucene.index.Terms;
2626
import org.apache.lucene.search.KnnCollector;
27+
import org.apache.lucene.search.MatchAllDocsQuery;
28+
import org.apache.lucene.search.ScoreMode;
2729
import org.apache.lucene.util.Bits;
2830
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2931
import org.elasticsearch.test.ESTestCase;
@@ -50,27 +52,28 @@ public void testBasics() {
5052
LeafReaderContext leaf2 = new MockLeafReader(1000).getContext();
5153
LeafReaderContext leaf3 = new MockLeafReader(1000).getContext();
5254
LeafReaderContext leaf4 = new MockLeafReader(1000).getContext();
53-
List<Object> query1 = List.of("1");
54-
List<Object> query2 = List.of("q2");
55+
LuceneSliceQueue.QueryAndTags t1 = new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of("q1"));
56+
LuceneSliceQueue.QueryAndTags t2 = new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of("q2"));
57+
var scoreMode = ScoreMode.COMPLETE_NO_SCORES;
5558
List<LuceneSlice> sliceList = List.of(
5659
// query1: new segment
57-
new LuceneSlice(0, true, null, List.of(new PartialLeafReaderContext(leaf1, 0, 10)), null, query1),
58-
new LuceneSlice(1, false, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), null, query1),
59-
new LuceneSlice(2, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), null, query1),
60+
new LuceneSlice(0, true, null, List.of(new PartialLeafReaderContext(leaf1, 0, 10)), scoreMode, t1),
61+
new LuceneSlice(1, false, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), scoreMode, t1),
62+
new LuceneSlice(2, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), scoreMode, t1),
6063
// query1: new segment
61-
new LuceneSlice(3, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), null, query1),
62-
new LuceneSlice(4, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), null, query1),
63-
new LuceneSlice(5, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), null, query1),
64+
new LuceneSlice(3, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), scoreMode, t1),
65+
new LuceneSlice(4, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), scoreMode, t1),
66+
new LuceneSlice(5, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), scoreMode, t1),
6467
// query1: new segment
65-
new LuceneSlice(6, false, null, List.of(new PartialLeafReaderContext(leaf4, 0, 10)), null, query1),
66-
new LuceneSlice(7, false, null, List.of(new PartialLeafReaderContext(leaf4, 10, 20)), null, query1),
68+
new LuceneSlice(6, false, null, List.of(new PartialLeafReaderContext(leaf4, 0, 10)), scoreMode, t1),
69+
new LuceneSlice(7, false, null, List.of(new PartialLeafReaderContext(leaf4, 10, 20)), scoreMode, t1),
6770
// query2: new segment
68-
new LuceneSlice(8, true, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), null, query2),
69-
new LuceneSlice(9, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), null, query2),
71+
new LuceneSlice(8, true, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), scoreMode, t2),
72+
new LuceneSlice(9, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), scoreMode, t2),
7073
// query1: new segment
71-
new LuceneSlice(10, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), null, query2),
72-
new LuceneSlice(11, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), null, query2),
73-
new LuceneSlice(12, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), null, query2)
74+
new LuceneSlice(10, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), scoreMode, t2),
75+
new LuceneSlice(11, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), scoreMode, t2),
76+
new LuceneSlice(12, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), scoreMode, t2)
7477
);
7578
// single driver
7679
{
@@ -139,7 +142,7 @@ public void testRandom() throws Exception {
139142
false,
140143
mock(ShardContext.class),
141144
List.of(new PartialLeafReaderContext(leafContext, minDoc, maxDoc)),
142-
null,
145+
ScoreMode.COMPLETE_NO_SCORES,
143146
null
144147
);
145148
sliceList.add(slice);

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -367,12 +367,10 @@ private void testPushQuery(
367367
matchesList().item(matchesMap().entry("name", "test").entry("type", anyOf(equalTo("text"), equalTo("keyword")))),
368368
equalTo(found ? List.of(List.of(value)) : List.of())
369369
);
370-
Matcher<String> luceneQueryMatcher = anyOf(
371-
() -> Iterators.map(
372-
luceneQueryOptions.iterator(),
373-
(String s) -> equalTo(s.replaceAll("%value", value).replaceAll("%different_value", differentValue))
374-
)
375-
);
370+
Matcher<String> luceneQueryMatcher = anyOf(() -> Iterators.map(luceneQueryOptions.iterator(), (String s) -> {
371+
String q = s.replaceAll("%value", value).replaceAll("%different_value", differentValue);
372+
return equalTo("ConstantScore(" + q + ")");
373+
}));
376374

377375
@SuppressWarnings("unchecked")
378376
List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers");

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,7 @@ private String checkOperatorProfile(Map<String, Object> o) {
912912
.entry("pages_emitted", greaterThan(0))
913913
.entry("rows_emitted", greaterThan(0))
914914
.entry("process_nanos", greaterThan(0))
915-
.entry("processed_queries", List.of("*:*"))
915+
.entry("processed_queries", List.of("ConstantScore(*:*)"))
916916
.entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD"));
917917
case "ValuesSourceReaderOperator" -> basicProfile().entry("pages_received", greaterThan(0))
918918
.entry("pages_emitted", greaterThan(0))
@@ -950,7 +950,7 @@ private String checkOperatorProfile(Map<String, Object> o) {
950950
.entry("slice_max", 0)
951951
.entry("slice_min", 0)
952952
.entry("process_nanos", greaterThan(0))
953-
.entry("processed_queries", List.of("*:*"))
953+
.entry("processed_queries", List.of("ConstantScore(*:*)"))
954954
.entry("slice_index", 0);
955955
default -> throw new AssertionError("unexpected status: " + o);
956956
};

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public void testTaskContents() throws Exception {
111111
assertThat(description, equalTo("data"));
112112
LuceneSourceOperator.Status oStatus = (LuceneSourceOperator.Status) o.status();
113113
assertThat(oStatus.processedSlices(), lessThanOrEqualTo(oStatus.totalSlices()));
114-
assertThat(oStatus.processedQueries(), equalTo(Set.of("*:*")));
114+
assertThat(oStatus.processedQueries(), equalTo(Set.of("ConstantScore(*:*)")));
115115
assertThat(oStatus.processedShards(), equalTo(Set.of("test:0")));
116116
assertThat(oStatus.sliceIndex(), lessThanOrEqualTo(oStatus.totalSlices()));
117117
assertThat(oStatus.sliceMin(), greaterThanOrEqualTo(0));

0 commit comments

Comments
 (0)