diff --git a/docs/changelog/133446.yaml b/docs/changelog/133446.yaml deleted file mode 100644 index 19514c5de6761..0000000000000 --- a/docs/changelog/133446.yaml +++ /dev/null @@ -1,5 +0,0 @@ -pr: 133446 -summary: Do not share Weight between Drivers -area: ES|QL -type: bug -issues: [] diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java index 337f8a8eb905e..139106b8c1102 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java @@ -165,61 +165,40 @@ public final void close() { protected void additionalClose() { /* Override this method to add any additional cleanup logic if needed */ } LuceneScorer getCurrentOrLoadNextScorer() { - while (true) { - while (currentScorer == null || currentScorer.isDone()) { - var partialLeaf = nextPartialLeaf(); - if (partialLeaf == null) { - assert doneCollecting; + while (currentScorer == null || currentScorer.isDone()) { + if (currentSlice == null || sliceIndex >= currentSlice.numLeaves()) { + sliceIndex = 0; + currentSlice = sliceQueue.nextSlice(currentSlice); + if (currentSlice == null) { + doneCollecting = true; return null; } - logger.trace("Starting {}", partialLeaf); - loadScorerForNewPartialLeaf(partialLeaf); + processedSlices++; + processedShards.add(currentSlice.shardContext().shardIdentifier()); + int shardId = currentSlice.shardContext().index(); + if (currentScorerShardRefCounted == null || currentScorerShardRefCounted.index() != shardId) { + currentScorerShardRefCounted = new ShardRefCounted.Single(shardId, shardContextCounters.get(shardId)); + } } - // Has the executing thread changed? If so, we need to reinitialize the scorer. The reinitialized bulkScorer - // can be null even if it was non-null previously, due to lazy initialization in Weight#bulkScorer. - // Hence, we need to check the previous condition again. - if (currentScorer.executingThread == Thread.currentThread()) { - return currentScorer; - } else { - currentScorer.reinitialize(); + final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++); + logger.trace("Starting {}", partialLeaf); + final LeafReaderContext leaf = partialLeaf.leafReaderContext(); + if (currentScorer == null // First time + || currentScorer.leafReaderContext() != leaf // Moved to a new leaf + || currentScorer.weight != currentSlice.weight() // Moved to a new query + ) { + final Weight weight = currentSlice.weight(); + processedQueries.add(weight.getQuery()); + currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.tags(), leaf); } + assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc(); + currentScorer.maxPosition = partialLeaf.maxDoc(); + currentScorer.position = Math.max(currentScorer.position, partialLeaf.minDoc()); } - } - - private PartialLeafReaderContext nextPartialLeaf() { - if (currentSlice == null || sliceIndex >= currentSlice.numLeaves()) { - sliceIndex = 0; - currentSlice = sliceQueue.nextSlice(currentSlice); - if (currentSlice == null) { - doneCollecting = true; - return null; - } - processedSlices++; - int shardId = currentSlice.shardContext().index(); - if (currentScorerShardRefCounted == null || currentScorerShardRefCounted.index() != shardId) { - currentScorerShardRefCounted = new ShardRefCounted.Single(shardId, shardContextCounters.get(shardId)); - } - processedShards.add(currentSlice.shardContext().shardIdentifier()); + if (Thread.currentThread() != currentScorer.executingThread) { + currentScorer.reinitialize(); } - return currentSlice.getLeaf(sliceIndex++); - } - - private void loadScorerForNewPartialLeaf(PartialLeafReaderContext partialLeaf) { - final LeafReaderContext leaf = partialLeaf.leafReaderContext(); - if (currentScorer != null - && currentScorer.query() == currentSlice.query() - && currentScorer.shardContext == currentSlice.shardContext()) { - if (currentScorer.leafReaderContext != leaf) { - currentScorer = new LuceneScorer(currentSlice.shardContext(), currentScorer.weight, currentSlice.queryAndTags(), leaf); - } - } else { - final var weight = currentSlice.createWeight(); - currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.queryAndTags(), leaf); - processedQueries.add(currentScorer.query()); - } - assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc(); - currentScorer.maxPosition = partialLeaf.maxDoc(); - currentScorer.position = Math.max(currentScorer.position, partialLeaf.minDoc()); + return currentScorer; } /** @@ -235,23 +214,18 @@ ShardRefCounted currentScorerShardRefCounted() { static final class LuceneScorer { private final ShardContext shardContext; private final Weight weight; - private final LuceneSliceQueue.QueryAndTags queryAndTags; private final LeafReaderContext leafReaderContext; + private final List tags; private BulkScorer bulkScorer; private int position; private int maxPosition; private Thread executingThread; - LuceneScorer( - ShardContext shardContext, - Weight weight, - LuceneSliceQueue.QueryAndTags queryAndTags, - LeafReaderContext leafReaderContext - ) { + LuceneScorer(ShardContext shardContext, Weight weight, List tags, LeafReaderContext leafReaderContext) { this.shardContext = shardContext; this.weight = weight; - this.queryAndTags = queryAndTags; + this.tags = tags; this.leafReaderContext = leafReaderContext; reinitialize(); } @@ -301,11 +275,7 @@ int position() { * Tags to add to the data returned by this query. */ List tags() { - return queryAndTags.tags(); - } - - Query query() { - return queryAndTags.query(); + return tags; } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java index 08d72564228af..38d2d87e3c750 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java @@ -7,12 +7,8 @@ package org.elasticsearch.compute.lucene; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; -import java.io.IOException; -import java.io.UncheckedIOException; import java.util.List; /** @@ -23,10 +19,9 @@ public record LuceneSlice( boolean queryHead, ShardContext shardContext, List leaves, - ScoreMode scoreMode, - LuceneSliceQueue.QueryAndTags queryAndTags + Weight weight, + List tags ) { - int numLeaves() { return leaves.size(); } @@ -34,21 +29,4 @@ int numLeaves() { PartialLeafReaderContext getLeaf(int index) { return leaves.get(index); } - - Query query() { - return queryAndTags.query(); - } - - List tags() { - return queryAndTags.tags(); - } - - Weight createWeight() { - var searcher = shardContext.searcher(); - try { - return searcher.createWeight(queryAndTags.query(), scoreMode, 1); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java index 02e050b550ffd..89c4ad2ed993d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java @@ -12,6 +12,7 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Weight; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -208,12 +209,12 @@ public static LuceneSliceQueue create( PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query); partitioningStrategies.put(ctx.shardIdentifier(), partitioning); List> groups = partitioning.groups(ctx.searcher(), taskConcurrency); - var rewrittenQueryAndTag = new QueryAndTags(query, queryAndExtra.tags); + Weight weight = weight(ctx, query, scoreMode); boolean queryHead = true; for (List group : groups) { if (group.isEmpty() == false) { final int slicePosition = nextSliceId++; - slices.add(new LuceneSlice(slicePosition, queryHead, ctx, group, scoreMode, rewrittenQueryAndTag)); + slices.add(new LuceneSlice(slicePosition, queryHead, ctx, group, weight, queryAndExtra.tags)); queryHead = false; } } @@ -315,6 +316,16 @@ private static PartitioningStrategy forAuto(Function query1 = List.of("1"); + List query2 = List.of("q2"); List sliceList = List.of( // query1: new segment - new LuceneSlice(0, true, null, List.of(new PartialLeafReaderContext(leaf1, 0, 10)), scoreMode, t1), - new LuceneSlice(1, false, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), scoreMode, t1), - new LuceneSlice(2, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), scoreMode, t1), + new LuceneSlice(0, true, null, List.of(new PartialLeafReaderContext(leaf1, 0, 10)), null, query1), + new LuceneSlice(1, false, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), null, query1), + new LuceneSlice(2, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), null, query1), // query1: new segment - new LuceneSlice(3, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), scoreMode, t1), - new LuceneSlice(4, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), scoreMode, t1), - new LuceneSlice(5, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), scoreMode, t1), + new LuceneSlice(3, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), null, query1), + new LuceneSlice(4, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), null, query1), + new LuceneSlice(5, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), null, query1), // query1: new segment - new LuceneSlice(6, false, null, List.of(new PartialLeafReaderContext(leaf4, 0, 10)), scoreMode, t1), - new LuceneSlice(7, false, null, List.of(new PartialLeafReaderContext(leaf4, 10, 20)), scoreMode, t1), + new LuceneSlice(6, false, null, List.of(new PartialLeafReaderContext(leaf4, 0, 10)), null, query1), + new LuceneSlice(7, false, null, List.of(new PartialLeafReaderContext(leaf4, 10, 20)), null, query1), // query2: new segment - new LuceneSlice(8, true, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), scoreMode, t2), - new LuceneSlice(9, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), scoreMode, t2), + new LuceneSlice(8, true, null, List.of(new PartialLeafReaderContext(leaf2, 0, 10)), null, query2), + new LuceneSlice(9, false, null, List.of(new PartialLeafReaderContext(leaf2, 10, 20)), null, query2), // query1: new segment - new LuceneSlice(10, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), scoreMode, t2), - new LuceneSlice(11, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), scoreMode, t2), - new LuceneSlice(12, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), scoreMode, t2) + new LuceneSlice(10, false, null, List.of(new PartialLeafReaderContext(leaf3, 0, 20)), null, query2), + new LuceneSlice(11, false, null, List.of(new PartialLeafReaderContext(leaf3, 10, 20)), null, query2), + new LuceneSlice(12, false, null, List.of(new PartialLeafReaderContext(leaf3, 20, 30)), null, query2) ); // single driver { @@ -142,7 +139,7 @@ public void testRandom() throws Exception { false, mock(ShardContext.class), List.of(new PartialLeafReaderContext(leafContext, minDoc, maxDoc)), - ScoreMode.COMPLETE_NO_SCORES, + null, null ); sliceList.add(slice); diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java index b4678464ee4d3..b4e34befa1dbc 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java @@ -367,10 +367,12 @@ private void testPushQuery( matchesList().item(matchesMap().entry("name", "test").entry("type", anyOf(equalTo("text"), equalTo("keyword")))), equalTo(found ? List.of(List.of(value)) : List.of()) ); - Matcher luceneQueryMatcher = anyOf(() -> Iterators.map(luceneQueryOptions.iterator(), (String s) -> { - String q = s.replaceAll("%value", value).replaceAll("%different_value", differentValue); - return equalTo("ConstantScore(" + q + ")"); - })); + Matcher luceneQueryMatcher = anyOf( + () -> Iterators.map( + luceneQueryOptions.iterator(), + (String s) -> equalTo(s.replaceAll("%value", value).replaceAll("%different_value", differentValue)) + ) + ); @SuppressWarnings("unchecked") List> profiles = (List>) ((Map) result.get("profile")).get("drivers"); diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 4390d730fcae3..458fffaa7f2b7 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -912,7 +912,7 @@ private String checkOperatorProfile(Map o) { .entry("pages_emitted", greaterThan(0)) .entry("rows_emitted", greaterThan(0)) .entry("process_nanos", greaterThan(0)) - .entry("processed_queries", List.of("ConstantScore(*:*)")) + .entry("processed_queries", List.of("*:*")) .entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD")); case "ValuesSourceReaderOperator" -> basicProfile().entry("pages_received", greaterThan(0)) .entry("pages_emitted", greaterThan(0)) @@ -950,7 +950,7 @@ private String checkOperatorProfile(Map o) { .entry("slice_max", 0) .entry("slice_min", 0) .entry("process_nanos", greaterThan(0)) - .entry("processed_queries", List.of("ConstantScore(*:*)")) + .entry("processed_queries", List.of("*:*")) .entry("slice_index", 0); default -> throw new AssertionError("unexpected status: " + o); }; diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index 0582508aecbe6..5d3586b689832 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -111,7 +111,7 @@ public void testTaskContents() throws Exception { assertThat(description, equalTo("data")); LuceneSourceOperator.Status oStatus = (LuceneSourceOperator.Status) o.status(); assertThat(oStatus.processedSlices(), lessThanOrEqualTo(oStatus.totalSlices())); - assertThat(oStatus.processedQueries(), equalTo(Set.of("ConstantScore(*:*)"))); + assertThat(oStatus.processedQueries(), equalTo(Set.of("*:*"))); assertThat(oStatus.processedShards(), equalTo(Set.of("test:0"))); assertThat(oStatus.sliceIndex(), lessThanOrEqualTo(oStatus.totalSlices())); assertThat(oStatus.sliceMin(), greaterThanOrEqualTo(0));