diff --git a/docs/reference/query-languages/esql/_snippets/functions/appendix/values.md b/docs/reference/query-languages/esql/_snippets/functions/appendix/values.md index 4a21c08c79756..f8928db80b968 100644 --- a/docs/reference/query-languages/esql/_snippets/functions/appendix/values.md +++ b/docs/reference/query-languages/esql/_snippets/functions/appendix/values.md @@ -1,7 +1,8 @@ % This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it. -::::{note} -Use [`TOP`](/reference/query-languages/esql/functions-operators/aggregation-functions.md#esql-top) if you need to keep repeated values. +::::{tip} +Use [`TOP`](/reference/query-languages/esql/functions-operators/aggregation-functions.md#esql-top) +if you need to keep repeated values. :::: ::::{warning} This can use a significant amount of memory and ES|QL doesn’t yet diff --git a/docs/reference/query-languages/esql/kibana/definition/functions/values.json b/docs/reference/query-languages/esql/kibana/definition/functions/values.json index 7983f9d2b66ab..6ee883d6fd280 100644 --- a/docs/reference/query-languages/esql/kibana/definition/functions/values.json +++ b/docs/reference/query-languages/esql/kibana/definition/functions/values.json @@ -2,7 +2,7 @@ "comment" : "This is generated by ESQL’s AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.", "type" : "agg", "name" : "values", - "description" : "Returns unique values as a multivalued field. The order of the returned values isn’t guaranteed.\nIf you need the values returned in order use `MV_SORT`.", + "description" : "Returns unique values as a multivalued field. The order of the returned values isn’t guaranteed.\nIf you need the values returned in order use\n`MV_SORT`.", "signatures" : [ { "params" : [ diff --git a/docs/reference/query-languages/esql/kibana/docs/functions/values.md b/docs/reference/query-languages/esql/kibana/docs/functions/values.md index 130c7f3d60e20..3a95a12d03cd3 100644 --- a/docs/reference/query-languages/esql/kibana/docs/functions/values.md +++ b/docs/reference/query-languages/esql/kibana/docs/functions/values.md @@ -2,7 +2,8 @@ ### VALUES Returns unique values as a multivalued field. The order of the returned values isn’t guaranteed. -If you need the values returned in order use [`MV_SORT`](https://www.elastic.co/docs/reference/query-languages/esql/functions-operators/mv-functions#esql-mv_sort). +If you need the values returned in order use +[`MV_SORT`](https://www.elastic.co/docs/reference/query-languages/esql/functions-operators/mv-functions#esql-mv_sort). 
```esql FROM employees diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java index 5fadf98a9d823..fb733e0cb0576 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java @@ -9,7 +9,6 @@ import org.apache.lucene.search.DocIdStream; import org.apache.lucene.search.LeafCollector; -import org.apache.lucene.search.Query; import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; @@ -44,7 +43,7 @@ public static class Factory extends LuceneOperator.Factory { public Factory( List contexts, - Function queryFunction, + Function> queryFunction, DataPartitioning dataPartitioning, int taskConcurrency, int limit @@ -121,6 +120,9 @@ protected Page getCheckedOutput() throws IOException { if (scorer == null) { remainingDocs = 0; } else { + if (scorer.tags().isEmpty() == false) { + throw new UnsupportedOperationException("tags not supported by " + getClass()); + } Weight weight = scorer.weight(); var leafReaderContext = scorer.leafReaderContext(); // see org.apache.lucene.search.TotalHitCountCollector diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java index 5e91f2b80bcec..49a6471b3e708 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java @@ -10,7 +10,6 @@ import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.NumericUtils; import org.elasticsearch.compute.data.Block; @@ -114,7 +113,7 @@ public final long evaluate(long value1, long value2) { public LuceneMaxFactory( List contexts, - Function queryFunction, + Function> queryFunction, DataPartitioning dataPartitioning, int taskConcurrency, String fieldName, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java index fc457ae196186..1abb2e7f8085a 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java @@ -10,7 +10,6 @@ import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.NumericUtils; import org.elasticsearch.compute.data.Block; @@ -114,7 +113,7 @@ public final long evaluate(long value1, long value2) { public LuceneMinFactory( List contexts, - Function queryFunction, + Function> queryFunction, DataPartitioning dataPartitioning, int taskConcurrency, String fieldName, diff --git 
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java index e9f540c654a22..d0b508f14025e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java @@ -102,6 +102,9 @@ public Page getCheckedOutput() throws IOException { if (scorer == null) { remainingDocs = 0; } else { + if (scorer.tags().isEmpty() == false) { + throw new UnsupportedOperationException("tags not supported by " + getClass()); + } final LeafReader reader = scorer.leafReaderContext().reader(); final Query query = scorer.weight().getQuery(); if (query == null || query instanceof MatchAllDocsQuery) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java index 099a81abe5a2e..0da3915c9ad0c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java @@ -97,7 +97,7 @@ public abstract static class Factory implements SourceOperator.SourceOperatorFac */ protected Factory( List contexts, - Function queryFunction, + Function> queryFunction, DataPartitioning dataPartitioning, Function autoStrategy, int taskConcurrency, @@ -155,10 +155,13 @@ LuceneScorer getCurrentOrLoadNextScorer() { final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++); logger.trace("Starting {}", partialLeaf); final LeafReaderContext leaf = partialLeaf.leafReaderContext(); - if (currentScorer == null || currentScorer.leafReaderContext() != leaf) { + if (currentScorer == null // First time + || currentScorer.leafReaderContext() != leaf // Moved to a new leaf + || currentScorer.weight != currentSlice.weight() // Moved to a new query + ) { final Weight weight = currentSlice.weight(); processedQueries.add(weight.getQuery()); - currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, leaf); + currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.tags(), leaf); } assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc(); currentScorer.maxPosition = partialLeaf.maxDoc(); @@ -177,15 +180,17 @@ static final class LuceneScorer { private final ShardContext shardContext; private final Weight weight; private final LeafReaderContext leafReaderContext; + private final List tags; private BulkScorer bulkScorer; private int position; private int maxPosition; private Thread executingThread; - LuceneScorer(ShardContext shardContext, Weight weight, LeafReaderContext leafReaderContext) { + LuceneScorer(ShardContext shardContext, Weight weight, List tags, LeafReaderContext leafReaderContext) { this.shardContext = shardContext; this.weight = weight; + this.tags = tags; this.leafReaderContext = leafReaderContext; reinitialize(); } @@ -230,6 +235,13 @@ Weight weight() { int position() { return position; } + + /** + * Tags to add to the data returned by this query. 
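+ * For example, a pushed-down {@code ROUND_TO(a, 0, 100)} produces one query tagged {@code [0]} and another tagged {@code [100]}; see the {@code LuceneSliceQueue} javadoc below.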
+ */ + List<Object> tags() { + return tags; + } } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java index d7b6a86e07905..1fcbcd267ee4a 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java @@ -14,7 +14,7 @@ /** * Holds a list of multiple partial Lucene segments */ -public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight) { +public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight, List<Object> tags) { int numLeaves() { return leaves.size(); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java index 4a8847e2870aa..ee9f217303195 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java @@ -32,8 +32,47 @@ /** * Shared Lucene slices between Lucene operators.
+ * <p>
+ *     Each shard is {@link #create built} with a list of queries to run and
+ *     tags to add to the queries ({@code List<QueryAndTags>}). Some examples:
+ * </p>
+ * <ul>
+ *     <li>
+ *         For queries like {@code FROM foo} we'll use a one element list
+ *         containing {@code match_all, []}. It loads all documents in the
+ *         index and appends no extra fields to the loaded documents.
+ *     </li>
+ *     <li>
+ *         For queries like {@code FROM foo | WHERE a > 10} we'll use a one
+ *         element list containing {@code +single_value(a) +(a > 10), []}.
+ *         It loads all documents where {@code a} is single valued and
+ *         greater than 10.
+ *     </li>
+ *     <li>
+ *         For queries like {@code FROM foo | STATS MAX(b) BY ROUND_TO(a, 0, 100)}
+ *         we'll use a two element list containing
+ *         <ul>
+ *             <li>{@code +single_value(a) +(a < 100), [0]}</li>
+ *             <li>{@code +single_value(a) +(a >= 100), [100]}</li>
+ *         </ul>
+ *         It loads all documents in the index where {@code a} is single
+ *         valued and adds a constant {@code 0} to the documents where
+ *         {@code a < 100} and the constant {@code 100} to the documents
+ *         where {@code a >= 100}.
+ *     </li>
+ * </ul>
+ * <p>
+ *     IMPORTANT: Runners make no effort to deduplicate the results from multiple
+ *     queries. If you need to only see each document one time then make sure the
+ *     queries are mutually exclusive.
+ * </p>
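+ * <p>
+ *     On the output side, {@code LuceneSourceOperator} (below) turns the tags
+ *     into one constant block per tag, appended to every emitted page after
+ *     the doc vector and the optional scores block:
+ * </p>
+ * <pre>
+ *     [ docs | scores? | tag 0 | tag 1 | ... ]
+ * </pre>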
*/ public final class LuceneSliceQueue { + /** + * Query to run and tags to add to the results. + */ + public record QueryAndTags(Query query, List tags) {} + public static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher @@ -69,7 +108,7 @@ public Collection remainingShardsIdentifiers() { public static LuceneSliceQueue create( List contexts, - Function queryFunction, + Function> queryFunction, DataPartitioning dataPartitioning, Function autoStrategy, int taskConcurrency, @@ -78,27 +117,29 @@ public static LuceneSliceQueue create( List slices = new ArrayList<>(); Map partitioningStrategies = new HashMap<>(contexts.size()); for (ShardContext ctx : contexts) { - Query query = queryFunction.apply(ctx); - query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query); - /* - * Rewrite the query on the local index so things like fully - * overlapping range queries become match all. It's important - * to do this before picking the partitioning strategy so we - * can pick more aggressive strategies when the query rewrites - * into MatchAll. - */ - try { - query = ctx.searcher().rewrite(query); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query); - partitioningStrategies.put(ctx.shardIdentifier(), partitioning); - List> groups = partitioning.groups(ctx.searcher(), taskConcurrency); - Weight weight = weight(ctx, query, scoreMode); - for (List group : groups) { - if (group.isEmpty() == false) { - slices.add(new LuceneSlice(ctx, group, weight)); + for (QueryAndTags queryAndExtra : queryFunction.apply(ctx)) { + Query query = queryAndExtra.query; + query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query); + /* + * Rewrite the query on the local index so things like fully + * overlapping range queries become match all. It's important + * to do this before picking the partitioning strategy so we + * can pick more aggressive strategies when the query rewrites + * into MatchAll. 
+ */ + try { + query = ctx.searcher().rewrite(query); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query); + partitioningStrategies.put(ctx.shardIdentifier(), partitioning); + List> groups = partitioning.groups(ctx.searcher(), taskConcurrency); + Weight weight = weight(ctx, query, scoreMode); + for (List group : groups) { + if (group.isEmpty() == false) { + slices.add(new LuceneSlice(ctx, group, weight, queryAndExtra.tags)); + } } } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java index 51b842d3f0ddc..26339d2bdb108 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java @@ -17,8 +17,9 @@ import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.Scorable; +import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; -import org.elasticsearch.compute.data.DocBlock; +import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.compute.data.DocVector; import org.elasticsearch.compute.data.DoubleVector; import org.elasticsearch.compute.data.IntVector; @@ -64,7 +65,7 @@ public static class Factory extends LuceneOperator.Factory { public Factory( List contexts, - Function queryFunction, + Function> queryFunction, DataPartitioning dataPartitioning, int taskConcurrency, int maxPageSize, @@ -320,28 +321,29 @@ public Page getCheckedOutput() throws IOException { IntVector shard = null; IntVector leaf = null; IntVector docs = null; - DoubleVector scores = null; - DocBlock docBlock = null; + Block[] blocks = new Block[1 + (scoreBuilder == null ? 
0 : 1) + scorer.tags().size()]; currentPagePos -= discardedDocs; try { shard = blockFactory.newConstantIntVector(scorer.shardContext().index(), currentPagePos); leaf = blockFactory.newConstantIntVector(scorer.leafReaderContext().ord, currentPagePos); docs = buildDocsVector(currentPagePos); docsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize)); - docBlock = new DocVector(shard, leaf, docs, true).asBlock(); + int b = 0; + blocks[b++] = new DocVector(shard, leaf, docs, true).asBlock(); shard = null; leaf = null; docs = null; - if (scoreBuilder == null) { - page = new Page(currentPagePos, docBlock); - } else { - scores = buildScoresVector(currentPagePos); + if (scoreBuilder != null) { + blocks[b++] = buildScoresVector(currentPagePos).asBlock(); scoreBuilder = blockFactory.newDoubleVectorBuilder(Math.min(remainingDocs, maxPageSize)); - page = new Page(currentPagePos, docBlock, scores.asBlock()); } + for (Object e : scorer.tags()) { + blocks[b++] = BlockUtils.constantBlock(blockFactory, e, currentPagePos); + } + page = new Page(currentPagePos, blocks); } finally { if (page == null) { - Releasables.closeExpectNoException(shard, leaf, docs, docBlock, scores); + Releasables.closeExpectNoException(shard, leaf, docs, Releasables.wrap(blocks)); } } currentPagePos = 0; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index d2c26655ea321..5457caa25e158 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -58,7 +58,7 @@ public static class Factory extends LuceneOperator.Factory { public Factory( List contexts, - Function queryFunction, + Function> queryFunction, DataPartitioning dataPartitioning, int taskConcurrency, int maxPageSize, @@ -171,6 +171,9 @@ private Page collect() throws IOException { return emit(true); } try { + if (scorer.tags().isEmpty() == false) { + throw new UnsupportedOperationException("tags not supported by " + getClass()); + } if (perShardCollector == null || perShardCollector.shardContext.index() != scorer.shardContext().index()) { // TODO: share the bottom between shardCollectors perShardCollector = newPerShardCollector(scorer.shardContext(), sorts, needsScore, limit); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java index acae71f5668f0..a0dfadcdbd9db 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java @@ -116,6 +116,9 @@ public Page getCheckedOutput() throws IOException { doneCollecting = true; return null; } + if (slice.tags().isEmpty() == false) { + throw new UnsupportedOperationException("tags not supported by " + getClass()); + } Releasables.close(fieldsReader); fieldsReader = new ShardLevelFieldsReader(blockFactory, slice.shardContext(), fieldsToExtracts); iterator = new SegmentsIterator(slice); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java 
b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java index a3592a54c56c4..4a581198a235a 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java @@ -7,7 +7,6 @@ package org.elasticsearch.compute.lucene; -import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.SourceOperator; @@ -37,7 +36,7 @@ private TimeSeriesSourceOperatorFactory( List contexts, boolean emitDocIds, List fieldsToExact, - Function queryFunction, + Function> queryFunction, int taskConcurrency, int maxPageSize, int limit @@ -74,7 +73,7 @@ public static TimeSeriesSourceOperatorFactory create( boolean emitDocIds, List contexts, List fieldsToExact, - Function queryFunction + Function> queryFunction ) { return new TimeSeriesSourceOperatorFactory(contexts, emitDocIds, fieldsToExact, queryFunction, taskConcurrency, maxPageSize, limit); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index 8060d42458056..1a88bac8b3953 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -9,6 +9,7 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; +import org.apache.lucene.document.LongField; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.index.DirectoryReader; @@ -43,9 +44,11 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneOperator; +import org.elasticsearch.compute.lucene.LuceneSliceQueue; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneSourceOperatorTests; import org.elasticsearch.compute.lucene.ShardContext; @@ -54,7 +57,6 @@ import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.HashAggregationOperator; -import org.elasticsearch.compute.operator.LimitOperator; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.OrdinalsGroupingOperator; import org.elasticsearch.compute.operator.PageConsumerOperator; @@ -64,9 +66,11 @@ import org.elasticsearch.compute.test.OperatorTestCase; import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator; import org.elasticsearch.compute.test.TestDriverFactory; +import org.elasticsearch.compute.test.TestResultPageSinkOperator; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Releasables; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.BlockDocValuesReader; import org.elasticsearch.index.mapper.FieldNamesFieldMapper; import org.elasticsearch.index.mapper.KeywordFieldMapper; import 
org.elasticsearch.index.mapper.MappedFieldType; @@ -75,7 +79,6 @@ import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.search.lookup.SearchLookup; -import org.elasticsearch.test.ESTestCase; import java.io.IOException; import java.util.ArrayList; @@ -90,9 +93,9 @@ import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL; import static org.elasticsearch.compute.aggregation.AggregatorMode.INITIAL; import static org.elasticsearch.compute.test.OperatorTestCase.randomPageSize; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; /** * This venerable test builds {@link Driver}s by hand and runs them together, simulating @@ -112,7 +115,11 @@ public void testQueryOperator() throws IOException { final long from = randomBoolean() ? Long.MIN_VALUE : randomLongBetween(0, 10000); final long to = randomBoolean() ? Long.MAX_VALUE : randomLongBetween(from, from + 10000); final Query query = LongPoint.newRangeQuery("pt", from, to); - LuceneOperator.Factory factory = luceneOperatorFactory(reader, query, LuceneOperator.NO_LIMIT); + LuceneOperator.Factory factory = luceneOperatorFactory( + reader, + List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())), + LuceneOperator.NO_LIMIT + ); List drivers = new ArrayList<>(); try { Set actualDocIds = ConcurrentCollections.newConcurrentSet(); @@ -221,7 +228,11 @@ public String toString() { ); Driver driver = TestDriverFactory.create( driverContext, - luceneOperatorFactory(reader, new MatchAllDocsQuery(), LuceneOperator.NO_LIMIT).get(driverContext), + luceneOperatorFactory( + reader, + List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())), + LuceneOperator.NO_LIMIT + ).get(driverContext), operators, new PageConsumerOperator(page -> { BytesRefBlock keys = page.getBlock(0); @@ -243,31 +254,109 @@ public String toString() { assertThat(blockFactory.breaker().getUsed(), equalTo(0L)); } - public void testLimitOperator() { - var positions = 100; - var limit = randomIntBetween(90, 101); - var values = randomList(positions, positions, ESTestCase::randomLong); + public void testPushRoundToToQuery() throws IOException { + long firstGroupMax = randomLong(); + long secondGroupMax = randomLong(); + long thirdGroupMax = randomLong(); - var results = new ArrayList(); - DriverContext driverContext = driverContext(); - try ( - var driver = TestDriverFactory.create( - driverContext, - new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100), - List.of((new LimitOperator.Factory(limit)).get(driverContext)), - new PageConsumerOperator(page -> { - LongBlock block = page.getBlock(0); - for (int i = 0; i < page.getPositionCount(); i++) { - results.add(block.getLong(i)); + CheckedConsumer verifier = reader -> { + Query firstGroupQuery = LongPoint.newRangeQuery("g", Long.MIN_VALUE, 99); + Query secondGroupQuery = LongPoint.newRangeQuery("g", 100, 9999); + Query thirdGroupQuery = LongPoint.newRangeQuery("g", 10000, Long.MAX_VALUE); + + LuceneSliceQueue.QueryAndTags firstGroupQueryAndTags = new LuceneSliceQueue.QueryAndTags(firstGroupQuery, List.of(0L)); + LuceneSliceQueue.QueryAndTags secondGroupQueryAndTags = new LuceneSliceQueue.QueryAndTags(secondGroupQuery, List.of(100L)); + LuceneSliceQueue.QueryAndTags thirdGroupQueryAndTags = new LuceneSliceQueue.QueryAndTags(thirdGroupQuery, List.of(10000L)); + + 
LuceneOperator.Factory factory = luceneOperatorFactory( + reader, + List.of(firstGroupQueryAndTags, secondGroupQueryAndTags, thirdGroupQueryAndTags), + LuceneOperator.NO_LIMIT + ); + ValuesSourceReaderOperator.Factory load = new ValuesSourceReaderOperator.Factory( + List.of( + new ValuesSourceReaderOperator.FieldInfo("v", ElementType.LONG, f -> new BlockDocValuesReader.LongsBlockLoader("v")) + ), + List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> { + throw new UnsupportedOperationException(); + }, 0.8)), + 0 + ); + List pages = new ArrayList<>(); + DriverContext driverContext = driverContext(); + try ( + Driver driver = TestDriverFactory.create( + driverContext, + factory.get(driverContext), + List.of(load.get(driverContext)), + new TestResultPageSinkOperator(pages::add) + ) + ) { + OperatorTestCase.runDriver(driver); + } + assertDriverContext(driverContext); + + boolean sawFirstMax = false; + boolean sawSecondMax = false; + boolean sawThirdMax = false; + for (Page page : pages) { + logger.error("ADFA {}", page); + LongVector group = page.getBlock(1).asVector(); + LongVector value = page.getBlock(2).asVector(); + for (int p = 0; p < page.getPositionCount(); p++) { + long g = group.getLong(p); + long v = value.getLong(p); + switch ((int) g) { + case 0 -> { + assertThat(v, lessThanOrEqualTo(firstGroupMax)); + sawFirstMax |= v == firstGroupMax; + } + case 100 -> { + assertThat(v, lessThanOrEqualTo(secondGroupMax)); + sawSecondMax |= v == secondGroupMax; + } + case 10000 -> { + assertThat(v, lessThanOrEqualTo(thirdGroupMax)); + sawThirdMax |= v == thirdGroupMax; + } + default -> throw new IllegalArgumentException("Unknown group [" + g + "]"); } - }) - ) - ) { - OperatorTestCase.runDriver(driver); - } + } + } + assertTrue(sawFirstMax); + assertTrue(sawSecondMax); + assertTrue(sawThirdMax); + }; - assertThat(results, contains(values.stream().limit(limit).toArray())); - assertDriverContext(driverContext); + try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) { + int numDocs = randomIntBetween(0, 10_000); + for (int i = 0; i < numDocs; i++) { + long g, v; + switch (between(0, 2)) { + case 0 -> { + g = randomLongBetween(Long.MIN_VALUE, 99); + v = randomLongBetween(Long.MIN_VALUE, firstGroupMax); + } + case 1 -> { + g = randomLongBetween(100, 9999); + v = randomLongBetween(Long.MIN_VALUE, secondGroupMax); + } + case 2 -> { + g = randomLongBetween(10000, Long.MAX_VALUE); + v = randomLongBetween(Long.MIN_VALUE, thirdGroupMax); + } + default -> throw new IllegalArgumentException(); + } + w.addDocument(List.of(new LongField("g", g, Field.Store.NO), new LongField("v", v, Field.Store.NO))); + } + w.addDocument(List.of(new LongField("g", 0, Field.Store.NO), new LongField("v", firstGroupMax, Field.Store.NO))); + w.addDocument(List.of(new LongField("g", 200, Field.Store.NO), new LongField("v", secondGroupMax, Field.Store.NO))); + w.addDocument(List.of(new LongField("g", 20000, Field.Store.NO), new LongField("v", thirdGroupMax, Field.Store.NO))); + + try (DirectoryReader reader = w.getReader()) { + verifier.accept(reader); + } + } } private static Set searchForDocIds(IndexReader reader, Query query) throws IOException { @@ -388,11 +477,11 @@ public static void assertDriverContext(DriverContext driverContext) { assertThat(driverContext.getSnapshot().releasables(), empty()); } - static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader, Query query, int limit) { + static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader, 
List queryAndTags, int limit) { final ShardContext searchContext = new LuceneSourceOperatorTests.MockShardContext(reader, 0); return new LuceneSourceOperator.Factory( List.of(searchContext), - ctx -> query, + ctx -> queryAndTags, randomFrom(DataPartitioning.values()), randomIntBetween(1, 10), randomPageSize(), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java index bf0575821fbe6..2487016004cf2 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java @@ -90,7 +90,13 @@ private LuceneCountOperator.Factory simple(DataPartitioning dataPartitioning, in } else { query = LongPoint.newRangeQuery("s", 0, numDocs); } - return new LuceneCountOperator.Factory(List.of(ctx), c -> query, dataPartitioning, between(1, 8), limit); + return new LuceneCountOperator.Factory( + List.of(ctx), + c -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())), + dataPartitioning, + between(1, 8), + limit + ); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java index 6275b3a07cacb..ea717ae805118 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java @@ -116,7 +116,15 @@ private LuceneMaxFactory simple(NumberTypeTest numberTypeTest, DataPartitioning } else { query = SortedNumericDocValuesField.newSlowRangeQuery(FIELD_NAME, Long.MIN_VALUE, Long.MAX_VALUE); } - return new LuceneMaxFactory(List.of(ctx), c -> query, dataPartitioning, between(1, 8), FIELD_NAME, getNumberType(), limit); + return new LuceneMaxFactory( + List.of(ctx), + c -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())), + dataPartitioning, + between(1, 8), + FIELD_NAME, + getNumberType(), + limit + ); } public void testSimple() { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java index cf1620b309010..ac0d16103ed92 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java @@ -116,7 +116,15 @@ private LuceneMinFactory simple(NumberTypeTest numberTypeTest, DataPartitioning } else { query = SortedNumericDocValuesField.newSlowRangeQuery(FIELD_NAME, Long.MIN_VALUE, Long.MAX_VALUE); } - return new LuceneMinFactory(List.of(ctx), c -> query, dataPartitioning, between(1, 8), FIELD_NAME, getNumberType(), limit); + return new LuceneMinFactory( + List.of(ctx), + c -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())), + dataPartitioning, + between(1, 8), + FIELD_NAME, + getNumberType(), + limit + ); } public void testSimple() { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java 
b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java index 18db35be608d7..eb7cb32fd0e74 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java @@ -273,7 +273,7 @@ private static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader, final ShardContext searchContext = new LuceneSourceOperatorTests.MockShardContext(reader, 0); return new LuceneSourceOperator.Factory( List.of(searchContext), - ctx -> query, + ctx -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())), randomFrom(DataPartitioning.values()), randomIntBetween(1, 10), randomPageSize(), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java index 5e0fb3fa87b29..4c5c860d244a0 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java @@ -7,19 +7,21 @@ package org.elasticsearch.compute.lucene; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; -import org.apache.lucene.search.Query; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.compute.data.DocBlock; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Driver; @@ -63,9 +65,117 @@ public class LuceneSourceOperatorTests extends AnyOperatorTestCase { private static final MappedFieldType S_FIELD = new NumberFieldMapper.NumberFieldType("s", NumberFieldMapper.NumberType.LONG); + + @ParametersFactory(argumentFormatting = "%s %s") + public static Iterable parameters() { + List parameters = new ArrayList<>(); + for (TestCase c : TestCase.values()) { + for (boolean scoring : new boolean[] { false, true }) { + parameters.add(new Object[] { c, scoring }); + } + } + return parameters; + } + + public enum TestCase { + MATCH_ALL { + @Override + List queryAndExtra() { + return List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())); + } + + @Override + void checkPages(int numDocs, int limit, int maxPageSize, List results) { + int maxPages = Math.min(numDocs, limit); + int minPages = (int) Math.ceil((double) maxPages / maxPageSize); + assertThat(results, hasSize(both(greaterThanOrEqualTo(minPages)).and(lessThanOrEqualTo(maxPages)))); + } + + @Override + int numResults(int numDocs) { + return numDocs; + } + }, + MATCH_0_AND_1 { + @Override + List queryAndExtra() { + return List.of( + new LuceneSliceQueue.QueryAndTags(SortedNumericDocValuesField.newSlowExactQuery("s", 0), List.of(123)), + new 
LuceneSliceQueue.QueryAndTags(SortedNumericDocValuesField.newSlowExactQuery("s", 1), List.of(456)) + ); + } + + @Override + void checkPages(int numDocs, int limit, int maxPageSize, List results) { + assertThat(results, hasSize(Math.min(numDocs, 2))); + if (results.isEmpty() == false) { + Page page = results.get(0); + IntBlock extra = page.getBlock(page.getBlockCount() - 2); + assertThat(extra.asVector().isConstant(), equalTo(true)); + assertThat(extra.getInt(0), equalTo(123)); + if (results.size() > 1) { + page = results.get(1); + extra = page.getBlock(page.getBlockCount() - 2); + assertThat(extra.asVector().isConstant(), equalTo(true)); + assertThat(extra.getInt(0), equalTo(456)); + } + } + } + + @Override + int numResults(int numDocs) { + return Math.min(numDocs, 2); + } + }, + LTE_100_GT_100 { + @Override + List queryAndExtra() { + return List.of( + new LuceneSliceQueue.QueryAndTags(SortedNumericDocValuesField.newSlowRangeQuery("s", 0, 100), List.of(123)), + new LuceneSliceQueue.QueryAndTags(SortedNumericDocValuesField.newSlowRangeQuery("s", 101, Long.MAX_VALUE), List.of(456)) + ); + } + + @Override + void checkPages(int numDocs, int limit, int maxPageSize, List results) { + MATCH_ALL.checkPages(numDocs, limit, maxPageSize, results); + for (Page page : results) { + IntBlock extra = page.getBlock(page.getBlockCount() - 2); + LongBlock data = page.getBlock(page.getBlockCount() - 1); + for (int p = 0; p < page.getPositionCount(); p++) { + assertThat(extra.getInt(p), equalTo(data.getLong(p) <= 100 ? 123 : 456)); + } + } + } + + @Override + int numResults(int numDocs) { + return numDocs; + } + }; + + abstract List queryAndExtra(); + + abstract void checkPages(int numDocs, int limit, int maxPageSize, List results); + + abstract int numResults(int numDocs); + } + + private final TestCase testCase; + /** + * Do we enable scoring? We don't check the score in this test, but + * it's nice to make sure everything else works with scoring enabled. 
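+ * When scoring is on, the scores block sits at index 1 and pushes the tag
+ * blocks and the loaded field one position to the right; see
+ * {@code initialBlockIndex} below.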
+ */ + private final boolean scoring; + private Directory directory = newDirectory(); private IndexReader reader; + public LuceneSourceOperatorTests(TestCase testCase, boolean scoring) { + this.testCase = testCase; + this.scoring = scoring; + } + @After public void closeIndex() throws IOException { IOUtils.close(reader, directory); @@ -94,12 +204,19 @@ private LuceneSourceOperator.Factory simple(DataPartitioning dataPartitioning, i } } reader = writer.getReader(); + + IndexSearcher searcher = new IndexSearcher(reader); + int count = 0; + for (LuceneSliceQueue.QueryAndTags q : testCase.queryAndExtra()) { + count += searcher.count(q.query()); + } + assertThat(count, equalTo(testCase.numResults(numDocs))); } catch (IOException e) { throw new RuntimeException(e); } ShardContext ctx = new MockShardContext(reader, 0); - Function queryFunction = c -> new MatchAllDocsQuery(); + Function> queryFunction = c -> testCase.queryAndExtra(); int maxPageSize = between(10, Math.max(10, numDocs)); int taskConcurrency = randomIntBetween(1, 4); return new LuceneSourceOperator.Factory( @@ -152,15 +269,17 @@ private void testSimple(DataPartitioning partitioning) { } public void testEarlyTermination() { - int size = between(1_000, 20_000); - int limit = between(0, Integer.MAX_VALUE); - LuceneSourceOperator.Factory factory = simple(randomFrom(DataPartitioning.values()), size, limit, scoring); + int numDocs = between(1_000, 20_000); + int limit = between(0, numDocs * 2); + LuceneSourceOperator.Factory factory = simple(randomFrom(DataPartitioning.values()), numDocs, limit, scoring); int taskConcurrency = factory.taskConcurrency(); final AtomicInteger receivedRows = new AtomicInteger(); + List sources = new ArrayList<>(); List drivers = new ArrayList<>(); for (int i = 0; i < taskConcurrency; i++) { DriverContext driverContext = driverContext(); SourceOperator sourceOperator = factory.get(driverContext); + sources.add(sourceOperator); SinkOperator sinkOperator = new PageConsumerOperator(p -> { receivedRows.addAndGet(p.getPositionCount()); p.releaseBlocks(); @@ -183,7 +302,17 @@ public void testEarlyTermination() { drivers.add(driver); } OperatorTestCase.runDriver(drivers); - assertThat(receivedRows.get(), equalTo(Math.min(limit, size))); + for (SourceOperator source : sources) { + logger.info("source status {}", source.status()); + } + logger.info( + "{} received={} limit={} numResults={}", + factory.dataPartitioning, + receivedRows.get(), + limit, + testCase.numResults(numDocs) + ); + assertThat(receivedRows.get(), equalTo(Math.min(limit, testCase.numResults(numDocs)))); } public void testEmpty() { @@ -228,8 +357,8 @@ private void testWithCranky(DataPartitioning partitioning) { } } - private void testSimple(DriverContext ctx, DataPartitioning partitioning, int size, int limit) { - LuceneSourceOperator.Factory factory = simple(partitioning, size, limit, scoring); + private void testSimple(DriverContext ctx, DataPartitioning partitioning, int numDocs, int limit) { + LuceneSourceOperator.Factory factory = simple(partitioning, numDocs, limit, scoring); Operator.OperatorFactory readS = ValuesSourceReaderOperatorTests.factory(reader, S_FIELD, ElementType.LONG); List results = new ArrayList<>(); @@ -246,26 +375,26 @@ private void testSimple(DriverContext ctx, DataPartitioning partitioning, int si for (Page page : results) { LongBlock sBlock = page.getBlock(initialBlockIndex(page)); for (int p = 0; p < page.getPositionCount(); p++) { - assertThat(sBlock.getLong(sBlock.getFirstValueIndex(p)), 
both(greaterThanOrEqualTo(0L)).and(lessThan((long) size))); + assertThat(sBlock.getLong(sBlock.getFirstValueIndex(p)), both(greaterThanOrEqualTo(0L)).and(lessThan((long) numDocs))); } } - int maxPages = Math.min(size, limit); - int minPages = (int) Math.ceil(maxPages / factory.maxPageSize()); - assertThat(results, hasSize(both(greaterThanOrEqualTo(minPages)).and(lessThanOrEqualTo(maxPages)))); - } - // Scores are not interesting to this test, but enabled conditionally and effectively ignored just for coverage. - private final boolean scoring = randomBoolean(); + testCase.checkPages(numDocs, limit, factory.maxPageSize(), results); + int count = results.stream().mapToInt(Page::getPositionCount).sum(); + logger.info("{} received={} limit={} numResults={}", factory.dataPartitioning, count, limit, testCase.numResults(numDocs)); + assertThat(count, equalTo(Math.min(limit, testCase.numResults(numDocs)))); + } // Returns the initial block index, ignoring the score block if scoring is enabled private int initialBlockIndex(Page page) { assert page.getBlock(0) instanceof DocBlock : "expected doc block at index 0"; + int offset = 1; if (scoring) { assert page.getBlock(1) instanceof DoubleBlock : "expected double block at index 1"; - return 2; - } else { - return 1; + offset++; } + offset += testCase.queryAndExtra().get(0).tags().size(); + return offset; } /** diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java index 9a558f69eb6ef..db60d3cd19cb3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java @@ -12,7 +12,6 @@ import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.search.MatchAllDocsQuery; -import org.apache.lucene.search.Query; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedNumericSelector; @@ -91,7 +90,9 @@ public Optional buildSort(List> sorts) { return Optional.of(new SortAndFormats(new Sort(field), new DocValueFormat[] { null })); } }; - Function queryFunction = c -> new MatchAllDocsQuery(); + Function> queryFunction = c -> List.of( + new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of()) + ); int taskConcurrency = 0; int maxPageSize = between(10, Math.max(10, size)); List> sorts = List.of(new FieldSortBuilder("s")); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java index a43c37979a94e..0c9bf676e0547 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java @@ -12,7 +12,6 @@ import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.search.MatchAllDocsQuery; -import org.apache.lucene.search.Query; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedNumericSelector; @@ -96,7 +95,9 
@@ public Optional buildSort(List> sorts) { return Optional.of(new SortAndFormats(new Sort(field), new DocValueFormat[] { null })); } }; - Function queryFunction = c -> new MatchAllDocsQuery(); + Function> queryFunction = c -> List.of( + new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of()) + ); int taskConcurrency = 0; int maxPageSize = between(10, Math.max(10, size)); List> sorts = List.of(new FieldSortBuilder("s")); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java index d6b9c30614c9f..b66f32adf48dd 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java @@ -310,7 +310,7 @@ public void testMatchNone() throws Exception { randomBoolean(), List.of(ctx), List.of(), - unused -> query + unused -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())) ); var driverContext = driverContext(); List results = new ArrayList<>(); @@ -449,7 +449,9 @@ public MappedFieldType fieldType(String name) { throw new IllegalArgumentException("Unknown field [" + name + "]"); } }; - Function queryFunction = c -> new MatchAllDocsQuery(); + Function> queryFunction = c -> List.of( + new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of()) + ); var fieldInfos = extractFields.stream() .map( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java index 55c7b95d1edf0..88211a9170034 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java @@ -262,7 +262,7 @@ private SourceOperator simpleInput(DriverContext context, int size, int commitEv } var luceneFactory = new LuceneSourceOperator.Factory( shardContexts, - ctx -> new MatchAllDocsQuery(), + ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())), DataPartitioning.SHARD, 1,// randomIntBetween(1, 10), pageSize, @@ -1290,7 +1290,7 @@ public void testWithNulls() throws IOException { DriverContext driverContext = driverContext(); var luceneFactory = new LuceneSourceOperator.Factory( List.of(shardContext), - ctx -> new MatchAllDocsQuery(), + ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())), randomFrom(DataPartitioning.values()), randomIntBetween(1, 10), randomPageSize(), @@ -1449,7 +1449,7 @@ public void testManyShards() throws IOException { } var luceneFactory = new LuceneSourceOperator.Factory( contexts, - ctx -> new MatchAllDocsQuery(), + ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())), DataPartitioning.SHARD, randomIntBetween(1, 10), 1000, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java index a0fae75fd6200..1550b6dc013ab 100644 --- 
a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java @@ -178,7 +178,7 @@ private SourceOperator simpleInput(DriverContext context, int size, int commitEv } var luceneFactory = new LuceneSourceOperator.Factory( List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)), - ctx -> new MatchAllDocsQuery(), + ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())), DataPartitioning.SHARD, randomIntBetween(1, 10), pageSize, @@ -1334,7 +1334,7 @@ public void testWithNulls() throws IOException { DriverContext driverContext = driverContext(); var luceneFactory = new LuceneSourceOperator.Factory( List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)), - ctx -> new MatchAllDocsQuery(), + ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())), randomFrom(DataPartitioning.values()), randomIntBetween(1, 10), randomPageSize(), @@ -1580,7 +1580,7 @@ public void testManyShards() throws IOException { } var luceneFactory = new LuceneSourceOperator.Factory( contexts, - ctx -> new MatchAllDocsQuery(), + ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())), DataPartitioning.SHARD, randomIntBetween(1, 10), 1000, diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index c5cbeaac71072..b3fcf74234aa2 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.lucene.DataPartitioning; +import org.elasticsearch.compute.lucene.LuceneSliceQueue; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.ShardContext; import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; @@ -187,7 +188,7 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws ); LuceneSourceOperator.Factory source = new LuceneSourceOperator.Factory( List.of(esqlContext), - ctx -> new MatchAllDocsQuery(), + ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())), DataPartitioning.SEGMENT, 1, 10000, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index ac91890bff914..1c699c60c155d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -21,6 +21,7 @@ import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneCountOperator; import org.elasticsearch.compute.lucene.LuceneOperator; +import org.elasticsearch.compute.lucene.LuceneSliceQueue; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator; import 
org.elasticsearch.compute.lucene.TimeSeriesSourceOperatorFactory; @@ -192,9 +193,11 @@ private static class DefaultShardContextForUnmappedField extends DefaultShardCon return null; } - public Function querySupplier(QueryBuilder builder) { + public Function> querySupplier( + QueryBuilder builder + ) { QueryBuilder qb = builder == null ? QueryBuilders.matchAllQuery().boost(0.0f) : builder; - return ctx -> shardContexts.get(ctx.index()).toQuery(qb); + return ctx -> List.of(new LuceneSliceQueue.QueryAndTags(shardContexts.get(ctx.index()).toQuery(qb), List.of())); } @Override
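
To close the loop on the wiring above: most call sites in this change pass a one element, untagged list (`ctx -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of()))`). A minimal sketch of the multi-query case, modeled on `testPushRoundToToQuery`, follows; `RoundToPushdownSketch` and `roundToSupplier` are invented names, and the plain `LongPoint` range queries stand in for the `+single_value(a)` filtering the `LuceneSliceQueue` javadoc describes:

```java
import java.util.List;
import java.util.function.Function;

import org.apache.lucene.document.LongPoint;
import org.elasticsearch.compute.lucene.LuceneSliceQueue;
import org.elasticsearch.compute.lucene.ShardContext;

class RoundToPushdownSketch {
    // Decompose ROUND_TO(a, 0, 100) into two mutually exclusive tagged
    // queries: rows where a < 100 carry the constant tag 0L and rows where
    // a >= 100 carry 100L. Mutual exclusivity matters because runners make
    // no effort to deduplicate documents matched by more than one query.
    static Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> roundToSupplier() {
        return ctx -> List.of(
            new LuceneSliceQueue.QueryAndTags(LongPoint.newRangeQuery("a", Long.MIN_VALUE, 99), List.of(0L)),
            new LuceneSliceQueue.QueryAndTags(LongPoint.newRangeQuery("a", 100, Long.MAX_VALUE), List.of(100L))
        );
    }
}
```

Such a supplier drops in anywhere the new `Function<ShardContext, List<QueryAndTags>>` parameter is accepted, for example `LuceneSourceOperator.Factory`.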