diff --git a/docs/changelog/125739.yaml b/docs/changelog/125739.yaml new file mode 100644 index 0000000000000..cc5fa57b0f09b --- /dev/null +++ b/docs/changelog/125739.yaml @@ -0,0 +1,5 @@ +pr: 125739 +summary: Heuristics to pick efficient partitioning +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index c730a0fe4cf07..5e0a25dcccba7 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -217,6 +217,7 @@ static TransportVersion def(int id) { public static final TransportVersion SEMANTIC_TEXT_CHUNKING_CONFIG = def(9_047_00_0); public static final TransportVersion REPO_ANALYSIS_COPY_BLOB = def(9_048_00_0); public static final TransportVersion AMAZON_BEDROCK_TASK_SETTINGS = def(9_049_00_0); + public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING = def(9_050_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java index 926b9e08d2e08..2529b060a9998 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java @@ -7,11 +7,39 @@ package org.elasticsearch.compute.lucene; -public enum DataPartitioning { +import org.elasticsearch.compute.operator.Driver; +/** + * How we partition the data across {@link Driver}s. Each request forks into + * {@code min(1.5 * cpus, partition_count)} threads on the data node. More partitions + * allow us to bring more threads to bear on CPU intensive data node side tasks. + */ +public enum DataPartitioning { + /** + * Automatically select the data partitioning based on the query and index. + * Usually that's {@link #SEGMENT}, but for small indices it's {@link #SHARD}. + * When the additional overhead from {@link #DOC} is fairly low then it'll + * pick {@link #DOC}. + */ + AUTO, + /** + * Make one partition per shard. This is generally the slowest option, but it + * has the lowest CPU overhead. + */ SHARD, - + /** + * Partition on segment boundaries, this doesn't allow forking to as many CPUs + * as {@link #DOC} but it has much lower overhead. + *

+ * It packs segments smaller than {@link LuceneSliceQueue#MAX_DOCS_PER_SLICE} + * docs together into a partition. Larger segments get their own partition. + * Each slice contains no more than {@link LuceneSliceQueue#MAX_SEGMENTS_PER_SLICE}. + */ SEGMENT, - + /** + * Partition each shard into {@code task_concurrency} partitions, splitting + * larger segments into slices. This allows bringing the most CPUs to bear on + * the problem but adds extra overhead, especially in query preparation. + */ DOC, } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java index 327303c45ad4b..5fadf98a9d823 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java @@ -49,7 +49,16 @@ public Factory( int taskConcurrency, int limit ) { - super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false); + super( + contexts, + queryFunction, + dataPartitioning, + query -> LuceneSliceQueue.PartitioningStrategy.SHARD, + taskConcurrency, + limit, + false, + ScoreMode.COMPLETE_NO_SCORES + ); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java index 3343750562cf5..5e91f2b80bcec 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java @@ -23,8 +23,6 @@ import java.util.List; import java.util.function.Function; -import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction; - /** * Factory that generates an operator that finds the max value of a field using the {@link LuceneMinMaxOperator}. */ @@ -123,7 +121,16 @@ public LuceneMaxFactory( NumberType numberType, int limit ) { - super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false); + super( + contexts, + queryFunction, + dataPartitioning, + query -> LuceneSliceQueue.PartitioningStrategy.SHARD, + taskConcurrency, + limit, + false, + ScoreMode.COMPLETE_NO_SCORES + ); this.fieldName = fieldName; this.numberType = numberType; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java index 5f0849e882813..fc457ae196186 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java @@ -23,8 +23,6 @@ import java.util.List; import java.util.function.Function; -import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction; - /** * Factory that generates an operator that finds the min value of a field using the {@link LuceneMinMaxOperator}. */ @@ -123,7 +121,16 @@ public LuceneMinFactory( NumberType numberType, int limit ) { - super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false); + super( + contexts, + queryFunction, + dataPartitioning, + query -> LuceneSliceQueue.PartitioningStrategy.SHARD, + taskConcurrency, + limit, + false, + ScoreMode.COMPLETE_NO_SCORES + ); this.fieldName = fieldName; this.numberType = numberType; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java index 2279603432d2f..9bd5af16b094f 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java @@ -9,7 +9,6 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.BulkScorer; -import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.LeafCollector; import org.apache.lucene.search.Query; @@ -37,12 +36,16 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; import java.util.function.Function; import java.util.stream.Collectors; +import static org.elasticsearch.TransportVersions.ESQL_REPORT_SHARD_PARTITIONING; + public abstract class LuceneOperator extends SourceOperator { private static final Logger logger = LogManager.getLogger(LuceneOperator.class); @@ -93,15 +96,17 @@ public abstract static class Factory implements SourceOperator.SourceOperatorFac */ protected Factory( List contexts, - Function weightFunction, + Function queryFunction, DataPartitioning dataPartitioning, + Function autoStrategy, int taskConcurrency, int limit, - boolean needsScore + boolean needsScore, + ScoreMode scoreMode ) { this.limit = limit; this.dataPartitioning = dataPartitioning; - this.sliceQueue = LuceneSliceQueue.create(contexts, weightFunction, dataPartitioning, taskConcurrency); + this.sliceQueue = LuceneSliceQueue.create(contexts, queryFunction, dataPartitioning, autoStrategy, taskConcurrency, scoreMode); this.taskConcurrency = Math.min(sliceQueue.totalSlices(), taskConcurrency); this.needsScore = needsScore; } @@ -269,6 +274,7 @@ public static class Status implements Operator.Status { private final int sliceMax; private final int current; private final long rowsEmitted; + private final Map partitioningStrategies; private Status(LuceneOperator operator) { processedSlices = operator.processedSlices; @@ -294,6 +300,7 @@ private Status(LuceneOperator operator) { } pagesEmitted = operator.pagesEmitted; rowsEmitted = operator.rowsEmitted; + partitioningStrategies = operator.sliceQueue.partitioningStrategies(); } Status( @@ -307,7 +314,8 @@ private Status(LuceneOperator operator) { int sliceMin, int sliceMax, int current, - long rowsEmitted + long rowsEmitted, + Map partitioningStrategies ) { this.processedSlices = processedSlices; this.processedQueries = processedQueries; @@ -320,6 +328,7 @@ private Status(LuceneOperator operator) { this.sliceMax = sliceMax; this.current = current; this.rowsEmitted = rowsEmitted; + this.partitioningStrategies = partitioningStrategies; } Status(StreamInput in) throws IOException { @@ -343,6 +352,9 @@ private Status(LuceneOperator operator) { } else { rowsEmitted = 0; } + partitioningStrategies = in.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING) + ? in.readMap(LuceneSliceQueue.PartitioningStrategy::readFrom) + : Map.of(); } @Override @@ -364,6 +376,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { out.writeVLong(rowsEmitted); } + if (out.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING)) { + out.writeMap(partitioningStrategies, StreamOutput::writeString, StreamOutput::writeWriteable); + } } @Override @@ -415,6 +430,10 @@ public long rowsEmitted() { return rowsEmitted; } + public Map partitioningStrategies() { + return partitioningStrategies; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -432,6 +451,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("slice_max", sliceMax); builder.field("current", current); builder.field("rows_emitted", rowsEmitted); + builder.field("partitioning_strategies", new TreeMap<>(this.partitioningStrategies)); return builder.endObject(); } @@ -450,12 +470,23 @@ public boolean equals(Object o) { && sliceMin == status.sliceMin && sliceMax == status.sliceMax && current == status.current - && rowsEmitted == status.rowsEmitted; + && rowsEmitted == status.rowsEmitted + && partitioningStrategies.equals(status.partitioningStrategies); } @Override public int hashCode() { - return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current, rowsEmitted); + return Objects.hash( + processedSlices, + sliceIndex, + totalSlices, + pagesEmitted, + sliceMin, + sliceMax, + current, + rowsEmitted, + partitioningStrategies + ); } @Override @@ -468,17 +499,4 @@ public TransportVersion getMinimalSupportedVersion() { return TransportVersions.V_8_11_X; } } - - static Function weightFunction(Function queryFunction, ScoreMode scoreMode) { - return ctx -> { - final var query = queryFunction.apply(ctx); - final var searcher = ctx.searcher(); - try { - Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query); - return searcher.createWeight(searcher.rewrite(actualQuery), scoreMode, 1); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }; - } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java index 0407e0f726044..4a8847e2870aa 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java @@ -7,17 +7,25 @@ package org.elasticsearch.compute.lucene; -import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.Nullable; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Function; @@ -26,15 +34,17 @@ * Shared Lucene slices between Lucene operators. */ public final class LuceneSliceQueue { - private static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher - private static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher + public static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher + public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher private final int totalSlices; private final Queue slices; + private final Map partitioningStrategies; - private LuceneSliceQueue(List slices) { + private LuceneSliceQueue(List slices, Map partitioningStrategies) { this.totalSlices = slices.size(); this.slices = new ConcurrentLinkedQueue<>(slices); + this.partitioningStrategies = partitioningStrategies; } @Nullable @@ -46,82 +56,196 @@ public int totalSlices() { return totalSlices; } + /** + * Strategy used to partition each shard in this queue. + */ + public Map partitioningStrategies() { + return partitioningStrategies; + } + public Collection remainingShardsIdentifiers() { return slices.stream().map(slice -> slice.shardContext().shardIdentifier()).toList(); } public static LuceneSliceQueue create( List contexts, - Function weightFunction, + Function queryFunction, DataPartitioning dataPartitioning, - int taskConcurrency + Function autoStrategy, + int taskConcurrency, + ScoreMode scoreMode ) { - final List slices = new ArrayList<>(); + List slices = new ArrayList<>(); + Map partitioningStrategies = new HashMap<>(contexts.size()); for (ShardContext ctx : contexts) { - final List leafContexts = ctx.searcher().getLeafContexts(); - List> groups = switch (dataPartitioning) { - case SHARD -> Collections.singletonList(leafContexts.stream().map(PartialLeafReaderContext::new).toList()); - case SEGMENT -> segmentSlices(leafContexts); - case DOC -> docSlices(ctx.searcher().getIndexReader(), taskConcurrency); - }; - final Weight weight = weightFunction.apply(ctx); + Query query = queryFunction.apply(ctx); + query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query); + /* + * Rewrite the query on the local index so things like fully + * overlapping range queries become match all. It's important + * to do this before picking the partitioning strategy so we + * can pick more aggressive strategies when the query rewrites + * into MatchAll. + */ + try { + query = ctx.searcher().rewrite(query); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query); + partitioningStrategies.put(ctx.shardIdentifier(), partitioning); + List> groups = partitioning.groups(ctx.searcher(), taskConcurrency); + Weight weight = weight(ctx, query, scoreMode); for (List group : groups) { if (group.isEmpty() == false) { slices.add(new LuceneSlice(ctx, group, weight)); } } } - return new LuceneSliceQueue(slices); + return new LuceneSliceQueue(slices, partitioningStrategies); } - static List> docSlices(IndexReader indexReader, int numSlices) { - final int totalDocCount = indexReader.maxDoc(); - final int normalMaxDocsPerSlice = totalDocCount / numSlices; - final int extraDocsInFirstSlice = totalDocCount % numSlices; - final List> slices = new ArrayList<>(); - int docsAllocatedInCurrentSlice = 0; - List currentSlice = null; - int maxDocsPerSlice = normalMaxDocsPerSlice + extraDocsInFirstSlice; - for (LeafReaderContext ctx : indexReader.leaves()) { - final int numDocsInLeaf = ctx.reader().maxDoc(); - int minDoc = 0; - while (minDoc < numDocsInLeaf) { - int numDocsToUse = Math.min(maxDocsPerSlice - docsAllocatedInCurrentSlice, numDocsInLeaf - minDoc); - if (numDocsToUse <= 0) { - break; - } - if (currentSlice == null) { - currentSlice = new ArrayList<>(); + /** + * Strategy used to partition each shard into slices. See {@link DataPartitioning} + * for descriptions on how each value works. + */ + public enum PartitioningStrategy implements Writeable { + /** + * See {@link DataPartitioning#SHARD}. + */ + SHARD(0) { + @Override + List> groups(IndexSearcher searcher, int requestedNumSlices) { + return List.of(searcher.getLeafContexts().stream().map(PartialLeafReaderContext::new).toList()); + } + }, + /** + * See {@link DataPartitioning#SEGMENT}. + */ + SEGMENT(1) { + @Override + List> groups(IndexSearcher searcher, int requestedNumSlices) { + IndexSearcher.LeafSlice[] gs = IndexSearcher.slices( + searcher.getLeafContexts(), + MAX_DOCS_PER_SLICE, + MAX_SEGMENTS_PER_SLICE, + false + ); + return Arrays.stream(gs).map(g -> Arrays.stream(g.partitions).map(PartialLeafReaderContext::new).toList()).toList(); + } + }, + /** + * See {@link DataPartitioning#DOC}. + */ + DOC(2) { + @Override + List> groups(IndexSearcher searcher, int requestedNumSlices) { + final int totalDocCount = searcher.getIndexReader().maxDoc(); + final int normalMaxDocsPerSlice = totalDocCount / requestedNumSlices; + final int extraDocsInFirstSlice = totalDocCount % requestedNumSlices; + final List> slices = new ArrayList<>(); + int docsAllocatedInCurrentSlice = 0; + List currentSlice = null; + int maxDocsPerSlice = normalMaxDocsPerSlice + extraDocsInFirstSlice; + for (LeafReaderContext ctx : searcher.getLeafContexts()) { + final int numDocsInLeaf = ctx.reader().maxDoc(); + int minDoc = 0; + while (minDoc < numDocsInLeaf) { + int numDocsToUse = Math.min(maxDocsPerSlice - docsAllocatedInCurrentSlice, numDocsInLeaf - minDoc); + if (numDocsToUse <= 0) { + break; + } + if (currentSlice == null) { + currentSlice = new ArrayList<>(); + } + currentSlice.add(new PartialLeafReaderContext(ctx, minDoc, minDoc + numDocsToUse)); + minDoc += numDocsToUse; + docsAllocatedInCurrentSlice += numDocsToUse; + if (docsAllocatedInCurrentSlice == maxDocsPerSlice) { + slices.add(currentSlice); + // once the first slice with the extra docs is added, no need for extra docs + maxDocsPerSlice = normalMaxDocsPerSlice; + currentSlice = null; + docsAllocatedInCurrentSlice = 0; + } + } } - currentSlice.add(new PartialLeafReaderContext(ctx, minDoc, minDoc + numDocsToUse)); - minDoc += numDocsToUse; - docsAllocatedInCurrentSlice += numDocsToUse; - if (docsAllocatedInCurrentSlice == maxDocsPerSlice) { + if (currentSlice != null) { slices.add(currentSlice); - maxDocsPerSlice = normalMaxDocsPerSlice; // once the first slice with the extra docs is added, no need for extra docs - currentSlice = null; - docsAllocatedInCurrentSlice = 0; } + if (requestedNumSlices < totalDocCount && slices.size() != requestedNumSlices) { + throw new IllegalStateException("wrong number of slices, expected " + requestedNumSlices + " but got " + slices.size()); + } + if (slices.stream() + .flatMapToInt( + l -> l.stream() + .mapToInt(partialLeafReaderContext -> partialLeafReaderContext.maxDoc() - partialLeafReaderContext.minDoc()) + ) + .sum() != totalDocCount) { + throw new IllegalStateException("wrong doc count"); + } + return slices; } + }; + + private final byte id; + + PartitioningStrategy(int id) { + this.id = (byte) id; } - if (currentSlice != null) { - slices.add(currentSlice); + + public static PartitioningStrategy readFrom(StreamInput in) throws IOException { + int id = in.readByte(); + return switch (id) { + case 0 -> SHARD; + case 1 -> SEGMENT; + case 2 -> DOC; + default -> throw new IllegalArgumentException("invalid PartitioningStrategyId [" + id + "]"); + }; } - if (numSlices < totalDocCount && slices.size() != numSlices) { - throw new IllegalStateException("wrong number of slices, expected " + numSlices + " but got " + slices.size()); + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeByte(id); } - if (slices.stream() - .flatMapToInt( - l -> l.stream().mapToInt(partialLeafReaderContext -> partialLeafReaderContext.maxDoc() - partialLeafReaderContext.minDoc()) - ) - .sum() != totalDocCount) { - throw new IllegalStateException("wrong doc count"); + + abstract List> groups(IndexSearcher searcher, int requestedNumSlices); + + private static PartitioningStrategy pick( + DataPartitioning dataPartitioning, + Function autoStrategy, + ShardContext ctx, + Query query + ) { + return switch (dataPartitioning) { + case SHARD -> PartitioningStrategy.SHARD; + case SEGMENT -> PartitioningStrategy.SEGMENT; + case DOC -> PartitioningStrategy.DOC; + case AUTO -> forAuto(autoStrategy, ctx, query); + }; + } + + /** + * {@link DataPartitioning#AUTO} resolves to {@link #SHARD} for indices + * with fewer than this many documents. + */ + private static final int SMALL_INDEX_BOUNDARY = MAX_DOCS_PER_SLICE; + + private static PartitioningStrategy forAuto(Function autoStrategy, ShardContext ctx, Query query) { + if (ctx.searcher().getIndexReader().maxDoc() < SMALL_INDEX_BOUNDARY) { + return PartitioningStrategy.SHARD; + } + return autoStrategy.apply(query); } - return slices; } - static List> segmentSlices(List leafContexts) { - IndexSearcher.LeafSlice[] gs = IndexSearcher.slices(leafContexts, MAX_DOCS_PER_SLICE, MAX_SEGMENTS_PER_SLICE, false); - return Arrays.stream(gs).map(g -> Arrays.stream(g.partitions).map(PartialLeafReaderContext::new).toList()).toList(); + static Weight weight(ShardContext ctx, Query query, ScoreMode scoreMode) { + var searcher = ctx.searcher(); + try { + Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query); + return searcher.createWeight(actualQuery, scoreMode, 1); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java index 63dbf2926275e..51b842d3f0ddc 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java @@ -7,8 +7,14 @@ package org.elasticsearch.compute.lucene; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.BoostQuery; import org.apache.lucene.search.CollectionTerminatedException; +import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.Scorable; import org.elasticsearch.compute.data.BlockFactory; @@ -17,22 +23,30 @@ import org.elasticsearch.compute.data.DoubleVector; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.LuceneSliceQueue.PartitioningStrategy; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Limiter; import org.elasticsearch.compute.operator.SourceOperator; import org.elasticsearch.core.Releasables; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.function.Function; import static org.apache.lucene.search.ScoreMode.COMPLETE; import static org.apache.lucene.search.ScoreMode.COMPLETE_NO_SCORES; +import static org.elasticsearch.compute.lucene.LuceneSliceQueue.PartitioningStrategy.DOC; +import static org.elasticsearch.compute.lucene.LuceneSliceQueue.PartitioningStrategy.SEGMENT; +import static org.elasticsearch.compute.lucene.LuceneSliceQueue.PartitioningStrategy.SHARD; /** * Source operator that incrementally runs Lucene searches */ public class LuceneSourceOperator extends LuceneOperator { + private static final Logger log = LogManager.getLogger(LuceneSourceOperator.class); private int currentPagePos = 0; private int remainingDocs; @@ -59,11 +73,13 @@ public Factory( ) { super( contexts, - weightFunction(queryFunction, needsScore ? COMPLETE : COMPLETE_NO_SCORES), + queryFunction, dataPartitioning, + autoStrategy(limit), taskConcurrency, limit, - needsScore + needsScore, + needsScore ? COMPLETE : COMPLETE_NO_SCORES ); this.maxPageSize = maxPageSize; // TODO: use a single limiter for multiple stage execution @@ -91,6 +107,110 @@ public String describe() { + needsScore + "]"; } + + /** + * Pick a strategy for the {@link DataPartitioning#AUTO} partitioning. + */ + public static Function autoStrategy(int limit) { + return limit == NO_LIMIT ? Factory::highSpeedAutoStrategy : Factory::lowOverheadAutoStrategy; + } + + /** + * Use the {@link PartitioningStrategy#SHARD} strategy because + * it has the lowest overhead. Used when there is a {@code limit} on the operator + * because that's for cases like {@code FROM foo | LIMIT 10} or + * {@code FROM foo | WHERE a == 1 | LIMIT 10} when the {@code WHERE} can be pushed + * to Lucene. In those cases we're better off with the lowest overhead we can + * manage - and that's {@link PartitioningStrategy#SHARD}. + */ + private static PartitioningStrategy lowOverheadAutoStrategy(Query query) { + return SHARD; + } + + /** + * Select the {@link PartitioningStrategy} based on the {@link Query}. + *

+ */ + private static PartitioningStrategy highSpeedAutoStrategy(Query query) { + Query unwrapped = unwrap(query); + log.trace("highSpeedAutoStrategy {} {}", query, unwrapped); + return switch (unwrapped) { + case BooleanQuery q -> highSpeedAutoStrategyForBoolean(q); + case MatchAllDocsQuery q -> DOC; + case MatchNoDocsQuery q -> SHARD; + default -> SEGMENT; + }; + } + + private static Query unwrap(Query query) { + while (true) { + switch (query) { + case BoostQuery q: { + query = q.getQuery(); + break; + } + case ConstantScoreQuery q: { + query = q.getQuery(); + break; + } + default: + return query; + } + } + } + + /** + * Select the {@link PartitioningStrategy} for a {@link BooleanQuery}. + * + */ + private static PartitioningStrategy highSpeedAutoStrategyForBoolean(BooleanQuery query) { + List clauses = new ArrayList<>(query.clauses().size()); + boolean allRequired = true; + for (BooleanClause c : query) { + Query clauseQuery = unwrap(c.query()); + log.trace("highSpeedAutoStrategyForBooleanClause {} {}", c.occur(), clauseQuery); + if ((c.isProhibited() && clauseQuery instanceof MatchAllDocsQuery) + || (c.isRequired() && clauseQuery instanceof MatchNoDocsQuery)) { + // Can't match anything + return SHARD; + } + allRequired &= c.isRequired(); + clauses.add(highSpeedAutoStrategy(clauseQuery)); + } + log.trace("highSpeedAutoStrategyForBooleanClause {} {}", allRequired, clauses); + if (allRequired == false) { + return SEGMENT; + } + if (clauses.stream().anyMatch(s -> s == SHARD)) { + return SHARD; + } + if (clauses.stream().anyMatch(s -> s == SEGMENT)) { + return SEGMENT; + } + assert clauses.stream().allMatch(s -> s == DOC); + return DOC; + } } @SuppressWarnings("this-escape") diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index cf03adcc4286b..d2c26655ea321 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -44,6 +44,9 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.lucene.search.ScoreMode.TOP_DOCS; +import static org.apache.lucene.search.ScoreMode.TOP_DOCS_WITH_SCORES; + /** * Source operator that builds Pages out of the output of a TopFieldCollector (aka TopN) */ @@ -63,7 +66,16 @@ public Factory( List> sorts, boolean needsScore ) { - super(contexts, weightFunction(queryFunction, sorts, needsScore), dataPartitioning, taskConcurrency, limit, needsScore); + super( + contexts, + queryFunction, + dataPartitioning, + query -> LuceneSliceQueue.PartitioningStrategy.SHARD, + taskConcurrency, + limit, + needsScore, + needsScore ? TOP_DOCS_WITH_SCORES : TOP_DOCS + ); this.maxPageSize = maxPageSize; this.sorts = sorts; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java index 35738140d90c9..e6e2c041113b2 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java @@ -33,8 +33,6 @@ import java.util.List; import java.util.function.Function; -import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction; - /** * Creates a source operator that takes advantage of the natural sorting of segments in a tsdb index. *

@@ -58,7 +56,16 @@ private TimeSeriesSortedSourceOperatorFactory( int maxPageSize, int limit ) { - super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), DataPartitioning.SHARD, taskConcurrency, limit, false); + super( + contexts, + queryFunction, + DataPartitioning.SHARD, + query -> { throw new UnsupportedOperationException("locked to SHARD partitioning"); }, + taskConcurrency, + limit, + false, + ScoreMode.COMPLETE_NO_SCORES + ); this.maxPageSize = maxPageSize; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java index 87636ed4c05ba..4c97c78fdda95 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java @@ -100,7 +100,7 @@ protected Matcher expectedToStringOfSimple() { @Override protected Matcher expectedDescriptionOfSimple() { - return matchesRegex("LuceneCountOperator\\[dataPartitioning = (DOC|SHARD|SEGMENT), limit = 100]"); + return matchesRegex("LuceneCountOperator\\[dataPartitioning = (AUTO|DOC|SHARD|SEGMENT), limit = 100]"); } // TODO tests for the other data partitioning configurations diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java index 4fee40f8e1476..5b3b0b6c63b2d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java @@ -203,7 +203,7 @@ protected final Matcher expectedDescriptionOfSimple() { return matchesRegex( "LuceneMaxOperator\\[type = " + getNumberType().name() - + ", dataPartitioning = (DOC|SHARD|SEGMENT), fieldName = " + + ", dataPartitioning = (AUTO|DOC|SHARD|SEGMENT), fieldName = " + FIELD_NAME + ", limit = 100]" ); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java index 4449a653945bd..298701c4f7cd7 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java @@ -203,7 +203,7 @@ protected final Matcher expectedDescriptionOfSimple() { return matchesRegex( "LuceneMinOperator\\[type = " + getNumberType().name() - + ", dataPartitioning = (DOC|SHARD|SEGMENT), fieldName = " + + ", dataPartitioning = (AUTO|DOC|SHARD|SEGMENT), fieldName = " + FIELD_NAME + ", limit = 100]" ); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorAutoStrategyTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorAutoStrategyTests.java new file mode 100644 index 0000000000000..a1f83e51d7bbe --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorAutoStrategyTests.java @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.lucene; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.BoostQuery; +import org.apache.lucene.search.ConstantScoreQuery; +import org.apache.lucene.search.IndexOrDocValuesQuery; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.MatchNoDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; +import org.elasticsearch.test.ESTestCase; + +import java.util.List; +import java.util.function.Function; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests for the {@link LuceneSourceOperator.Factory#autoStrategy} method. It picks + * the strategy based on complex rules around the query. + */ +public class LuceneSourceOperatorAutoStrategyTests extends ESTestCase { + @ParametersFactory(argumentFormatting = "%s -> %s") + public static Iterable parameters() { + return List.of( + new Object[] { new MatchAllDocsQuery(), LuceneSliceQueue.PartitioningStrategy.DOC }, + new Object[] { new ConstantScoreQuery(new MatchAllDocsQuery()), LuceneSliceQueue.PartitioningStrategy.DOC }, + + /* + * FROM test | WHERE @timestamp > \"2025-01-01T00:00:00Z\" | .... + * when all @timestamps are in range + */ + new Object[] { + new BoostQuery(new ConstantScoreQuery(new MatchAllDocsQuery()), 0.0F), + LuceneSliceQueue.PartitioningStrategy.DOC }, + new Object[] { new MatchNoDocsQuery(), LuceneSliceQueue.PartitioningStrategy.SHARD }, + + /* + * FROM test | WHERE @timestamp > \"2025-01-01T00:00:00Z\" | STATS SUM(b) + * when all @timestamps are in range and all docs have b. + */ + new Object[] { + new BooleanQuery.Builder() // formatter + .add(new BoostQuery(new ConstantScoreQuery(new MatchAllDocsQuery()), 0.0F), BooleanClause.Occur.MUST) + .add(new BoostQuery(new ConstantScoreQuery(new MatchAllDocsQuery()), 0.0F), BooleanClause.Occur.MUST) + .build(), + LuceneSliceQueue.PartitioningStrategy.DOC }, + + /* + * FROM test | WHERE @timestamp > \"2025-01-01T00:00:00.120Z\" | STATS SUM(b) + * when *some* @timestamps are in range and all docs have b + */ + new Object[] { + new BooleanQuery.Builder().add( + new BoostQuery( + new ConstantScoreQuery( + new IndexOrDocValuesQuery( + LongPoint.newRangeQuery("@timestamp", 1735689600121L, 9223372036854775807L), + SortedNumericDocValuesField.newSlowRangeQuery("@timestamp", 1735689600121L, 9223372036854775807L) + ) + ), + 0.0F + ), + BooleanClause.Occur.MUST + ).add(new BoostQuery(new ConstantScoreQuery(new MatchAllDocsQuery()), 0.0F), BooleanClause.Occur.MUST).build(), + LuceneSliceQueue.PartitioningStrategy.SEGMENT }, + + new Object[] { + new BooleanQuery.Builder() // formatter + .add(new MatchAllDocsQuery(), BooleanClause.Occur.MUST) + .add(new TermQuery(new Term("a", "a")), BooleanClause.Occur.SHOULD) + .build(), + LuceneSliceQueue.PartitioningStrategy.SEGMENT }, + new Object[] { + new BooleanQuery.Builder() // formatter + .add(new TermQuery(new Term("a", "a")), BooleanClause.Occur.SHOULD) + .add(new MatchAllDocsQuery(), BooleanClause.Occur.MUST_NOT) + .build(), + LuceneSliceQueue.PartitioningStrategy.SHARD }, + new Object[] { + new BooleanQuery.Builder() // formatter + .add(new MatchAllDocsQuery(), BooleanClause.Occur.SHOULD) + .add(new TermQuery(new Term("a", "a")), BooleanClause.Occur.SHOULD) + .build(), + LuceneSliceQueue.PartitioningStrategy.SEGMENT }, + new Object[] { + new BooleanQuery.Builder() // formatter + .add(new MatchNoDocsQuery(), BooleanClause.Occur.SHOULD) + .add(new TermQuery(new Term("a", "a")), BooleanClause.Occur.SHOULD) + .build(), + LuceneSliceQueue.PartitioningStrategy.SEGMENT } + ); + } + + private final Query query; + private final LuceneSliceQueue.PartitioningStrategy expectedUnlimited; + + public LuceneSourceOperatorAutoStrategyTests(Query query, LuceneSliceQueue.PartitioningStrategy expectedUnlimited) { + this.query = query; + this.expectedUnlimited = expectedUnlimited; + } + + public void testAutoStrategyLimited() { + Function auto = LuceneSourceOperator.Factory.autoStrategy( + between(1, LuceneOperator.NO_LIMIT - 1) + ); + assertThat(auto.apply(query), equalTo(LuceneSliceQueue.PartitioningStrategy.SHARD)); + } + + public void testAutoStrategyUnlimited() { + Function auto = LuceneSourceOperator.Factory.autoStrategy(LuceneOperator.NO_LIMIT); + assertThat(auto.apply(query), equalTo(expectedUnlimited)); + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java index f45a2c645e9d3..3c711fc45147e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java @@ -12,7 +12,9 @@ import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.test.ESTestCase; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; @@ -31,7 +33,8 @@ public static LuceneSourceOperator.Status simple() { 123, 99990, 8000, - 222 + 222, + Map.of("b:0", LuceneSliceQueue.PartitioningStrategy.SHARD, "a:1", LuceneSliceQueue.PartitioningStrategy.DOC) ); } @@ -54,7 +57,11 @@ public static String simpleToJson() { "slice_min" : 123, "slice_max" : 99990, "current" : 8000, - "rows_emitted" : 222 + "rows_emitted" : 222, + "partitioning_strategies" : { + "a:1" : "DOC", + "b:0" : "SHARD" + } }"""; } @@ -80,7 +87,8 @@ public LuceneSourceOperator.Status createTestInstance() { randomNonNegativeInt(), randomNonNegativeInt(), randomNonNegativeInt(), - randomNonNegativeLong() + randomNonNegativeLong(), + randomPartitioningStrategies() ); } @@ -102,6 +110,18 @@ private static Set randomProcessedShards() { return set; } + private static Map randomPartitioningStrategies() { + int size = between(0, 10); + Map partitioningStrategies = new HashMap<>(); + while (partitioningStrategies.size() < size) { + partitioningStrategies.put( + randomAlphaOfLength(3) + ":" + between(0, 10), + randomFrom(LuceneSliceQueue.PartitioningStrategy.values()) + ); + } + return partitioningStrategies; + } + @Override protected LuceneSourceOperator.Status mutateInstance(LuceneSourceOperator.Status instance) { int processedSlices = instance.processedSlices(); @@ -115,7 +135,8 @@ protected LuceneSourceOperator.Status mutateInstance(LuceneSourceOperator.Status int sliceMax = instance.sliceMax(); int current = instance.current(); long rowsEmitted = instance.rowsEmitted(); - switch (between(0, 10)) { + Map partitioningStrategies = instance.partitioningStrategies(); + switch (between(0, 11)) { case 0 -> processedSlices = randomValueOtherThan(processedSlices, ESTestCase::randomNonNegativeInt); case 1 -> processedQueries = randomValueOtherThan(processedQueries, LuceneSourceOperatorStatusTests::randomProcessedQueries); case 2 -> processedShards = randomValueOtherThan(processedShards, LuceneSourceOperatorStatusTests::randomProcessedShards); @@ -127,6 +148,10 @@ protected LuceneSourceOperator.Status mutateInstance(LuceneSourceOperator.Status case 8 -> sliceMax = randomValueOtherThan(sliceMax, ESTestCase::randomNonNegativeInt); case 9 -> current = randomValueOtherThan(current, ESTestCase::randomNonNegativeInt); case 10 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); + case 11 -> partitioningStrategies = randomValueOtherThan( + partitioningStrategies, + LuceneSourceOperatorStatusTests::randomPartitioningStrategies + ); default -> throw new UnsupportedOperationException(); } return new LuceneSourceOperator.Status( @@ -140,7 +165,8 @@ protected LuceneSourceOperator.Status mutateInstance(LuceneSourceOperator.Status sliceMin, sliceMax, current, - rowsEmitted + rowsEmitted, + partitioningStrategies ); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java index 1afbe083e3ef7..a079d0519f110 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java @@ -119,17 +119,34 @@ protected Matcher expectedToStringOfSimple() { @Override protected Matcher expectedDescriptionOfSimple() { return matchesRegex( - "LuceneSourceOperator" - + "\\[dataPartitioning = (DOC|SHARD|SEGMENT), maxPageSize = \\d+, limit = 100, needsScore = (true|false)]" + "LuceneSourceOperator\\[" + + "dataPartitioning = (AUTO|DOC|SHARD|SEGMENT), " + + "maxPageSize = \\d+, " + + "limit = 100, " + + "needsScore = (true|false)]" ); } - // TODO tests for the other data partitioning configurations + public void testAutoPartitioning() { + testSimple(DataPartitioning.AUTO); + } + + public void testShardPartitioning() { + testSimple(DataPartitioning.SHARD); + } - public void testShardDataPartitioning() { + public void testSegmentPartitioning() { + testSimple(DataPartitioning.SEGMENT); + } + + public void testDocPartitioning() { + testSimple(DataPartitioning.DOC); + } + + private void testSimple(DataPartitioning partitioning) { int size = between(1_000, 20_000); int limit = between(10, size); - testSimple(driverContext(), size, limit); + testSimple(driverContext(), partitioning, size, limit); } public void testEarlyTermination() { @@ -168,12 +185,12 @@ public void testEarlyTermination() { } public void testEmpty() { - testSimple(driverContext(), 0, between(10, 10_000)); + testSimple(driverContext(), randomFrom(DataPartitioning.values()), 0, between(10, 10_000)); } - public void testWithCranky() { + public void testEmptyWithCranky() { try { - testSimple(crankyDriverContext(), between(1, 10_000), 100); + testSimple(crankyDriverContext(), randomFrom(DataPartitioning.values()), 0, between(10, 10_000)); logger.info("cranky didn't break"); } catch (CircuitBreakingException e) { logger.info("broken", e); @@ -181,21 +198,27 @@ public void testWithCranky() { } } - public void testEmptyWithCranky() { - try { - testSimple(crankyDriverContext(), 0, between(10, 10_000)); - logger.info("cranky didn't break"); - } catch (CircuitBreakingException e) { - logger.info("broken", e); - assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE)); - } + public void testAutoPartitioningWithCranky() { + testWithCranky(DataPartitioning.AUTO); + } + + public void testShardPartitioningWithCranky() { + testWithCranky(DataPartitioning.SHARD); + } + + public void testSegmentPartitioningWithCranky() { + testWithCranky(DataPartitioning.SEGMENT); + } + + public void testDocPartitioningWithCranky() { + testWithCranky(DataPartitioning.DOC); } - public void testShardDataPartitioningWithCranky() { + private void testWithCranky(DataPartitioning partitioning) { int size = between(1_000, 20_000); int limit = between(10, size); try { - testSimple(crankyDriverContext(), size, limit); + testSimple(crankyDriverContext(), partitioning, size, limit); logger.info("cranky didn't break"); } catch (CircuitBreakingException e) { logger.info("broken", e); @@ -203,8 +226,8 @@ public void testShardDataPartitioningWithCranky() { } } - private void testSimple(DriverContext ctx, int size, int limit) { - LuceneSourceOperator.Factory factory = simple(DataPartitioning.SHARD, size, limit, scoring); + private void testSimple(DriverContext ctx, DataPartitioning partitioning, int size, int limit) { + LuceneSourceOperator.Factory factory = simple(partitioning, size, limit, scoring); Operator.OperatorFactory readS = ValuesSourceReaderOperatorTests.factory(reader, S_FIELD, ElementType.LONG); List results = new ArrayList<>(); diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java new file mode 100644 index 0000000000000..238f5aacce588 --- /dev/null +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java @@ -0,0 +1,229 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.qa.single_node; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.test.TestClustersThreadFilter; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.hamcrest.Matcher; +import org.junit.ClassRule; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) +public class EsqlPartitioningIT extends ESRestTestCase { + /** + * The size of the large index we use to test partitioning. It has to be + * at least 250,000 entries for AUTO partitioning to do interesting things. + */ + private static final int IDX_DOCS = 300_000; + /** + * The size of the small index we use to test partitioning. It's small enough + * that AUTO should always choose SHARD partitioning. + */ + private static final int SMALL_IDX_DOCS = 20_000; + + private static final long BASE_TIME = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-01-01T00:00:00Z"); + private static final int BULK_CHARS = Math.toIntExact(ByteSizeValue.ofMb(1).getBytes()); + + record Case(String suffix, String idxPartition) {} + + @ParametersFactory(argumentFormatting = "[%1$s] %3$s -> %4$s") + public static Iterable parameters() { + List params = new ArrayList<>(); + for (String defaultDataPartitioning : new String[] { null, "shard", "segment", "doc", "auto" }) { + for (String index : new String[] { "idx", "small_idx" }) { + for (Case c : new Case[] { + new Case("", "SHARD"), + new Case("| SORT @timestamp ASC", "SHARD"), + new Case("| WHERE ABS(a) == 1", "DOC"), + new Case("| WHERE a == 1", "SHARD"), + new Case("| STATS SUM(a)", "DOC"), + new Case("| MV_EXPAND a | STATS SUM(a)", "DOC"), + new Case("| WHERE a == 1 | STATS SUM(a)", "SEGMENT"), + new Case("| WHERE a == 1 | MV_EXPAND a | STATS SUM(a)", "SEGMENT"), }) { + params.add( + new Object[] { + defaultDataPartitioning, + index, + "FROM " + index + " " + c.suffix, + expectedPartition(defaultDataPartitioning, index, c.idxPartition) } + ); + } + } + } + return params; + } + + @ClassRule + public static ElasticsearchCluster cluster = Clusters.testCluster(); + + private final String defaultDataPartitioning; + private final String index; + private final String query; + private final Matcher expectedPartition; + + public EsqlPartitioningIT(String defaultDataPartitioning, String index, String query, Matcher expectedPartition) { + this.defaultDataPartitioning = defaultDataPartitioning; + this.index = index; + this.query = query; + this.expectedPartition = expectedPartition; + } + + public void test() throws IOException { + setupIndex(index, switch (index) { + case "idx" -> IDX_DOCS; + case "small_idx" -> SMALL_IDX_DOCS; + default -> throw new IllegalArgumentException("unknown index [" + index + "]"); + }); + setDefaultDataPartitioning(defaultDataPartitioning); + try { + assertThat(partitionForQuery(query), expectedPartition); + } finally { + setDefaultDataPartitioning(null); + } + } + + private void setDefaultDataPartitioning(String defaultDataPartitioning) throws IOException { + Request request = new Request("PUT", "/_cluster/settings"); + XContentBuilder builder = JsonXContent.contentBuilder().startObject(); + builder.startObject("transient"); + builder.field("esql.default_data_partitioning", defaultDataPartitioning); + builder.endObject(); + request.setJsonEntity(Strings.toString(builder.endObject())); + int code = client().performRequest(request).getStatusLine().getStatusCode(); + assertThat(code, equalTo(200)); + } + + private static Matcher expectedPartition(String defaultDataPartitioning, String index, String idxPartition) { + return switch (defaultDataPartitioning) { + case null -> expectedAutoPartition(index, idxPartition); + case "auto" -> expectedAutoPartition(index, idxPartition); + default -> equalTo(defaultDataPartitioning.toUpperCase(Locale.ROOT)); + }; + } + + private static Matcher expectedAutoPartition(String index, String idxPartition) { + return equalTo(switch (index) { + case "idx" -> idxPartition; + case "small_idx" -> "SHARD"; + default -> throw new UnsupportedOperationException("unknown index [" + index + "]"); + }); + } + + private String partitionForQuery(String query) throws IOException { + Request request = new Request("POST", "_query"); + request.addParameter("pretty", ""); + request.addParameter("error_trace", ""); + XContentBuilder b = JsonXContent.contentBuilder().startObject(); + b.field("query", query); + b.field("profile", true); + request.setJsonEntity(Strings.toString(b.endObject())); + request.setOptions(request.getOptions().toBuilder().setWarningsHandler(w -> { + w.remove("No limit defined, adding default limit of [1000]"); + return w.isEmpty() == false; + })); + String response = EntityUtils.toString(client().performRequest(request).getEntity()); + logger.info("Response: {}", response); + Map profile = (Map) XContentHelper.convertToMap(JsonXContent.jsonXContent, response, true).get("profile"); + List drivers = (List) profile.get("drivers"); + for (Object dItem : drivers) { + Map d = (Map) dItem; + if (false == "data".equals(d.get("description"))) { + continue; + } + List operators = (List) d.get("operators"); + for (Object oItem : operators) { + Map o = (Map) oItem; + String operator = o.get("operator").toString(); + if (false == operator.startsWith("Lucene")) { + continue; + } + Map status = (Map) o.get("status"); + Map partitioningStrategies = (Map) status.get("partitioning_strategies"); + String strat = (String) partitioningStrategies.get(index + ":0"); + if (strat == null) { + throw new IllegalArgumentException("didn't find partition strategy for: " + o); + } + return strat; + } + } + throw new IllegalArgumentException("didn't find partition strategy for: " + drivers); + } + + private void setupIndex(String index, int docs) throws IOException { + Response exists = client().performRequest(new Request("HEAD", "/" + index)); + if (exists.getStatusLine().getStatusCode() == 200) { + return; + } + Request create = new Request("PUT", index); + create.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 1 + } + } + }"""); // Use a single shard to get consistent results. + client().performRequest(create); + StringBuilder bulk = new StringBuilder(); + for (int d = 0; d < docs; d++) { + bulk.append("{\"index\":{}}\n"); + bulk.append("{\"@timestamp\": \""); + bulk.append(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(BASE_TIME + d)); + bulk.append("\", \"a\": "); + bulk.append(d % 10); + bulk.append("}\n"); + if (bulk.length() > BULK_CHARS) { + logger.info("indexing {}: {}", index, d); + bulk(index, bulk.toString()); + bulk.setLength(0); + } + } + logger.info("indexing {}: {}", index, docs); + bulk(index, bulk.toString()); + client().performRequest(new Request("POST", "/" + index + "/_refresh")); + } + + private void bulk(String index, String bulk) throws IOException { + Request request = new Request("POST", index + "/_bulk"); + request.setJsonEntity(bulk); + Response response = client().performRequest(request); + if (response.getStatusLine().getStatusCode() != 200) { + throw new AssertionError("error with bulk: " + EntityUtils.toString(response.getEntity())); + } + } + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + @Override + protected boolean preserveClusterUponCompletion() { + return true; + } +} diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 1b44536eed508..814cd8bfd9796 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -687,7 +687,8 @@ private String checkOperatorProfile(Map o) { .entry("pages_emitted", greaterThan(0)) .entry("rows_emitted", greaterThan(0)) .entry("process_nanos", greaterThan(0)) - .entry("processed_queries", List.of("*:*")); + .entry("processed_queries", List.of("*:*")) + .entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD")); case "ValuesSourceReaderOperator" -> basicProfile().entry("readers_built", matchesMap().extraOk()); case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0)) .entry("rows_received", greaterThan(0)) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index b2d627f4090b3..a93e2f16c075b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -17,6 +17,7 @@ import org.elasticsearch.compute.aggregation.GroupingAggregator; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneCountOperator; import org.elasticsearch.compute.lucene.LuceneOperator; import org.elasticsearch.compute.lucene.LuceneSourceOperator; @@ -101,10 +102,17 @@ public interface ShardContext extends org.elasticsearch.compute.lucene.ShardCont } private final List shardContexts; + private final DataPartitioning defaultDataPartitioning; - public EsPhysicalOperationProviders(FoldContext foldContext, List shardContexts, AnalysisRegistry analysisRegistry) { + public EsPhysicalOperationProviders( + FoldContext foldContext, + List shardContexts, + AnalysisRegistry analysisRegistry, + DataPartitioning defaultDataPartitioning + ) { super(foldContext, analysisRegistry); this.shardContexts = shardContexts; + this.defaultDataPartitioning = defaultDataPartitioning; } @Override @@ -199,7 +207,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, luceneFactory = new LuceneTopNSourceOperator.Factory( shardContexts, querySupplier(esQueryExec.query()), - context.queryPragmas().dataPartitioning(), + context.queryPragmas().dataPartitioning(defaultDataPartitioning), context.queryPragmas().taskConcurrency(), context.pageSize(rowEstimatedSize), limit, @@ -219,7 +227,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, luceneFactory = new LuceneSourceOperator.Factory( shardContexts, querySupplier(esQueryExec.query()), - context.queryPragmas().dataPartitioning(), + context.queryPragmas().dataPartitioning(defaultDataPartitioning), context.queryPragmas().taskConcurrency(), context.pageSize(rowEstimatedSize), limit, @@ -241,7 +249,7 @@ public LuceneCountOperator.Factory countSource(LocalExecutionPlannerContext cont return new LuceneCountOperator.Factory( shardContexts, querySupplier(queryBuilder), - context.queryPragmas().dataPartitioning(), + context.queryPragmas().dataPartitioning(defaultDataPartitioning), context.queryPragmas().taskConcurrency(), limit == null ? NO_LIMIT : (Integer) limit.fold(context.foldCtx()) ); @@ -278,11 +286,14 @@ public static class DefaultShardContext implements ShardContext { private final int index; private final SearchExecutionContext ctx; private final AliasFilter aliasFilter; + private final String shardIdentifier; public DefaultShardContext(int index, SearchExecutionContext ctx, AliasFilter aliasFilter) { this.index = index; this.ctx = ctx; this.aliasFilter = aliasFilter; + // Build the shardIdentifier once up front so we can reuse references to it in many places. + this.shardIdentifier = ctx.getFullyQualifiedIndex().getName() + ":" + ctx.getShardId(); } @Override @@ -302,7 +313,7 @@ public Optional buildSort(List> sorts) throws IOE @Override public String shardIdentifier() { - return ctx.getFullyQualifiedIndex().getName() + ":" + ctx.getShardId(); + return shardIdentifier; } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 5724587f0573b..6ebaeca1e56c0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.DriverTaskRunner; @@ -133,6 +134,8 @@ public class ComputeService { private final ClusterComputeHandler clusterComputeHandler; private final ExchangeService exchangeService; + private volatile DataPartitioning defaultDataPartitioning; + @SuppressWarnings("this-escape") public ComputeService( TransportActionServices transportActionServices, @@ -161,6 +164,7 @@ public ComputeService( esqlExecutor, dataNodeComputeHandler ); + clusterService.getClusterSettings().initializeAndWatch(EsqlPlugin.DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v); } public void execute( @@ -429,7 +433,12 @@ public SourceProvider createSourceProvider() { enrichLookupService, lookupFromIndexService, inferenceRunner, - new EsPhysicalOperationProviders(context.foldCtx(), contexts, searchService.getIndicesService().getAnalysis()), + new EsPhysicalOperationProviders( + context.foldCtx(), + contexts, + searchService.getIndicesService().getAnalysis(), + defaultDataPartitioning + ), contexts ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 8ff2638137e5d..676ebc0640966 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockFactoryProvider; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneOperator; import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.AbstractPageMappingOperator; @@ -154,6 +155,14 @@ public class EsqlPlugin extends Plugin implements ActionPlugin { Setting.Property.Dynamic ); + public static final Setting DEFAULT_DATA_PARTITIONING = Setting.enumSetting( + DataPartitioning.class, + "esql.default_data_partitioning", + DataPartitioning.AUTO, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + @Override public Collection createComponents(PluginServices services) { CircuitBreaker circuitBreaker = services.indicesService().getBigArrays().breakerService().getBreaker("request"); @@ -216,7 +225,8 @@ public List> getSettings() { ESQL_QUERYLOG_THRESHOLD_DEBUG_SETTING, ESQL_QUERYLOG_THRESHOLD_INFO_SETTING, ESQL_QUERYLOG_THRESHOLD_WARN_SETTING, - ESQL_QUERYLOG_INCLUDE_USER_SETTING + ESQL_QUERYLOG_INCLUDE_USER_SETTING, + DEFAULT_DATA_PARTITIONING ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java index bc1ba595c5481..b8aa1a7badcab 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.lucene.DataPartitioning; +import org.elasticsearch.compute.lucene.LuceneSliceQueue; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverStatus; import org.elasticsearch.core.TimeValue; @@ -23,6 +24,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expression; import java.io.IOException; +import java.util.Locale; import java.util.Objects; /** @@ -38,11 +40,14 @@ public final class QueryPragmas implements Writeable { ThreadPool.searchOrGetThreadPoolSize(EsExecutors.allocatedProcessors(Settings.EMPTY)) ); - public static final Setting DATA_PARTITIONING = Setting.enumSetting( - DataPartitioning.class, - "data_partitioning", - DataPartitioning.SEGMENT - ); + /** + * How to cut {@link LuceneSliceQueue slices} to cut each shard into. Is parsed to + * the enum {@link DataPartitioning} which has more documentation. Not an + * {@link Setting#enumSetting} because those can't have {@code null} defaults. + * {@code null} here means "use the default from the cluster setting + * named {@link EsqlPlugin#DEFAULT_DATA_PARTITIONING}." + */ + public static final Setting DATA_PARTITIONING = Setting.simpleString("data_partitioning"); /** * Size of a page in entries with {@code 0} being a special value asking @@ -100,8 +105,12 @@ public int concurrentExchangeClients() { return EXCHANGE_CONCURRENT_CLIENTS.get(settings); } - public DataPartitioning dataPartitioning() { - return DATA_PARTITIONING.get(settings); + public DataPartitioning dataPartitioning(DataPartitioning defaultDataPartitioning) { + String partitioning = DATA_PARTITIONING.get(settings); + if (partitioning.isEmpty()) { + return defaultDataPartitioning; + } + return DataPartitioning.valueOf(partitioning.toUpperCase(Locale.ROOT)); } public int taskConcurrency() { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 19e449d04f6ba..91398ab1bfad8 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.compute.aggregation.AggregatorMode; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.compute.test.TestBlockFactory; @@ -7675,7 +7676,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP null, null, null, - new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null), + new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null, DataPartitioning.AUTO), List.of() ); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index e07f6bc51e286..d556ee68d4033 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator; import org.elasticsearch.compute.operator.SourceOperator; @@ -256,7 +257,7 @@ private Configuration config() { } private EsPhysicalOperationProviders esPhysicalOperationProviders(List shardContexts) { - return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null); + return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null, DataPartitioning.AUTO); } private List createShardContexts() throws IOException {