
Commit f972730

Revert "Do not share Weight between Drivers (elastic#133446)" (elastic#134481)
This reverts commit 2f10065. We have seen a performance regression in some queries that may be caused by this fix. I am reverting the PR for now and will try to introduce a fix with fewer side effects. Relates elastic#133446
1 parent 7e8a69d commit f972730
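
For context, the restored design (shown in the diffs below) builds one Lucene Weight per shard and query when the slice queue is created, and every LuceneSlice of that shard carries the same instance, so all Drivers share it; the reverted change instead had each slice build its own Weight on demand. The following is a minimal, self-contained sketch of that shared-Weight pattern in plain Lucene, not code from this commit: the index contents, the "field"/"value" names, and the flush() calls used to force multiple segments are illustrative assumptions.

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.ByteBuffersDirectory;

public class SharedWeightSketch {
    public static void main(String[] args) throws Exception {
        try (var dir = new ByteBuffersDirectory(); var writer = new IndexWriter(dir, new IndexWriterConfig())) {
            for (int i = 0; i < 100; i++) {
                Document doc = new Document();
                doc.add(new StringField("field", i % 2 == 0 ? "value" : "other", Field.Store.NO));
                writer.addDocument(doc);
                if (i % 25 == 24) {
                    writer.flush(); // encourage multiple segments, i.e. multiple leaves
                }
            }
            try (DirectoryReader reader = DirectoryReader.open(writer)) {
                IndexSearcher searcher = new IndexSearcher(reader);
                ScoreMode scoreMode = ScoreMode.COMPLETE_NO_SCORES;
                Query query = new TermQuery(new Term("field", "value"));
                // When scores are not needed, wrap in ConstantScoreQuery, as the
                // restored LuceneSliceQueue.weight(ctx, query, scoreMode) helper does.
                Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
                // Build the Weight exactly once for the whole shard...
                Weight weight = searcher.createWeight(searcher.rewrite(actualQuery), scoreMode, 1f);
                long hits = 0;
                // ...and reuse it for every leaf (in ES|QL: every slice of every Driver).
                for (LeafReaderContext leaf : reader.leaves()) {
                    Scorer scorer = weight.scorer(leaf);
                    if (scorer == null) {
                        continue; // no matches in this segment
                    }
                    DocIdSetIterator it = scorer.iterator();
                    while (it.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
                        hits++;
                    }
                }
                System.out.println("hits=" + hits); // 50
            }
        }
    }
}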

8 files changed (+72, −119 lines)

docs/changelog/133446.yaml

Lines changed: 0 additions & 5 deletions
This file was deleted.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 32 additions & 62 deletions
@@ -165,61 +165,40 @@ public final void close() {
     protected void additionalClose() { /* Override this method to add any additional cleanup logic if needed */ }
 
     LuceneScorer getCurrentOrLoadNextScorer() {
-        while (true) {
-            while (currentScorer == null || currentScorer.isDone()) {
-                var partialLeaf = nextPartialLeaf();
-                if (partialLeaf == null) {
-                    assert doneCollecting;
+        while (currentScorer == null || currentScorer.isDone()) {
+            if (currentSlice == null || sliceIndex >= currentSlice.numLeaves()) {
+                sliceIndex = 0;
+                currentSlice = sliceQueue.nextSlice(currentSlice);
+                if (currentSlice == null) {
+                    doneCollecting = true;
                     return null;
                 }
-                logger.trace("Starting {}", partialLeaf);
-                loadScorerForNewPartialLeaf(partialLeaf);
+                processedSlices++;
+                processedShards.add(currentSlice.shardContext().shardIdentifier());
+                int shardId = currentSlice.shardContext().index();
+                if (currentScorerShardRefCounted == null || currentScorerShardRefCounted.index() != shardId) {
+                    currentScorerShardRefCounted = new ShardRefCounted.Single(shardId, shardContextCounters.get(shardId));
+                }
             }
-            // Has the executing thread changed? If so, we need to reinitialize the scorer. The reinitialized bulkScorer
-            // can be null even if it was non-null previously, due to lazy initialization in Weight#bulkScorer.
-            // Hence, we need to check the previous condition again.
-            if (currentScorer.executingThread == Thread.currentThread()) {
-                return currentScorer;
-            } else {
-                currentScorer.reinitialize();
+            final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++);
+            logger.trace("Starting {}", partialLeaf);
+            final LeafReaderContext leaf = partialLeaf.leafReaderContext();
+            if (currentScorer == null // First time
+                || currentScorer.leafReaderContext() != leaf // Moved to a new leaf
+                || currentScorer.weight != currentSlice.weight() // Moved to a new query
+            ) {
+                final Weight weight = currentSlice.weight();
+                processedQueries.add(weight.getQuery());
+                currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.tags(), leaf);
             }
+            assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
+            currentScorer.maxPosition = partialLeaf.maxDoc();
+            currentScorer.position = Math.max(currentScorer.position, partialLeaf.minDoc());
         }
-    }
-
-    private PartialLeafReaderContext nextPartialLeaf() {
-        if (currentSlice == null || sliceIndex >= currentSlice.numLeaves()) {
-            sliceIndex = 0;
-            currentSlice = sliceQueue.nextSlice(currentSlice);
-            if (currentSlice == null) {
-                doneCollecting = true;
-                return null;
-            }
-            processedSlices++;
-            int shardId = currentSlice.shardContext().index();
-            if (currentScorerShardRefCounted == null || currentScorerShardRefCounted.index() != shardId) {
-                currentScorerShardRefCounted = new ShardRefCounted.Single(shardId, shardContextCounters.get(shardId));
-            }
-            processedShards.add(currentSlice.shardContext().shardIdentifier());
+        if (Thread.currentThread() != currentScorer.executingThread) {
+            currentScorer.reinitialize();
         }
-        return currentSlice.getLeaf(sliceIndex++);
-    }
-
-    private void loadScorerForNewPartialLeaf(PartialLeafReaderContext partialLeaf) {
-        final LeafReaderContext leaf = partialLeaf.leafReaderContext();
-        if (currentScorer != null
-            && currentScorer.query() == currentSlice.query()
-            && currentScorer.shardContext == currentSlice.shardContext()) {
-            if (currentScorer.leafReaderContext != leaf) {
-                currentScorer = new LuceneScorer(currentSlice.shardContext(), currentScorer.weight, currentSlice.queryAndTags(), leaf);
-            }
-        } else {
-            final var weight = currentSlice.createWeight();
-            currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.queryAndTags(), leaf);
-            processedQueries.add(currentScorer.query());
-        }
-        assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
-        currentScorer.maxPosition = partialLeaf.maxDoc();
-        currentScorer.position = Math.max(currentScorer.position, partialLeaf.minDoc());
+        return currentScorer;
     }
 
     /**
@@ -235,23 +214,18 @@ ShardRefCounted currentScorerShardRefCounted() {
     static final class LuceneScorer {
         private final ShardContext shardContext;
         private final Weight weight;
-        private final LuceneSliceQueue.QueryAndTags queryAndTags;
         private final LeafReaderContext leafReaderContext;
+        private final List<Object> tags;
 
         private BulkScorer bulkScorer;
         private int position;
        private int maxPosition;
         private Thread executingThread;
 
-        LuceneScorer(
-            ShardContext shardContext,
-            Weight weight,
-            LuceneSliceQueue.QueryAndTags queryAndTags,
-            LeafReaderContext leafReaderContext
-        ) {
+        LuceneScorer(ShardContext shardContext, Weight weight, List<Object> tags, LeafReaderContext leafReaderContext) {
             this.shardContext = shardContext;
             this.weight = weight;
-            this.queryAndTags = queryAndTags;
+            this.tags = tags;
             this.leafReaderContext = leafReaderContext;
             reinitialize();
         }
@@ -301,11 +275,7 @@ int position() {
          * Tags to add to the data returned by this query.
          */
        List<Object> tags() {
-            return queryAndTags.tags();
-        }
-
-        Query query() {
-            return queryAndTags.query();
+            return tags;
        }
    }
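
The Thread.currentThread() != currentScorer.executingThread check restored above exists because a Driver can be parked and later resume on a different worker thread, while the BulkScorer is treated as confined to the thread that built it; when the executing thread changes, the scorer is rebuilt via reinitialize(). A toy sketch of that pattern, using a hypothetical ThreadConfined helper that is not part of this codebase:

import java.util.function.Supplier;

// Hypothetical helper, for illustration only: lazily (re)builds thread-confined
// state whenever the calling thread changes, the way LuceneScorer.reinitialize()
// rebuilds the BulkScorer after a Driver migrates to another thread. Like a
// Driver, it assumes only one thread uses it at any given moment.
final class ThreadConfined<T> {
    private final Supplier<T> init;
    private Thread owner; // analogous to LuceneScorer.executingThread
    private T state;      // analogous to the BulkScorer

    ThreadConfined(Supplier<T> init) {
        this.init = init;
    }

    T get() {
        if (owner != Thread.currentThread()) {
            owner = Thread.currentThread();
            state = init.get(); // rebuild rather than migrate across threads
        }
        return state;
    }
}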

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java

Lines changed: 2 additions & 24 deletions
@@ -7,12 +7,8 @@
 
 package org.elasticsearch.compute.lucene;
 
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Weight;
 
-import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.List;
 
 /**
@@ -23,32 +19,14 @@ public record LuceneSlice(
     boolean queryHead,
     ShardContext shardContext,
     List<PartialLeafReaderContext> leaves,
-    ScoreMode scoreMode,
-    LuceneSliceQueue.QueryAndTags queryAndTags
+    Weight weight,
+    List<Object> tags
 ) {
-
     int numLeaves() {
         return leaves.size();
     }
 
     PartialLeafReaderContext getLeaf(int index) {
         return leaves.get(index);
     }
-
-    Query query() {
-        return queryAndTags.query();
-    }
-
-    List<Object> tags() {
-        return queryAndTags.tags();
-    }
-
-    Weight createWeight() {
-        var searcher = shardContext.searcher();
-        try {
-            return searcher.createWeight(queryAndTags.query(), scoreMode, 1);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
 }

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java

Lines changed: 13 additions & 2 deletions
@@ -12,6 +12,7 @@
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Weight;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -208,12 +209,12 @@ public static LuceneSliceQueue create(
             PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
             partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
             List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
-            var rewrittenQueryAndTag = new QueryAndTags(query, queryAndExtra.tags);
+            Weight weight = weight(ctx, query, scoreMode);
             boolean queryHead = true;
             for (List<PartialLeafReaderContext> group : groups) {
                 if (group.isEmpty() == false) {
                     final int slicePosition = nextSliceId++;
-                    slices.add(new LuceneSlice(slicePosition, queryHead, ctx, group, scoreMode, rewrittenQueryAndTag));
+                    slices.add(new LuceneSlice(slicePosition, queryHead, ctx, group, weight, queryAndExtra.tags));
                     queryHead = false;
                 }
             }
@@ -315,6 +316,16 @@ private static PartitioningStrategy forAuto(Function<Query, PartitioningStrategy
         }
     }
 
+    static Weight weight(ShardContext ctx, Query query, ScoreMode scoreMode) {
+        var searcher = ctx.searcher();
+        try {
+            Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
+            return searcher.createWeight(actualQuery, scoreMode, 1);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
     static final class AdaptivePartitioner {
         final int desiredDocsPerSlice;
         final int maxDocsPerSlice;

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSliceQueueTests.java

Lines changed: 16 additions & 19 deletions
@@ -24,8 +24,6 @@
 import org.apache.lucene.index.TermVectors;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.search.KnnCollector;
-import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.Bits;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.test.ESTestCase;
@@ -52,28 +50,27 @@ public void testBasics() {
         LeafReaderContext leaf2 = new MockLeafReader(1000).getContext();
         LeafReaderContext leaf3 = new MockLeafReader(1000).getContext();
         LeafReaderContext leaf4 = new MockLeafReader(1000).getContext();
-        LuceneSliceQueue.QueryAndTags t1 = new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of("q1"));
-        LuceneSliceQueue.QueryAndTags t2 = new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of("q2"));
-        var scoreMode = ScoreMode.COMPLETE_NO_SCORES;
+        List<Object> query1 = List.of("q1");
+        List<Object> query2 = List.of("q2");
         List<LuceneSlice> sliceList = List.of(
             // query1: new segment
-            new LuceneSlice(0, true, null, List.of(new PartialLeafReaderContext(leaf1, 0, 10)), scoreMode, t1),
-            new LuceneSlice(1, false, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), scoreMode, t1),
-            new LuceneSlice(2, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), scoreMode, t1),
+            new LuceneSlice(0, true, null, List.of(new PartialLeafReaderContext(leaf1, 0, 10)), null, query1),
+            new LuceneSlice(1, false, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), null, query1),
+            new LuceneSlice(2, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), null, query1),
             // query1: new segment
-            new LuceneSlice(3, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), scoreMode, t1),
-            new LuceneSlice(4, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), scoreMode, t1),
-            new LuceneSlice(5, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), scoreMode, t1),
+            new LuceneSlice(3, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), null, query1),
+            new LuceneSlice(4, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), null, query1),
+            new LuceneSlice(5, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), null, query1),
             // query1: new segment
-            new LuceneSlice(6, false, null, List.of(new PartialLeafReaderContext(leaf4, 0, 10)), scoreMode, t1),
-            new LuceneSlice(7, false, null, List.of(new PartialLeafReaderContext(leaf4, 10, 20)), scoreMode, t1),
+            new LuceneSlice(6, false, null, List.of(new PartialLeafReaderContext(leaf4, 0, 10)), null, query1),
+            new LuceneSlice(7, false, null, List.of(new PartialLeafReaderContext(leaf4, 10, 20)), null, query1),
             // query2: new segment
-            new LuceneSlice(8, true, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), scoreMode, t2),
-            new LuceneSlice(9, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), scoreMode, t2),
+            new LuceneSlice(8, true, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), null, query2),
+            new LuceneSlice(9, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), null, query2),
             // query1: new segment
-            new LuceneSlice(10, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), scoreMode, t2),
-            new LuceneSlice(11, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), scoreMode, t2),
-            new LuceneSlice(12, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), scoreMode, t2)
+            new LuceneSlice(10, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), null, query2),
+            new LuceneSlice(11, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), null, query2),
+            new LuceneSlice(12, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), null, query2)
         );
         // single driver
         {
@@ -142,7 +139,7 @@ public void testRandom() throws Exception {
                 false,
                 mock(ShardContext.class),
                 List.of(new PartialLeafReaderContext(leafContext, minDoc, maxDoc)),
-                ScoreMode.COMPLETE_NO_SCORES,
+                null,
                 null
             );
             sliceList.add(slice);

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java

Lines changed: 6 additions & 4 deletions
@@ -367,10 +367,12 @@ private void testPushQuery(
             matchesList().item(matchesMap().entry("name", "test").entry("type", anyOf(equalTo("text"), equalTo("keyword")))),
             equalTo(found ? List.of(List.of(value)) : List.of())
         );
-        Matcher<String> luceneQueryMatcher = anyOf(() -> Iterators.map(luceneQueryOptions.iterator(), (String s) -> {
-            String q = s.replaceAll("%value", value).replaceAll("%different_value", differentValue);
-            return equalTo("ConstantScore(" + q + ")");
-        }));
+        Matcher<String> luceneQueryMatcher = anyOf(
+            () -> Iterators.map(
+                luceneQueryOptions.iterator(),
+                (String s) -> equalTo(s.replaceAll("%value", value).replaceAll("%different_value", differentValue))
+            )
+        );
 
         @SuppressWarnings("unchecked")
         List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers");

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

Lines changed: 2 additions & 2 deletions
@@ -912,7 +912,7 @@ private String checkOperatorProfile(Map<String, Object> o) {
                 .entry("pages_emitted", greaterThan(0))
                 .entry("rows_emitted", greaterThan(0))
                 .entry("process_nanos", greaterThan(0))
-                .entry("processed_queries", List.of("ConstantScore(*:*)"))
+                .entry("processed_queries", List.of("*:*"))
                 .entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD"));
             case "ValuesSourceReaderOperator" -> basicProfile().entry("pages_received", greaterThan(0))
                 .entry("pages_emitted", greaterThan(0))
@@ -950,7 +950,7 @@ private String checkOperatorProfile(Map<String, Object> o) {
                 .entry("slice_max", 0)
                 .entry("slice_min", 0)
                 .entry("process_nanos", greaterThan(0))
-                .entry("processed_queries", List.of("ConstantScore(*:*)"))
+                .entry("processed_queries", List.of("*:*"))
                 .entry("slice_index", 0);
             default -> throw new AssertionError("unexpected status: " + o);
         };

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java

Lines changed: 1 addition & 1 deletion
@@ -111,7 +111,7 @@ public void testTaskContents() throws Exception {
                 assertThat(description, equalTo("data"));
                 LuceneSourceOperator.Status oStatus = (LuceneSourceOperator.Status) o.status();
                 assertThat(oStatus.processedSlices(), lessThanOrEqualTo(oStatus.totalSlices()));
-                assertThat(oStatus.processedQueries(), equalTo(Set.of("ConstantScore(*:*)")));
+                assertThat(oStatus.processedQueries(), equalTo(Set.of("*:*")));
                 assertThat(oStatus.processedShards(), equalTo(Set.of("test:0")));
                 assertThat(oStatus.sliceIndex(), lessThanOrEqualTo(oStatus.totalSlices()));
                 assertThat(oStatus.sliceMin(), greaterThanOrEqualTo(0));
