From 23ff7c57665356ff201a17d2c7cc6fe918dd74d7 Mon Sep 17 00:00:00 2001
From: Nik Everett
Date: Wed, 26 Mar 2025 16:43:00 -0400
Subject: [PATCH 01/13] ESQL: Heuristics to pick efficient partitioning

Adds heuristics to pick an efficient partitioning strategy based on the
index and rewritten query. This speeds up some queries by throwing more
cores at the problem:

```
FROM test | STATS SUM(b)
Before: took: 31 CPU: 222.3%
After:  took: 15 CPU: 806.9%
```

It also lowers the overhead of simpler queries by throwing fewer cores
at the problem when extra cores won't really speed anything up:

```
FROM test
Before: took: 1 CPU: 48.5%
After:  took: 1 CPU: 70.4%
```

We have had a `pragma` to control our data partitioning for a long time;
this change just looks at the query to pick a partitioning scheme. The
partitioning options:

* `shard`: use one core per shard
* `segment`: use one core per large segment
* `doc`: break each shard into as many slices as there are cores

`doc` is the fastest, but has a lot of overhead, especially for complex
Lucene queries. `segment` is fast, but doesn't make the most of the CPUs
when there are few segments. `shard` has the lowest overhead.

Previously we always used `segment` partitioning because it is fast but
doesn't pay the terrible overhead. With this change we use `doc` when the
top level query matches all documents - those queries have very low
overhead even with `doc` partitioning. That's the twice-as-fast example
above.

This also uses the `shard` partitioning for queries that don't have to
do much work, like `FROM foo` or `FROM foo | LIMIT 1` or
`FROM foo | SORT a`. That's the lower CPU example above.

This partitioning choice is made very late, on the data node. So queries
like this:

```
FROM test
| WHERE @timestamp > "2025-01-01T00:00:00Z"
| STATS SUM(b)
```

can also use the `doc` partitioning when all documents are after the
timestamp and all documents have `b`.
---
 .../compute/lucene/DataPartitioning.java      |  34 +++-
 .../compute/lucene/LuceneCountOperator.java   |  10 +-
 .../compute/lucene/LuceneMaxFactory.java      |  10 +-
 .../compute/lucene/LuceneMinFactory.java      |  10 +-
 .../compute/lucene/LuceneOperator.java        |  18 +-
 .../compute/lucene/LuceneSliceQueue.java      | 178 ++++++++++++------
 .../compute/lucene/LuceneSourceOperator.java  |  86 ++++++++-
 .../lucene/LuceneTopNSourceOperator.java      |  10 +-
 ...TimeSeriesSortedSourceOperatorFactory.java |  10 +-
 ...LuceneSourceOperatorAutoStrategyTests.java |  96 ++++++++++
 .../xpack/esql/plugin/QueryPragmas.java       |   2 +-
 11 files changed, 384 insertions(+), 80 deletions(-)
 create mode 100644 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorAutoStrategyTests.java

diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java
index 926b9e08d2e08..f0acff78ba8a8 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java
@@ -7,11 +7,39 @@
 package org.elasticsearch.compute.lucene;

-public enum DataPartitioning {
+import org.elasticsearch.compute.operator.Driver;

+/**
+ * How we partition the data across {@link Driver}s. Each request forks into
+ * {@code min(cpus, partition_count)} threads on the data node. More partitions
+ * allow us to bring more threads to bear on CPU intensive data node side tasks.
+ */
+public enum DataPartitioning {
+    /**
+     * Automatically select the data partitioning based on the query and index.
+     * Usually that's {@link #SEGMENT}, but for small indices it's {@link #SHARD}.
+     * When the additional overhead from {@link #DOC} is fairly low it'll
+     * pick {@link #DOC}.
+     */
+    AUTO,
+    /**
+     * Make one partition per shard. This is generally the slowest option, but it
+     * has the lowest CPU overhead.
+     */
     SHARD,
-
+    /**
+     * Partition on segment boundaries; this doesn't allow forking to as many CPUs
+     * as {@link #DOC} but it has much lower overhead.
+     * <p>
+ * It packs segments smaller than {@link LuceneSliceQueue#MAX_DOCS_PER_SLICE} + * docs together into a partition. Larger segments get their own partition. + * Each slice contains no more than {@link LuceneSliceQueue#MAX_SEGMENTS_PER_SLICE}. + */ SEGMENT, - + /** + * Partition each shard into {@code task_concurrency} partitions, splitting + * larger segments into slices. This allows bringing the most CPUs to bear on + * the problem but adds extra overhead, especially in query preparation. + */ DOC, } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java index 34c27a5c1fdff..99df8320f174e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java @@ -49,7 +49,15 @@ public Factory( int taskConcurrency, int limit ) { - super(contexts, queryFunction, dataPartitioning, taskConcurrency, limit, ScoreMode.COMPLETE_NO_SCORES); + super( + contexts, + queryFunction, + dataPartitioning, + query -> LuceneSliceQueue.PartitioningStrategy.SHARD, + taskConcurrency, + limit, + ScoreMode.COMPLETE_NO_SCORES + ); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java index ba7de22b1b821..c17ceff052638 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java @@ -121,7 +121,15 @@ public LuceneMaxFactory( NumberType numberType, int limit ) { - super(contexts, queryFunction, dataPartitioning, taskConcurrency, limit, ScoreMode.COMPLETE_NO_SCORES); + super( + contexts, + queryFunction, + dataPartitioning, + query -> LuceneSliceQueue.PartitioningStrategy.SHARD, + taskConcurrency, + limit, + ScoreMode.COMPLETE_NO_SCORES + ); this.fieldName = fieldName; this.numberType = numberType; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java index e3c6c8310373d..a50bf3b4a7cd6 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java @@ -121,7 +121,15 @@ public LuceneMinFactory( NumberType numberType, int limit ) { - super(contexts, queryFunction, dataPartitioning, taskConcurrency, limit, ScoreMode.COMPLETE_NO_SCORES); + super( + contexts, + queryFunction, + dataPartitioning, + query -> LuceneSliceQueue.PartitioningStrategy.SHARD, + taskConcurrency, + limit, + ScoreMode.COMPLETE_NO_SCORES + ); this.fieldName = fieldName; this.numberType = numberType; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java index 7547e2da3e184..2fd133cf496a3 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java @@ -9,7 +9,6 
@@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.BulkScorer; -import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.LeafCollector; @@ -96,6 +95,7 @@ protected Factory( List contexts, Function queryFunction, DataPartitioning dataPartitioning, + Function autoStrategy, int taskConcurrency, int limit, ScoreMode scoreMode @@ -103,8 +103,7 @@ protected Factory( this.limit = limit; this.scoreMode = scoreMode; this.dataPartitioning = dataPartitioning; - var weightFunction = weightFunction(queryFunction, scoreMode); - this.sliceQueue = LuceneSliceQueue.create(contexts, weightFunction, dataPartitioning, taskConcurrency); + this.sliceQueue = LuceneSliceQueue.create(contexts, queryFunction, dataPartitioning, autoStrategy, taskConcurrency, scoreMode); this.taskConcurrency = Math.min(sliceQueue.totalSlices(), taskConcurrency); } @@ -470,17 +469,4 @@ public TransportVersion getMinimalSupportedVersion() { return TransportVersions.V_8_11_X; } } - - static Function weightFunction(Function queryFunction, ScoreMode scoreMode) { - return ctx -> { - final var query = queryFunction.apply(ctx); - final var searcher = ctx.searcher(); - try { - Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query); - return searcher.createWeight(searcher.rewrite(actualQuery), scoreMode, 1); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }; - } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java index 0407e0f726044..bc4fc0f02ce43 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java @@ -7,16 +7,19 @@ package org.elasticsearch.compute.lucene; -import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; import org.elasticsearch.core.Nullable; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -26,8 +29,8 @@ * Shared Lucene slices between Lucene operators. 
*/ public final class LuceneSliceQueue { - private static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher - private static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher + public static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher + public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher private final int totalSlices; private final Queue slices; @@ -52,19 +55,27 @@ public Collection remainingShardsIdentifiers() { public static LuceneSliceQueue create( List contexts, - Function weightFunction, + Function queryFunction, DataPartitioning dataPartitioning, - int taskConcurrency + Function autoStrategy, + int taskConcurrency, + ScoreMode scoreMode ) { final List slices = new ArrayList<>(); for (ShardContext ctx : contexts) { - final List leafContexts = ctx.searcher().getLeafContexts(); - List> groups = switch (dataPartitioning) { - case SHARD -> Collections.singletonList(leafContexts.stream().map(PartialLeafReaderContext::new).toList()); - case SEGMENT -> segmentSlices(leafContexts); - case DOC -> docSlices(ctx.searcher().getIndexReader(), taskConcurrency); - }; - final Weight weight = weightFunction.apply(ctx); + Query query = queryFunction.apply(ctx); + /* + * Rewrite the query on the local segment so things like fully + * overlapping range queries become match all. + */ + try { + query = query.rewrite(ctx.searcher()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query); + List> groups = partitioning.groups(ctx.searcher(), taskConcurrency); + Weight weight = weight(ctx, query, scoreMode); for (List group : groups) { if (group.isEmpty() == false) { slices.add(new LuceneSlice(ctx, group, weight)); @@ -74,54 +85,113 @@ public static LuceneSliceQueue create( return new LuceneSliceQueue(slices); } - static List> docSlices(IndexReader indexReader, int numSlices) { - final int totalDocCount = indexReader.maxDoc(); - final int normalMaxDocsPerSlice = totalDocCount / numSlices; - final int extraDocsInFirstSlice = totalDocCount % numSlices; - final List> slices = new ArrayList<>(); - int docsAllocatedInCurrentSlice = 0; - List currentSlice = null; - int maxDocsPerSlice = normalMaxDocsPerSlice + extraDocsInFirstSlice; - for (LeafReaderContext ctx : indexReader.leaves()) { - final int numDocsInLeaf = ctx.reader().maxDoc(); - int minDoc = 0; - while (minDoc < numDocsInLeaf) { - int numDocsToUse = Math.min(maxDocsPerSlice - docsAllocatedInCurrentSlice, numDocsInLeaf - minDoc); - if (numDocsToUse <= 0) { - break; - } - if (currentSlice == null) { - currentSlice = new ArrayList<>(); + public enum PartitioningStrategy { + SHARD { + @Override + List> groups(IndexSearcher searcher, int requestedNumSlices) { + return List.of(searcher.getLeafContexts().stream().map(PartialLeafReaderContext::new).toList()); + } + }, + SEGMENT { + @Override + List> groups(IndexSearcher searcher, int requestedNumSlices) { + IndexSearcher.LeafSlice[] gs = IndexSearcher.slices( + searcher.getLeafContexts(), + MAX_DOCS_PER_SLICE, + MAX_SEGMENTS_PER_SLICE, + false + ); + return Arrays.stream(gs).map(g -> Arrays.stream(g.partitions).map(PartialLeafReaderContext::new).toList()).toList(); + } + }, + DOC { + @Override + List> groups(IndexSearcher searcher, int requestedNumSlices) { + final int totalDocCount = searcher.getIndexReader().maxDoc(); + final int normalMaxDocsPerSlice = totalDocCount / requestedNumSlices; + final int 
extraDocsInFirstSlice = totalDocCount % requestedNumSlices; + final List> slices = new ArrayList<>(); + int docsAllocatedInCurrentSlice = 0; + List currentSlice = null; + int maxDocsPerSlice = normalMaxDocsPerSlice + extraDocsInFirstSlice; + for (LeafReaderContext ctx : searcher.getLeafContexts()) { + final int numDocsInLeaf = ctx.reader().maxDoc(); + int minDoc = 0; + while (minDoc < numDocsInLeaf) { + int numDocsToUse = Math.min(maxDocsPerSlice - docsAllocatedInCurrentSlice, numDocsInLeaf - minDoc); + if (numDocsToUse <= 0) { + break; + } + if (currentSlice == null) { + currentSlice = new ArrayList<>(); + } + currentSlice.add(new PartialLeafReaderContext(ctx, minDoc, minDoc + numDocsToUse)); + minDoc += numDocsToUse; + docsAllocatedInCurrentSlice += numDocsToUse; + if (docsAllocatedInCurrentSlice == maxDocsPerSlice) { + slices.add(currentSlice); + // once the first slice with the extra docs is added, no need for extra docs + maxDocsPerSlice = normalMaxDocsPerSlice; + currentSlice = null; + docsAllocatedInCurrentSlice = 0; + } + } } - currentSlice.add(new PartialLeafReaderContext(ctx, minDoc, minDoc + numDocsToUse)); - minDoc += numDocsToUse; - docsAllocatedInCurrentSlice += numDocsToUse; - if (docsAllocatedInCurrentSlice == maxDocsPerSlice) { + if (currentSlice != null) { slices.add(currentSlice); - maxDocsPerSlice = normalMaxDocsPerSlice; // once the first slice with the extra docs is added, no need for extra docs - currentSlice = null; - docsAllocatedInCurrentSlice = 0; } + if (requestedNumSlices < totalDocCount && slices.size() != requestedNumSlices) { + throw new IllegalStateException("wrong number of slices, expected " + requestedNumSlices + " but got " + slices.size()); + } + if (slices.stream() + .flatMapToInt( + l -> l.stream() + .mapToInt(partialLeafReaderContext -> partialLeafReaderContext.maxDoc() - partialLeafReaderContext.minDoc()) + ) + .sum() != totalDocCount) { + throw new IllegalStateException("wrong doc count"); + } + return slices; } + }; + + abstract List> groups(IndexSearcher searcher, int requestedNumSlices); + + private static PartitioningStrategy pick( + DataPartitioning dataPartitioning, + Function autoStrategy, + ShardContext ctx, + Query query + ) { + return switch (dataPartitioning) { + case SHARD -> PartitioningStrategy.SHARD; + case SEGMENT -> PartitioningStrategy.SEGMENT; + case DOC -> PartitioningStrategy.DOC; + case AUTO -> forAuto(autoStrategy, ctx, query); + }; } - if (currentSlice != null) { - slices.add(currentSlice); - } - if (numSlices < totalDocCount && slices.size() != numSlices) { - throw new IllegalStateException("wrong number of slices, expected " + numSlices + " but got " + slices.size()); - } - if (slices.stream() - .flatMapToInt( - l -> l.stream().mapToInt(partialLeafReaderContext -> partialLeafReaderContext.maxDoc() - partialLeafReaderContext.minDoc()) - ) - .sum() != totalDocCount) { - throw new IllegalStateException("wrong doc count"); + + /** + * {@link DataPartitioning#AUTO} resolves to {@link #SHARD} for indices + * with fewer than this many documents. 
+ */ + private static final int SMALL_INDEX_BOUNDARY = MAX_DOCS_PER_SLICE; + + private static PartitioningStrategy forAuto(Function autoStrategy, ShardContext ctx, Query query) { + if (ctx.searcher().getIndexReader().maxDoc() < SMALL_INDEX_BOUNDARY) { + return PartitioningStrategy.SHARD; + } + return autoStrategy.apply(query); } - return slices; } - static List> segmentSlices(List leafContexts) { - IndexSearcher.LeafSlice[] gs = IndexSearcher.slices(leafContexts, MAX_DOCS_PER_SLICE, MAX_SEGMENTS_PER_SLICE, false); - return Arrays.stream(gs).map(g -> Arrays.stream(g.partitions).map(PartialLeafReaderContext::new).toList()).toList(); + static Weight weight(ShardContext ctx, Query query, ScoreMode scoreMode) { + var searcher = ctx.searcher(); + try { + Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query); + return searcher.createWeight(searcher.rewrite(actualQuery), scoreMode, 1); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java index 76f0fb0167b86..0bf6d45372784 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java @@ -7,8 +7,14 @@ package org.elasticsearch.compute.lucene; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.BoostQuery; import org.apache.lucene.search.CollectionTerminatedException; +import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreMode; @@ -58,7 +64,15 @@ public Factory( int limit, boolean scoring ) { - super(contexts, queryFunction, dataPartitioning, taskConcurrency, limit, scoring ? COMPLETE : COMPLETE_NO_SCORES); + super( + contexts, + queryFunction, + dataPartitioning, + autoStrategy(limit), + taskConcurrency, + limit, + scoring ? COMPLETE : COMPLETE_NO_SCORES + ); this.maxPageSize = maxPageSize; // TODO: use a single limiter for multiple stage execution this.limiter = limit == NO_LIMIT ? Limiter.NO_LIMIT : new Limiter(limit); @@ -85,6 +99,76 @@ public String describe() { + scoreMode + "]"; } + + /** + * Pick a strategy for the {@link DataPartitioning#AUTO} partitioning. + */ + public static Function autoStrategy(int limit) { + return limit == NO_LIMIT ? Factory::highSpeedAutoStrategy : Factory::lowOverheadAutoStrategy; + } + + /** + * Use the {@link LuceneSliceQueue.PartitioningStrategy#SHARD} strategy because + * it has the lowest overhead. Used when there is a {@code limit} on the operator + * because that's for cases like {@code FROM foo | LIMIT 10} or + * {@code FROM foo | WHERE a == 1 | LIMIT 10} when the {@code WHERE} can be pushed + * to Lucene. In those cases we're better off with the lowest overhead we can + * manage - and that's {@link LuceneSliceQueue.PartitioningStrategy#SHARD}. 
+     */
+    private static LuceneSliceQueue.PartitioningStrategy lowOverheadAutoStrategy(Query query) {
+        return LuceneSliceQueue.PartitioningStrategy.SHARD;
+    }
+
+    /**
+     * Select the {@link LuceneSliceQueue.PartitioningStrategy} based on the {@link Query}.
+     * <p>
+     * A {@link MatchAllDocsQuery} gets {@link LuceneSliceQueue.PartitioningStrategy#DOC}
+     * because it has very little per-slice overhead, and a {@link MatchNoDocsQuery} gets
+     * {@link LuceneSliceQueue.PartitioningStrategy#SHARD}. Conjunctions take
+     * {@link LuceneSliceQueue.PartitioningStrategy#SHARD} if any required clause does,
+     * {@link LuceneSliceQueue.PartitioningStrategy#DOC} if all of them do, and
+     * {@link LuceneSliceQueue.PartitioningStrategy#SEGMENT} otherwise.
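+     * <p>
+     * For example, {@code FROM test | STATS SUM(b)} rewrites to a
+     * {@link MatchAllDocsQuery} when every document has {@code b}, so it can use
+     * {@link LuceneSliceQueue.PartitioningStrategy#DOC}.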
+ */ + private static LuceneSliceQueue.PartitioningStrategy highSpeedAutoStrategy(Query query) { + if (query instanceof ConstantScoreQuery c) { + return highSpeedAutoStrategy(c.getQuery()); + } + if (query instanceof BoostQuery b) { + return highSpeedAutoStrategy(b.getQuery()); + } + if (query instanceof BooleanQuery bq) { + for (BooleanClause c : bq) { + if (c.isRequired() == false) { + return LuceneSliceQueue.PartitioningStrategy.SEGMENT; + } + LuceneSliceQueue.PartitioningStrategy forClause = highSpeedAutoStrategy(c.query()); + if (forClause == LuceneSliceQueue.PartitioningStrategy.SHARD) { + return LuceneSliceQueue.PartitioningStrategy.SHARD; + } + if (forClause != LuceneSliceQueue.PartitioningStrategy.DOC) { + return LuceneSliceQueue.PartitioningStrategy.SEGMENT; + } + } + return LuceneSliceQueue.PartitioningStrategy.DOC; + } + if (query instanceof MatchAllDocsQuery) { + return LuceneSliceQueue.PartitioningStrategy.DOC; + } + if (query instanceof MatchNoDocsQuery) { + return LuceneSliceQueue.PartitioningStrategy.SHARD; + } + return LuceneSliceQueue.PartitioningStrategy.SEGMENT; + } } @SuppressWarnings("this-escape") diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index 83d69393a45a2..569af434f7615 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -65,7 +65,15 @@ public Factory( List> sorts, boolean scoring ) { - super(contexts, queryFunction, dataPartitioning, taskConcurrency, limit, scoring ? TOP_DOCS_WITH_SCORES : TOP_DOCS); + super( + contexts, + queryFunction, + dataPartitioning, + query -> LuceneSliceQueue.PartitioningStrategy.SHARD, + taskConcurrency, + limit, + scoring ? TOP_DOCS_WITH_SCORES : TOP_DOCS + ); this.maxPageSize = maxPageSize; this.sorts = sorts; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java index 8d37feb37d8b6..61dc50386719d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java @@ -56,7 +56,15 @@ private TimeSeriesSortedSourceOperatorFactory( int maxPageSize, int limit ) { - super(contexts, queryFunction, DataPartitioning.SHARD, taskConcurrency, limit, ScoreMode.COMPLETE_NO_SCORES); + super( + contexts, + queryFunction, + DataPartitioning.SHARD, + query -> { throw new UnsupportedOperationException("locked to SHARD partitioning"); }, + taskConcurrency, + limit, + ScoreMode.COMPLETE_NO_SCORES + ); this.maxPageSize = maxPageSize; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorAutoStrategyTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorAutoStrategyTests.java new file mode 100644 index 0000000000000..bfd161594b16d --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorAutoStrategyTests.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.lucene; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.BoostQuery; +import org.apache.lucene.search.ConstantScoreQuery; +import org.apache.lucene.search.IndexOrDocValuesQuery; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.MatchNoDocsQuery; +import org.apache.lucene.search.Query; +import org.elasticsearch.test.ESTestCase; + +import java.util.List; +import java.util.function.Function; + +import static org.hamcrest.Matchers.equalTo; + +public class LuceneSourceOperatorAutoStrategyTests extends ESTestCase { + @ParametersFactory(argumentFormatting = "%s -> %s") + public static Iterable parameters() { + return List.of( + new Object[] { new MatchAllDocsQuery(), LuceneSliceQueue.PartitioningStrategy.DOC }, + new Object[] { new ConstantScoreQuery(new MatchAllDocsQuery()), LuceneSliceQueue.PartitioningStrategy.DOC }, + + /* + * FROM test | WHERE @timestamp > \"2025-01-01T00:00:00Z\" | .... + * when all @timestamps are in range + */ + new Object[] { + new BoostQuery(new ConstantScoreQuery(new MatchAllDocsQuery()), 0.0F), + LuceneSliceQueue.PartitioningStrategy.DOC }, + new Object[] { new MatchNoDocsQuery(), LuceneSliceQueue.PartitioningStrategy.SHARD }, + + /* + * FROM test | WHERE @timestamp > \"2025-01-01T00:00:00Z\" | STATS SUM(b) + * when all @timestamps are in range and all docs have b. 
+ */ + new Object[] { + new BooleanQuery.Builder() // formatter + .add(new BoostQuery(new ConstantScoreQuery(new MatchAllDocsQuery()), 0.0F), BooleanClause.Occur.MUST) + .add(new BoostQuery(new ConstantScoreQuery(new MatchAllDocsQuery()), 0.0F), BooleanClause.Occur.MUST) + .build(), + LuceneSliceQueue.PartitioningStrategy.DOC }, + + /* + * FROM test | WHERE @timestamp > \"2025-01-01T00:00:00.120Z\" | STATS SUM(b) + * when *some* @timestamps are in range and all docs have b + */ + new Object[] { + new BooleanQuery.Builder().add( + new BoostQuery( + new ConstantScoreQuery( + new IndexOrDocValuesQuery( + LongPoint.newRangeQuery("@timestamp", 1735689600121L, 9223372036854775807L), + SortedNumericDocValuesField.newSlowRangeQuery("@timestamp", 1735689600121L, 9223372036854775807L) + ) + ), + 0.0F + ), + BooleanClause.Occur.MUST + ).add(new BoostQuery(new ConstantScoreQuery(new MatchAllDocsQuery()), 0.0F), BooleanClause.Occur.MUST).build(), + LuceneSliceQueue.PartitioningStrategy.SEGMENT } + ); + } + + private final Query query; + private final LuceneSliceQueue.PartitioningStrategy expectedUnlimited; + + public LuceneSourceOperatorAutoStrategyTests(Query query, LuceneSliceQueue.PartitioningStrategy expectedUnlimited) { + this.query = query; + this.expectedUnlimited = expectedUnlimited; + } + + public void testAutoStrategyLimited() { + Function auto = LuceneSourceOperator.Factory.autoStrategy( + between(1, LuceneOperator.NO_LIMIT - 1) + ); + assertThat(auto.apply(query), equalTo(LuceneSliceQueue.PartitioningStrategy.SHARD)); + } + + public void testAutoStrategyUnlimited() { + Function auto = LuceneSourceOperator.Factory.autoStrategy(LuceneOperator.NO_LIMIT); + assertThat(auto.apply(query), equalTo(expectedUnlimited)); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java index bc1ba595c5481..f0e82bfdbc729 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java @@ -41,7 +41,7 @@ public final class QueryPragmas implements Writeable { public static final Setting DATA_PARTITIONING = Setting.enumSetting( DataPartitioning.class, "data_partitioning", - DataPartitioning.SEGMENT + DataPartitioning.AUTO ); /** From 47bb86c64e127eb911c42da22a4ed08175e31dc7 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 26 Mar 2025 18:58:54 -0400 Subject: [PATCH 02/13] Update docs/changelog/125739.yaml --- docs/changelog/125739.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/125739.yaml diff --git a/docs/changelog/125739.yaml b/docs/changelog/125739.yaml new file mode 100644 index 0000000000000..cc5fa57b0f09b --- /dev/null +++ b/docs/changelog/125739.yaml @@ -0,0 +1,5 @@ +pr: 125739 +summary: Heuristics to pick efficient partitioning +area: ES|QL +type: enhancement +issues: [] From 527c69ae0468d79f0008f5c7854286b35ec362ae Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 26 Mar 2025 21:55:43 -0400 Subject: [PATCH 03/13] Fix TODO --- .../lucene/LuceneSourceOperatorTests.java | 63 +++++++++++++------ 1 file changed, 43 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java index 39c85186d6a0d..69abeb8ab78ce 100644 
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java @@ -119,17 +119,34 @@ protected Matcher expectedToStringOfSimple() { @Override protected Matcher expectedDescriptionOfSimple() { return matchesRegex( - "LuceneSourceOperator" - + "\\[dataPartitioning = (DOC|SHARD|SEGMENT), maxPageSize = \\d+, limit = 100, scoreMode = (COMPLETE|COMPLETE_NO_SCORES)]" + "LuceneSourceOperator\\[" + + "dataPartitioning = (AUTO|DOC|SHARD|SEGMENT), " + + "maxPageSize = \\d+, " + + "limit = 100, " + + "scoreMode = (COMPLETE|COMPLETE_NO_SCORES)]" ); } - // TODO tests for the other data partitioning configurations + public void testAutoPartitioning() { + testSimple(DataPartitioning.AUTO); + } + + public void testShardPartitioning() { + testSimple(DataPartitioning.SHARD); + } - public void testShardDataPartitioning() { + public void testSegmentPartitioning() { + testSimple(DataPartitioning.SEGMENT); + } + + public void testDocPartitioning() { + testSimple(DataPartitioning.DOC); + } + + private void testSimple(DataPartitioning partitioning) { int size = between(1_000, 20_000); int limit = between(10, size); - testSimple(driverContext(), size, limit); + testSimple(driverContext(), partitioning, size, limit); } public void testEarlyTermination() { @@ -168,12 +185,12 @@ public void testEarlyTermination() { } public void testEmpty() { - testSimple(driverContext(), 0, between(10, 10_000)); + testSimple(driverContext(), randomFrom(DataPartitioning.values()), 0, between(10, 10_000)); } - public void testWithCranky() { + public void testEmptyWithCranky() { try { - testSimple(crankyDriverContext(), between(1, 10_000), 100); + testSimple(crankyDriverContext(), randomFrom(DataPartitioning.values()), 0, between(10, 10_000)); logger.info("cranky didn't break"); } catch (CircuitBreakingException e) { logger.info("broken", e); @@ -181,21 +198,27 @@ public void testWithCranky() { } } - public void testEmptyWithCranky() { - try { - testSimple(crankyDriverContext(), 0, between(10, 10_000)); - logger.info("cranky didn't break"); - } catch (CircuitBreakingException e) { - logger.info("broken", e); - assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE)); - } + public void testAutoPartitioningWithCranky() { + testWithCranky(DataPartitioning.AUTO); + } + + public void testShardPartitioningWithCranky() { + testWithCranky(DataPartitioning.SHARD); + } + + public void testSegmentPartitioningWithCranky() { + testWithCranky(DataPartitioning.SEGMENT); + } + + public void testDocPartitioningWithCranky() { + testWithCranky(DataPartitioning.DOC); } - public void testShardDataPartitioningWithCranky() { + private void testWithCranky(DataPartitioning partitioning) { int size = between(1_000, 20_000); int limit = between(10, size); try { - testSimple(crankyDriverContext(), size, limit); + testSimple(crankyDriverContext(), partitioning, size, limit); logger.info("cranky didn't break"); } catch (CircuitBreakingException e) { logger.info("broken", e); @@ -203,8 +226,8 @@ public void testShardDataPartitioningWithCranky() { } } - private void testSimple(DriverContext ctx, int size, int limit) { - LuceneSourceOperator.Factory factory = simple(DataPartitioning.SHARD, size, limit, scoring); + private void testSimple(DriverContext ctx, DataPartitioning partitioning, int size, int limit) { + LuceneSourceOperator.Factory factory = simple(partitioning, size, limit, 
scoring); Operator.OperatorFactory readS = ValuesSourceReaderOperatorTests.factory(reader, S_FIELD, ElementType.LONG); List results = new ArrayList<>(); From c72af75cbf047f2cf5daf0a5c7a37376ebc57e8d Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 27 Mar 2025 10:50:29 -0400 Subject: [PATCH 04/13] Report partitioning strategies --- .../org/elasticsearch/TransportVersions.java | 1 + .../compute/lucene/LuceneOperator.java | 18 ++++- .../compute/lucene/LuceneSliceQueue.java | 66 +++++++++++++++++-- .../planner/EsPhysicalOperationProviders.java | 5 +- 4 files changed, 81 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index aa1c87e875aec..3e7504017ab14 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -204,6 +204,7 @@ static TransportVersion def(int id) { public static final TransportVersion INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD = def(9_036_0_00); public static final TransportVersion RERANK_COMMON_OPTIONS_ADDED = def(9_037_0_00); public static final TransportVersion ESQL_REPORT_ORIGINAL_TYPES = def(9_038_00_0); + public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING = def(9_039_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java index 2fd133cf496a3..2019e1eb59bc8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java @@ -37,12 +37,17 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.TreeSet; import java.util.function.Function; import java.util.stream.Collectors; +import static org.elasticsearch.TransportVersions.ESQL_REPORT_SHARD_PARTITIONING; + public abstract class LuceneOperator extends SourceOperator { private static final Logger logger = LogManager.getLogger(LuceneOperator.class); @@ -270,6 +275,7 @@ public static class Status implements Operator.Status { private final int sliceMax; private final int current; private final long rowsEmitted; + private final Map partitioningStrategies; private Status(LuceneOperator operator) { processedSlices = operator.processedSlices; @@ -295,6 +301,7 @@ private Status(LuceneOperator operator) { } pagesEmitted = operator.pagesEmitted; rowsEmitted = operator.rowsEmitted; + partitioningStrategies = operator.sliceQueue.partitioningStrategies(); } Status( @@ -308,7 +315,8 @@ private Status(LuceneOperator operator) { int sliceMin, int sliceMax, int current, - long rowsEmitted + long rowsEmitted, + SortedMap partitioningStrategies ) { this.processedSlices = processedSlices; this.processedQueries = processedQueries; @@ -321,6 +329,7 @@ private Status(LuceneOperator operator) { this.sliceMax = sliceMax; this.current = current; this.rowsEmitted = rowsEmitted; + this.partitioningStrategies = partitioningStrategies; } Status(StreamInput in) throws IOException { @@ -344,6 +353,9 @@ private Status(LuceneOperator operator) { } else { rowsEmitted = 0; } + partitioningStrategies = 
in.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING) + ? in.readMap(LuceneSliceQueue.PartitioningStrategy::readFrom) + : Map.of(); } @Override @@ -365,6 +377,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { out.writeVLong(rowsEmitted); } + if (out.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING)) { + out.writeMapValues(partitioningStrategies); + } } @Override @@ -433,6 +448,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("slice_max", sliceMax); builder.field("current", current); builder.field("rows_emitted", rowsEmitted); + builder.field("partitioning_strategies", new TreeMap<>(this.partitioningStrategies)); return builder.endObject(); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java index bc4fc0f02ce43..ff08d9a97b1f1 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java @@ -13,6 +13,9 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.Nullable; import java.io.IOException; @@ -20,8 +23,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Queue; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Function; @@ -34,10 +41,12 @@ public final class LuceneSliceQueue { private final int totalSlices; private final Queue slices; + private final Map partitioningStrategies; - private LuceneSliceQueue(List slices) { + private LuceneSliceQueue(List slices, Map partitioningStrategies) { this.totalSlices = slices.size(); this.slices = new ConcurrentLinkedQueue<>(slices); + this.partitioningStrategies = partitioningStrategies; } @Nullable @@ -49,6 +58,13 @@ public int totalSlices() { return totalSlices; } + /** + * Strategy used to partition each shard in this queue. 
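+     * Keyed by {@link ShardContext#shardIdentifier()}.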
+ */ + public Map partitioningStrategies() { + return partitioningStrategies; + } + public Collection remainingShardsIdentifiers() { return slices.stream().map(slice -> slice.shardContext().shardIdentifier()).toList(); } @@ -61,7 +77,8 @@ public static LuceneSliceQueue create( int taskConcurrency, ScoreMode scoreMode ) { - final List slices = new ArrayList<>(); + List slices = new ArrayList<>(); + Map partitioningStrategies = new HashMap<>(contexts.size()); for (ShardContext ctx : contexts) { Query query = queryFunction.apply(ctx); /* @@ -74,6 +91,7 @@ public static LuceneSliceQueue create( throw new UncheckedIOException(e); } PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query); + partitioningStrategies.put(ctx.shardIdentifier(), partitioning); List> groups = partitioning.groups(ctx.searcher(), taskConcurrency); Weight weight = weight(ctx, query, scoreMode); for (List group : groups) { @@ -82,17 +100,27 @@ public static LuceneSliceQueue create( } } } - return new LuceneSliceQueue(slices); + return new LuceneSliceQueue(slices, partitioningStrategies); } - public enum PartitioningStrategy { - SHARD { + /** + * Strategy used to partition each shard into slices. See {@link DataPartitioning} + * for descriptions on how each value works. + */ + public enum PartitioningStrategy implements Writeable { + /** + * See {@link DataPartitioning#SHARD}. + */ + SHARD(0) { @Override List> groups(IndexSearcher searcher, int requestedNumSlices) { return List.of(searcher.getLeafContexts().stream().map(PartialLeafReaderContext::new).toList()); } }, - SEGMENT { + /** + * See {@link DataPartitioning#SEGMENT}. + */ + SEGMENT(1) { @Override List> groups(IndexSearcher searcher, int requestedNumSlices) { IndexSearcher.LeafSlice[] gs = IndexSearcher.slices( @@ -104,7 +132,10 @@ List> groups(IndexSearcher searcher, int requeste return Arrays.stream(gs).map(g -> Arrays.stream(g.partitions).map(PartialLeafReaderContext::new).toList()).toList(); } }, - DOC { + /** + * See {@link DataPartitioning#DOC}. 
+ */ + DOC(2) { @Override List> groups(IndexSearcher searcher, int requestedNumSlices) { final int totalDocCount = searcher.getIndexReader().maxDoc(); @@ -155,6 +186,27 @@ List> groups(IndexSearcher searcher, int requeste } }; + private final byte id; + + PartitioningStrategy(int id) { + this.id = (byte) id; + } + + public static PartitioningStrategy readFrom(StreamInput in) throws IOException { + int id = in.readByte(); + return switch (id) { + case 0 -> SHARD; + case 1 -> SEGMENT; + case 2 -> DOC; + default -> throw new IllegalArgumentException("invalid PartitioningStrategyId [" + id + "]"); + }; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeByte(id); + } + abstract List> groups(IndexSearcher searcher, int requestedNumSlices); private static PartitioningStrategy pick( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index b2d627f4090b3..03dd37aad428a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -278,11 +278,14 @@ public static class DefaultShardContext implements ShardContext { private final int index; private final SearchExecutionContext ctx; private final AliasFilter aliasFilter; + private final String shardIdentifier; public DefaultShardContext(int index, SearchExecutionContext ctx, AliasFilter aliasFilter) { this.index = index; this.ctx = ctx; this.aliasFilter = aliasFilter; + // Build the shardIdentifier once up front so we can reuse references to it in many places. 
+ this.shardIdentifier = ctx.getFullyQualifiedIndex().getName() + ":" + ctx.getShardId(); } @Override @@ -302,7 +305,7 @@ public Optional buildSort(List> sorts) throws IOE @Override public String shardIdentifier() { - return ctx.getFullyQualifiedIndex().getName() + ":" + ctx.getShardId(); + return shardIdentifier; } @Override From e2e817d900bf47a10aeaf6da4dfc0dd168b7a0ad Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 27 Mar 2025 11:40:25 -0400 Subject: [PATCH 05/13] Test report --- .../compute/lucene/LuceneOperator.java | 24 ++++++++++--- .../compute/lucene/LuceneSliceQueue.java | 2 -- .../LuceneSourceOperatorStatusTests.java | 36 ++++++++++++++++--- 3 files changed, 50 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java index 2019e1eb59bc8..9fb1692328e9f 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java @@ -40,7 +40,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; import java.util.function.Function; @@ -316,7 +315,7 @@ private Status(LuceneOperator operator) { int sliceMax, int current, long rowsEmitted, - SortedMap partitioningStrategies + Map partitioningStrategies ) { this.processedSlices = processedSlices; this.processedQueries = processedQueries; @@ -378,7 +377,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(rowsEmitted); } if (out.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING)) { - out.writeMapValues(partitioningStrategies); + out.writeMap(partitioningStrategies, StreamOutput::writeString, StreamOutput::writeWriteable); } } @@ -431,6 +430,10 @@ public long rowsEmitted() { return rowsEmitted; } + public Map partitioningStrategies() { + return partitioningStrategies; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -467,12 +470,23 @@ public boolean equals(Object o) { && sliceMin == status.sliceMin && sliceMax == status.sliceMax && current == status.current - && rowsEmitted == status.rowsEmitted; + && rowsEmitted == status.rowsEmitted + && partitioningStrategies.equals(status.partitioningStrategies); } @Override public int hashCode() { - return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current, rowsEmitted); + return Objects.hash( + processedSlices, + sliceIndex, + totalSlices, + pagesEmitted, + sliceMin, + sliceMax, + current, + rowsEmitted, + partitioningStrategies + ); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java index ff08d9a97b1f1..c5a674a367d45 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java @@ -27,8 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.SortedMap; -import java.util.TreeMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Function; diff 
--git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java index f45a2c645e9d3..3c711fc45147e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java @@ -12,7 +12,9 @@ import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.test.ESTestCase; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; @@ -31,7 +33,8 @@ public static LuceneSourceOperator.Status simple() { 123, 99990, 8000, - 222 + 222, + Map.of("b:0", LuceneSliceQueue.PartitioningStrategy.SHARD, "a:1", LuceneSliceQueue.PartitioningStrategy.DOC) ); } @@ -54,7 +57,11 @@ public static String simpleToJson() { "slice_min" : 123, "slice_max" : 99990, "current" : 8000, - "rows_emitted" : 222 + "rows_emitted" : 222, + "partitioning_strategies" : { + "a:1" : "DOC", + "b:0" : "SHARD" + } }"""; } @@ -80,7 +87,8 @@ public LuceneSourceOperator.Status createTestInstance() { randomNonNegativeInt(), randomNonNegativeInt(), randomNonNegativeInt(), - randomNonNegativeLong() + randomNonNegativeLong(), + randomPartitioningStrategies() ); } @@ -102,6 +110,18 @@ private static Set randomProcessedShards() { return set; } + private static Map randomPartitioningStrategies() { + int size = between(0, 10); + Map partitioningStrategies = new HashMap<>(); + while (partitioningStrategies.size() < size) { + partitioningStrategies.put( + randomAlphaOfLength(3) + ":" + between(0, 10), + randomFrom(LuceneSliceQueue.PartitioningStrategy.values()) + ); + } + return partitioningStrategies; + } + @Override protected LuceneSourceOperator.Status mutateInstance(LuceneSourceOperator.Status instance) { int processedSlices = instance.processedSlices(); @@ -115,7 +135,8 @@ protected LuceneSourceOperator.Status mutateInstance(LuceneSourceOperator.Status int sliceMax = instance.sliceMax(); int current = instance.current(); long rowsEmitted = instance.rowsEmitted(); - switch (between(0, 10)) { + Map partitioningStrategies = instance.partitioningStrategies(); + switch (between(0, 11)) { case 0 -> processedSlices = randomValueOtherThan(processedSlices, ESTestCase::randomNonNegativeInt); case 1 -> processedQueries = randomValueOtherThan(processedQueries, LuceneSourceOperatorStatusTests::randomProcessedQueries); case 2 -> processedShards = randomValueOtherThan(processedShards, LuceneSourceOperatorStatusTests::randomProcessedShards); @@ -127,6 +148,10 @@ protected LuceneSourceOperator.Status mutateInstance(LuceneSourceOperator.Status case 8 -> sliceMax = randomValueOtherThan(sliceMax, ESTestCase::randomNonNegativeInt); case 9 -> current = randomValueOtherThan(current, ESTestCase::randomNonNegativeInt); case 10 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); + case 11 -> partitioningStrategies = randomValueOtherThan( + partitioningStrategies, + LuceneSourceOperatorStatusTests::randomPartitioningStrategies + ); default -> throw new UnsupportedOperationException(); } return new LuceneSourceOperator.Status( @@ -140,7 +165,8 @@ protected LuceneSourceOperator.Status mutateInstance(LuceneSourceOperator.Status sliceMin, sliceMax, current, - rowsEmitted + rowsEmitted, + partitioningStrategies ); } } From 
a348ef9132b41f62284dea6ab8d874dc10292314 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 27 Mar 2025 14:26:54 -0400 Subject: [PATCH 06/13] test --- .../qa/single_node/EsqlPartitioningIT.java | 186 ++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java new file mode 100644 index 0000000000000..1f5d0dc8b6add --- /dev/null +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java @@ -0,0 +1,186 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.qa.single_node; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.test.TestClustersThreadFilter; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentUtils; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.hamcrest.Matcher; +import org.junit.Before; +import org.junit.ClassRule; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) +public class EsqlPartitioningIT extends ESRestTestCase { + /** + * The size of the large index we use to test partitioning. It has to be + * at least 250,000 entries for AUTO partitioning to do interesting things. + */ + private static final int IDX_DOCS = 300_000; + /** + * The size of the small index we use to test partitioning. It's small enough + * that AUTO should always choose SHARD partitioning. 
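+     * That keeps it well below the 250,000 doc cutoff under which AUTO resolves
+     * to SHARD partitioning.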
+ */ + private static final int SMALL_IDX_DOCS = 20_000; + + private static final long BASE_TIME = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-01-01T00:00:00Z"); + private static final int BULK_CHARS = Math.toIntExact(ByteSizeValue.ofMb(1).getBytes()); + + record Case(String suffix, String idxPartition) {} + + @ParametersFactory(argumentFormatting = "%2$s -> %3$s") + public static Iterable parameters() { + List params = new ArrayList<>(); + for (String index: new String[] {"idx", "small_idx"}) { + for (Case c: new Case[] { + new Case("", "SHARD"), + new Case("| SORT @timestamp ASC", "SHARD"), + new Case("| WHERE ABS(a) == 1", "DOC"), + new Case("| WHERE a == 1", "SHARD"), + new Case("| STATS SUM(a)", "DOC"), + new Case("| MV_EXPAND a | STATS SUM(a)", "DOC"), + new Case("| WHERE a == 1 | STATS SUM(a)", "SEGMENT"), + new Case("| WHERE a == 1 | MV_EXPAND a | STATS SUM(a)", "SEGMENT"), + }) { + params.add(new Object[]{ index, "FROM " + index + " " + c.suffix, expectedPartition(index, c.idxPartition)}); + } + } + return params; + } + + @ClassRule + public static ElasticsearchCluster cluster = Clusters.testCluster(); + + private final String index; + private final String query; + private final Matcher expectedPartition; + + public EsqlPartitioningIT(String index, String query, Matcher expectedPartition) { + this.index = index; + this.query = query; + this.expectedPartition = expectedPartition; + } + + public void test() throws IOException { + setupIndex(index, switch (index) { + case "idx" -> IDX_DOCS; + case "small_idx" -> SMALL_IDX_DOCS; + default -> throw new IllegalArgumentException("unknown index [" + index + "]"); + }); + assertThat(partitionForQuery(query), expectedPartition); + } + + private static Matcher expectedPartition(String index, String idxPartition) { + return equalTo(switch (index) { + case "idx" -> idxPartition; + case "small_idx" -> "SHARD"; + default -> throw new UnsupportedOperationException("unknown index [" + index + "]"); + }); + } + + private String partitionForQuery(String query) throws IOException { + Request request = new Request("POST", "_query"); + request.addParameter("pretty", ""); + XContentBuilder b = JsonXContent.contentBuilder().startObject(); + b.field("query", query); + b.field("profile", true); + request.setJsonEntity(Strings.toString(b.endObject())); + request.setOptions(request.getOptions().toBuilder().setWarningsHandler(w -> { + w.remove("No limit defined, adding default limit of [1000]"); + return w.isEmpty() == false; + })); + String response = EntityUtils.toString(client().performRequest(request).getEntity()); + logger.info("Response: {}", response); + Map profile = (Map) XContentHelper.convertToMap(JsonXContent.jsonXContent, response, true).get("profile"); + List drivers = (List) profile.get("drivers"); + for (Object dItem : drivers) { + Map d = (Map) dItem; + if (false == "data".equals(d.get("description"))) { + continue; + } + List operators = (List) d.get("operators"); + for (Object oItem : operators) { + Map o = (Map) oItem; + String operator = o.get("operator").toString(); + if (false == operator.startsWith("Lucene")) { + continue; + } + Map status = (Map) o.get("status"); + Map partitioningStrategies = (Map) status.get("partitioning_strategies"); + String strat = (String) partitioningStrategies.get(index + ":0"); + if (strat == null) { + throw new IllegalArgumentException("didn't find partition strategy for: " + o); + } + return strat; + } + } + throw new IllegalArgumentException("didn't find partition strategy for: " + 
drivers); + } + + private void setupIndex(String index, int docs) throws IOException { + Response exists = client().performRequest(new Request("HEAD", "/" + index)); + if (exists.getStatusLine().getStatusCode() == 200) { + return; + } + StringBuilder bulk = new StringBuilder(); + for (int d = 0; d < docs; d++) { + bulk.append("{\"index\":{}}\n"); + bulk.append("{\"@timestamp\": \""); + bulk.append(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(BASE_TIME + d)); + bulk.append("\", \"a\": "); + bulk.append(d % 10); + bulk.append("}\n"); + if (bulk.length() > BULK_CHARS) { + logger.info("indexing {}: {}", index, d); + bulk(index, bulk.toString()); + bulk.setLength(0); + } + } + logger.info("indexing {}: {}", index, docs); + bulk(index, bulk.toString()); + client().performRequest(new Request("POST", "/" + index + "/_refresh")); + } + + private void bulk(String index, String bulk) throws IOException { + Request request = new Request("POST", index + "/_bulk"); + request.setJsonEntity(bulk); + Response response = client().performRequest(request); + if (response.getStatusLine().getStatusCode() != 200) { + throw new AssertionError("error with bulk: " + EntityUtils.toString(response.getEntity())); + } + } + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + @Override + protected boolean preserveClusterUponCompletion() { + return true; + } +} From d201b4a79f3d9eccc193fe868f84ca75c203381c Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 27 Mar 2025 18:39:48 +0000 Subject: [PATCH 07/13] [CI] Auto commit changes from spotless --- .../xpack/esql/qa/single_node/EsqlPartitioningIT.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java index 1f5d0dc8b6add..a2de43fac3d08 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java @@ -21,10 +21,8 @@ import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentUtils; import org.elasticsearch.xcontent.json.JsonXContent; import org.hamcrest.Matcher; -import org.junit.Before; import org.junit.ClassRule; import java.io.IOException; @@ -55,8 +53,8 @@ record Case(String suffix, String idxPartition) {} @ParametersFactory(argumentFormatting = "%2$s -> %3$s") public static Iterable parameters() { List params = new ArrayList<>(); - for (String index: new String[] {"idx", "small_idx"}) { - for (Case c: new Case[] { + for (String index : new String[] { "idx", "small_idx" }) { + for (Case c : new Case[] { new Case("", "SHARD"), new Case("| SORT @timestamp ASC", "SHARD"), new Case("| WHERE ABS(a) == 1", "DOC"), @@ -64,9 +62,8 @@ public static Iterable parameters() { new Case("| STATS SUM(a)", "DOC"), new Case("| MV_EXPAND a | STATS SUM(a)", "DOC"), new Case("| WHERE a == 1 | STATS SUM(a)", "SEGMENT"), - new Case("| WHERE a == 1 | MV_EXPAND a | STATS SUM(a)", "SEGMENT"), - }) { - params.add(new Object[]{ index, "FROM " + index + " " + c.suffix, 
expectedPartition(index, c.idxPartition)}); + new Case("| WHERE a == 1 | MV_EXPAND a | STATS SUM(a)", "SEGMENT"), }) { + params.add(new Object[] { index, "FROM " + index + " " + c.suffix, expectedPartition(index, c.idxPartition) }); } } return params; From bc921479e32c11cdbeb8c3f124549bced6dfde4d Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 1 Apr 2025 12:31:54 +0000 Subject: [PATCH 08/13] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java | 2 -- .../java/org/elasticsearch/compute/lucene/LuceneMinFactory.java | 2 -- .../compute/lucene/TimeSeriesSortedSourceOperatorFactory.java | 2 -- 3 files changed, 6 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java index 9d317b9b93b1b..5e91f2b80bcec 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java @@ -23,8 +23,6 @@ import java.util.List; import java.util.function.Function; -import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction; - /** * Factory that generates an operator that finds the max value of a field using the {@link LuceneMinMaxOperator}. */ diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java index 5c2800e5925a1..fc457ae196186 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java @@ -23,8 +23,6 @@ import java.util.List; import java.util.function.Function; -import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction; - /** * Factory that generates an operator that finds the min value of a field using the {@link LuceneMinMaxOperator}. */ diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java index 2b314cce41b6d..e6e2c041113b2 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java @@ -33,8 +33,6 @@ import java.util.List; import java.util.function.Function; -import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction; - /** * Creates a source operator that takes advantage of the natural sorting of segments in a tsdb index. *
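[Editor's aside: before the next patch, a reader's sketch of the heuristic so far. This is not the shipped code — the names `Strategy`, `AutoStrategySketch`, `pick`, and `unwrap` are invented for illustration, the `BooleanQuery` analysis added later in this series is omitted, and the real logic lives in `LuceneSourceOperator.Factory#autoStrategy` — but it captures the decision rules: pushed-down limits take the cheap `SHARD` path, rewritten match-all queries take `DOC`, match-none takes `SHARD`, and everything else defaults to `SEGMENT`.]

```
import org.apache.lucene.search.BoostQuery;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;

// Illustrative sketch of the AUTO partitioning rules; not the shipped code.
enum Strategy { SHARD, SEGMENT, DOC }

final class AutoStrategySketch {
    static Strategy pick(Query query, boolean hasLimit) {
        if (hasLimit) {
            return Strategy.SHARD; // FROM foo | LIMIT 10 does little work, cheapest wins
        }
        Query q = unwrap(query);
        if (q instanceof MatchAllDocsQuery) {
            return Strategy.DOC; // match-all has very low per-slice overhead
        }
        if (q instanceof MatchNoDocsQuery) {
            return Strategy.SHARD; // minimize the cost of finding nothing
        }
        return Strategy.SEGMENT; // good default when the query does real work
    }

    // Boost and constant-score wrappers don't change which documents match,
    // so peel them off before inspecting the query.
    static Query unwrap(Query q) {
        while (true) {
            if (q instanceof BoostQuery b) {
                q = b.getQuery();
            } else if (q instanceof ConstantScoreQuery c) {
                q = c.getQuery();
            } else {
                return q;
            }
        }
    }
}
```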
From 05175e5ee04b2a334509d4e4e52d413f0c591358 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 1 Apr 2025 11:09:52 -0400 Subject: [PATCH 09/13] Cluster setting --- .../compute/lucene/DataPartitioning.java | 2 +- .../compute/lucene/LuceneMaxFactory.java | 2 - .../compute/lucene/LuceneMinFactory.java | 2 - .../compute/lucene/LuceneSliceQueue.java | 9 +- .../compute/lucene/LuceneSourceOperator.java | 114 ++++++++++++------ ...TimeSeriesSortedSourceOperatorFactory.java | 2 - ...LuceneSourceOperatorAutoStrategyTests.java | 31 +++++ .../lucene/LuceneSourceOperatorTests.java | 3 +- .../qa/single_node/EsqlPartitioningIT.java | 67 +++++++--- .../planner/EsPhysicalOperationProviders.java | 16 ++- .../xpack/esql/plugin/ComputeService.java | 11 +- .../xpack/esql/plugin/EsqlPlugin.java | 12 +- .../xpack/esql/plugin/QueryPragmas.java | 23 ++-- .../optimizer/PhysicalPlanOptimizerTests.java | 3 +- .../planner/LocalExecutionPlannerTests.java | 3 +- 15 files changed, 222 insertions(+), 78 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java index f0acff78ba8a8..2529b060a9998 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java @@ -11,7 +11,7 @@ /** * How we partition the data across {@link Driver}s. Each request forks into - * {@code min(cpus, partition_count)} threads on the data node. More partitions + * {@code min(1.5 * cpus, partition_count)} threads on the data node. More partitions * allow us to bring more threads to bear on CPU intensive data node side tasks. */ public enum DataPartitioning { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java index 9d317b9b93b1b..5e91f2b80bcec 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java @@ -23,8 +23,6 @@ import java.util.List; import java.util.function.Function; -import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction; - /** * Factory that generates an operator that finds the max value of a field using the {@link LuceneMinMaxOperator}. */ diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java index 5c2800e5925a1..fc457ae196186 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java @@ -23,8 +23,6 @@ import java.util.List; import java.util.function.Function; -import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction; - /** * Factory that generates an operator that finds the min value of a field using the {@link LuceneMinMaxOperator}. 
*/ diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java index c5a674a367d45..9a8789f194a44 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java @@ -80,8 +80,11 @@ public static LuceneSliceQueue create( for (ShardContext ctx : contexts) { Query query = queryFunction.apply(ctx); /* - * Rewrite the query on the local segment so things like fully - * overlapping range queries become match all. + * Rewrite the query on the local index so things like fully + * overlapping range queries become match all. It's important + * to do this before picking the partitioning strategy so we + * can pick more aggressive strategies when the query rewrites + * into MatchAll. */ try { query = query.rewrite(ctx.searcher()); @@ -239,7 +242,7 @@ static Weight weight(ShardContext ctx, Query query, ScoreMode scoreMode) { var searcher = ctx.searcher(); try { Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query); - return searcher.createWeight(searcher.rewrite(actualQuery), scoreMode, 1); + return searcher.createWeight(actualQuery, scoreMode, 1); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java index 604df27ba8242..51b842d3f0ddc 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java @@ -23,22 +23,30 @@ import org.elasticsearch.compute.data.DoubleVector; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.LuceneSliceQueue.PartitioningStrategy; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Limiter; import org.elasticsearch.compute.operator.SourceOperator; import org.elasticsearch.core.Releasables; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.function.Function; import static org.apache.lucene.search.ScoreMode.COMPLETE; import static org.apache.lucene.search.ScoreMode.COMPLETE_NO_SCORES; +import static org.elasticsearch.compute.lucene.LuceneSliceQueue.PartitioningStrategy.DOC; +import static org.elasticsearch.compute.lucene.LuceneSliceQueue.PartitioningStrategy.SEGMENT; +import static org.elasticsearch.compute.lucene.LuceneSliceQueue.PartitioningStrategy.SHARD; /** * Source operator that incrementally runs Lucene searches */ public class LuceneSourceOperator extends LuceneOperator { + private static final Logger log = LogManager.getLogger(LuceneSourceOperator.class); private int currentPagePos = 0; private int remainingDocs; @@ -103,71 +111,105 @@ public String describe() { /** * Pick a strategy for the {@link DataPartitioning#AUTO} partitioning. */ - public static Function autoStrategy(int limit) { + public static Function autoStrategy(int limit) { return limit == NO_LIMIT ? 
Factory::highSpeedAutoStrategy : Factory::lowOverheadAutoStrategy; } /** - * Use the {@link LuceneSliceQueue.PartitioningStrategy#SHARD} strategy because + * Use the {@link PartitioningStrategy#SHARD} strategy because * it has the lowest overhead. Used when there is a {@code limit} on the operator * because that's for cases like {@code FROM foo | LIMIT 10} or * {@code FROM foo | WHERE a == 1 | LIMIT 10} when the {@code WHERE} can be pushed * to Lucene. In those cases we're better off with the lowest overhead we can - * manage - and that's {@link LuceneSliceQueue.PartitioningStrategy#SHARD}. + * manage - and that's {@link PartitioningStrategy#SHARD}. */ - private static LuceneSliceQueue.PartitioningStrategy lowOverheadAutoStrategy(Query query) { - return LuceneSliceQueue.PartitioningStrategy.SHARD; + private static PartitioningStrategy lowOverheadAutoStrategy(Query query) { + return SHARD; } /** - * Select the {@link LuceneSliceQueue.PartitioningStrategy} based on the {@link Query}. + * Select the {@link PartitioningStrategy} based on the {@link Query}. *
     * <ul>
     * <li>
     *     If the {@linkplain Query} matches no documents then this will
-    *     use the {@link LuceneSliceQueue.PartitioningStrategy#SEGMENT} strategy so we
-    *     minimize the overhead of finding nothing.
+    *     use the {@link PartitioningStrategy#SHARD} strategy so we minimize the overhead
+    *     of finding nothing.
     * </li>
     * <li>
     *     If the {@linkplain Query} matches all documents then this will
-    *     use the {@link LuceneSliceQueue.PartitioningStrategy#DOC} strategy because the
-    *     overhead of using that strategy for {@link MatchAllDocsQuery} is very low, and
-    *     we need as many CPUs as we can get to process all the documents.
+    *     use the {@link PartitioningStrategy#DOC} strategy because the overhead of using
+    *     that strategy for {@link MatchAllDocsQuery} is very low, and we need as many CPUs
+    *     as we can get to process all the documents.
     * </li>
     * <li>
-    *     Otherwise use the {@link LuceneSliceQueue.PartitioningStrategy#SEGMENT} strategy
-    *     because it's overhead is generally low.
+    *     Otherwise use the {@link PartitioningStrategy#SEGMENT} strategy because its
+    *     overhead is generally low.
     * </li>
     * </ul>
*/ - private static LuceneSliceQueue.PartitioningStrategy highSpeedAutoStrategy(Query query) { - if (query instanceof ConstantScoreQuery c) { - return highSpeedAutoStrategy(c.getQuery()); - } - if (query instanceof BoostQuery b) { - return highSpeedAutoStrategy(b.getQuery()); - } - if (query instanceof BooleanQuery bq) { - for (BooleanClause c : bq) { - if (c.isRequired() == false) { - return LuceneSliceQueue.PartitioningStrategy.SEGMENT; - } - LuceneSliceQueue.PartitioningStrategy forClause = highSpeedAutoStrategy(c.query()); - if (forClause == LuceneSliceQueue.PartitioningStrategy.SHARD) { - return LuceneSliceQueue.PartitioningStrategy.SHARD; + private static PartitioningStrategy highSpeedAutoStrategy(Query query) { + Query unwrapped = unwrap(query); + log.trace("highSpeedAutoStrategy {} {}", query, unwrapped); + return switch (unwrapped) { + case BooleanQuery q -> highSpeedAutoStrategyForBoolean(q); + case MatchAllDocsQuery q -> DOC; + case MatchNoDocsQuery q -> SHARD; + default -> SEGMENT; + }; + } + + private static Query unwrap(Query query) { + while (true) { + switch (query) { + case BoostQuery q: { + query = q.getQuery(); + break; } - if (forClause != LuceneSliceQueue.PartitioningStrategy.DOC) { - return LuceneSliceQueue.PartitioningStrategy.SEGMENT; + case ConstantScoreQuery q: { + query = q.getQuery(); + break; } + default: + return query; + } + } + } + + /** + * Select the {@link PartitioningStrategy} for a {@link BooleanQuery}. + *
+     * <ul>
+     * <li>
+     *     If the query can't match anything, returns {@link PartitioningStrategy#SHARD}.
+     * </li>
+     * </ul>
+ */ + private static PartitioningStrategy highSpeedAutoStrategyForBoolean(BooleanQuery query) { + List clauses = new ArrayList<>(query.clauses().size()); + boolean allRequired = true; + for (BooleanClause c : query) { + Query clauseQuery = unwrap(c.query()); + log.trace("highSpeedAutoStrategyForBooleanClause {} {}", c.occur(), clauseQuery); + if ((c.isProhibited() && clauseQuery instanceof MatchAllDocsQuery) + || (c.isRequired() && clauseQuery instanceof MatchNoDocsQuery)) { + // Can't match anything + return SHARD; } - return LuceneSliceQueue.PartitioningStrategy.DOC; + allRequired &= c.isRequired(); + clauses.add(highSpeedAutoStrategy(clauseQuery)); + } + log.trace("highSpeedAutoStrategyForBooleanClause {} {}", allRequired, clauses); + if (allRequired == false) { + return SEGMENT; } - if (query instanceof MatchAllDocsQuery) { - return LuceneSliceQueue.PartitioningStrategy.DOC; + if (clauses.stream().anyMatch(s -> s == SHARD)) { + return SHARD; } - if (query instanceof MatchNoDocsQuery) { - return LuceneSliceQueue.PartitioningStrategy.SHARD; + if (clauses.stream().anyMatch(s -> s == SEGMENT)) { + return SEGMENT; } - return LuceneSliceQueue.PartitioningStrategy.SEGMENT; + assert clauses.stream().allMatch(s -> s == DOC); + return DOC; } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java index 2b314cce41b6d..e6e2c041113b2 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java @@ -33,8 +33,6 @@ import java.util.List; import java.util.function.Function; -import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction; - /** * Creates a source operator that takes advantage of the natural sorting of segments in a tsdb index. *
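[Editor's aside: the boolean rules above are easiest to see on concrete queries. The examples below mirror the new test cases in the next hunk; the expected strategies follow `highSpeedAutoStrategyForBoolean` as written in this patch, and the class name `BooleanRuleExamples` is invented for illustration.]

```
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.TermQuery;

class BooleanRuleExamples {
    void examples() {
        // All clauses required, one is a plain term query -> SEGMENT:
        // required clauses combine, and any non-DOC clause drags the pick down.
        BooleanQuery mixed = new BooleanQuery.Builder()
            .add(new MatchAllDocsQuery(), BooleanClause.Occur.MUST)
            .add(new TermQuery(new Term("a", "1")), BooleanClause.Occur.MUST)
            .build();

        // A prohibited match-all can never match anything -> SHARD, the
        // cheapest strategy, because we'll find nothing anyway.
        BooleanQuery cantMatch = new BooleanQuery.Builder()
            .add(new TermQuery(new Term("a", "1")), BooleanClause.Occur.SHOULD)
            .add(new MatchAllDocsQuery(), BooleanClause.Occur.MUST_NOT)
            .build();

        // Any optional (SHOULD) clause means we can't prove the query is
        // match-all, so fall back to SEGMENT.
        BooleanQuery optional = new BooleanQuery.Builder()
            .add(new MatchNoDocsQuery(), BooleanClause.Occur.SHOULD)
            .add(new TermQuery(new Term("a", "1")), BooleanClause.Occur.SHOULD)
            .build();
    }
}
```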
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorAutoStrategyTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorAutoStrategyTests.java index bfd161594b16d..a1f83e51d7bbe 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorAutoStrategyTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorAutoStrategyTests.java @@ -11,6 +11,7 @@ import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.BoostQuery; @@ -19,6 +20,7 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; import org.elasticsearch.test.ESTestCase; import java.util.List; @@ -26,6 +28,10 @@ import static org.hamcrest.Matchers.equalTo; +/** + * Tests for the {@link LuceneSourceOperator.Factory#autoStrategy} method. It picks + * the strategy based on complex rules around the query. + */ public class LuceneSourceOperatorAutoStrategyTests extends ESTestCase { @ParametersFactory(argumentFormatting = "%s -> %s") public static Iterable parameters() { @@ -70,6 +76,31 @@ public static Iterable parameters() { ), BooleanClause.Occur.MUST ).add(new BoostQuery(new ConstantScoreQuery(new MatchAllDocsQuery()), 0.0F), BooleanClause.Occur.MUST).build(), + LuceneSliceQueue.PartitioningStrategy.SEGMENT }, + + new Object[] { + new BooleanQuery.Builder() // formatter + .add(new MatchAllDocsQuery(), BooleanClause.Occur.MUST) + .add(new TermQuery(new Term("a", "a")), BooleanClause.Occur.SHOULD) + .build(), + LuceneSliceQueue.PartitioningStrategy.SEGMENT }, + new Object[] { + new BooleanQuery.Builder() // formatter + .add(new TermQuery(new Term("a", "a")), BooleanClause.Occur.SHOULD) + .add(new MatchAllDocsQuery(), BooleanClause.Occur.MUST_NOT) + .build(), + LuceneSliceQueue.PartitioningStrategy.SHARD }, + new Object[] { + new BooleanQuery.Builder() // formatter + .add(new MatchAllDocsQuery(), BooleanClause.Occur.SHOULD) + .add(new TermQuery(new Term("a", "a")), BooleanClause.Occur.SHOULD) + .build(), + LuceneSliceQueue.PartitioningStrategy.SEGMENT }, + new Object[] { + new BooleanQuery.Builder() // formatter + .add(new MatchNoDocsQuery(), BooleanClause.Occur.SHOULD) + .add(new TermQuery(new Term("a", "a")), BooleanClause.Occur.SHOULD) + .build(), LuceneSliceQueue.PartitioningStrategy.SEGMENT } ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java index 16725920810fe..a079d0519f110 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java @@ -123,8 +123,7 @@ protected Matcher expectedDescriptionOfSimple() { + "dataPartitioning = (AUTO|DOC|SHARD|SEGMENT), " + "maxPageSize = \\d+, " + "limit = 100, " - + "needsScore = (true|false)" - + "scoreMode = (COMPLETE|COMPLETE_NO_SCORES)]" + + "needsScore = (true|false)]" ); } diff --git 
a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java index a2de43fac3d08..209cf709a50b4 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java @@ -28,9 +28,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.in; @ThreadLeakFilters(filters = TestClustersThreadFilter.class) public class EsqlPartitioningIT extends ESRestTestCase { @@ -50,20 +52,28 @@ public class EsqlPartitioningIT extends ESRestTestCase { record Case(String suffix, String idxPartition) {} - @ParametersFactory(argumentFormatting = "%2$s -> %3$s") + @ParametersFactory(argumentFormatting = "[%1$s] %3$s -> %4$s") public static Iterable parameters() { List params = new ArrayList<>(); - for (String index : new String[] { "idx", "small_idx" }) { - for (Case c : new Case[] { - new Case("", "SHARD"), - new Case("| SORT @timestamp ASC", "SHARD"), - new Case("| WHERE ABS(a) == 1", "DOC"), - new Case("| WHERE a == 1", "SHARD"), - new Case("| STATS SUM(a)", "DOC"), - new Case("| MV_EXPAND a | STATS SUM(a)", "DOC"), - new Case("| WHERE a == 1 | STATS SUM(a)", "SEGMENT"), - new Case("| WHERE a == 1 | MV_EXPAND a | STATS SUM(a)", "SEGMENT"), }) { - params.add(new Object[] { index, "FROM " + index + " " + c.suffix, expectedPartition(index, c.idxPartition) }); + for (String defaultDataPartitioning : new String[] { null, "shard", "segment", "doc", "auto" }) { + for (String index : new String[] { "idx", "small_idx" }) { + for (Case c : new Case[] { + new Case("", "SHARD"), + new Case("| SORT @timestamp ASC", "SHARD"), + new Case("| WHERE ABS(a) == 1", "DOC"), + new Case("| WHERE a == 1", "SHARD"), + new Case("| STATS SUM(a)", "DOC"), + new Case("| MV_EXPAND a | STATS SUM(a)", "DOC"), + new Case("| WHERE a == 1 | STATS SUM(a)", "SEGMENT"), + new Case("| WHERE a == 1 | MV_EXPAND a | STATS SUM(a)", "SEGMENT"), }) { + params.add( + new Object[] { + defaultDataPartitioning, + index, + "FROM " + index + " " + c.suffix, + expectedPartition(defaultDataPartitioning, index, c.idxPartition) } + ); + } } } return params; @@ -72,11 +82,13 @@ public static Iterable parameters() { @ClassRule public static ElasticsearchCluster cluster = Clusters.testCluster(); + private final String defaultDataPartitioning; private final String index; private final String query; private final Matcher expectedPartition; - public EsqlPartitioningIT(String index, String query, Matcher expectedPartition) { + public EsqlPartitioningIT(String defaultDataPartitioning, String index, String query, Matcher expectedPartition) { + this.defaultDataPartitioning = defaultDataPartitioning; this.index = index; this.query = query; this.expectedPartition = expectedPartition; @@ -88,10 +100,34 @@ public void test() throws IOException { case "small_idx" -> SMALL_IDX_DOCS; default -> throw new IllegalArgumentException("unknown index [" + index + "]"); }); - assertThat(partitionForQuery(query), expectedPartition); + setDefaultDataPartitioning(defaultDataPartitioning); + try { + 
assertThat(partitionForQuery(query), expectedPartition); + } finally { + setDefaultDataPartitioning(null); + } + } + + private void setDefaultDataPartitioning(String defaultDataPartitioning) throws IOException { + Request request = new Request("PUT", "/_cluster/settings"); + XContentBuilder builder = JsonXContent.contentBuilder().startObject(); + builder.startObject("transient"); + builder.field("esql.default_data_partitioning", defaultDataPartitioning); + builder.endObject(); + request.setJsonEntity(Strings.toString(builder.endObject())); + int code = client().performRequest(request).getStatusLine().getStatusCode(); + assertThat(code, equalTo(200)); + } + + private static Matcher expectedPartition(String defaultDataPartitioning, String index, String idxPartition) { + return switch (defaultDataPartitioning) { + case null -> expectedAutoPartition(index, idxPartition); + case "auto" -> expectedAutoPartition(index, idxPartition); + default -> equalTo(defaultDataPartitioning.toUpperCase(Locale.ROOT)); + }; } - private static Matcher expectedPartition(String index, String idxPartition) { + private static Matcher expectedAutoPartition(String index, String idxPartition) { return equalTo(switch (index) { case "idx" -> idxPartition; case "small_idx" -> "SHARD"; @@ -102,6 +138,7 @@ private static Matcher expectedPartition(String index, String idxPartiti private String partitionForQuery(String query) throws IOException { Request request = new Request("POST", "_query"); request.addParameter("pretty", ""); + request.addParameter("error_trace", ""); XContentBuilder b = JsonXContent.contentBuilder().startObject(); b.field("query", query); b.field("profile", true); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 03dd37aad428a..a93e2f16c075b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -17,6 +17,7 @@ import org.elasticsearch.compute.aggregation.GroupingAggregator; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneCountOperator; import org.elasticsearch.compute.lucene.LuceneOperator; import org.elasticsearch.compute.lucene.LuceneSourceOperator; @@ -101,10 +102,17 @@ public interface ShardContext extends org.elasticsearch.compute.lucene.ShardCont } private final List shardContexts; + private final DataPartitioning defaultDataPartitioning; - public EsPhysicalOperationProviders(FoldContext foldContext, List shardContexts, AnalysisRegistry analysisRegistry) { + public EsPhysicalOperationProviders( + FoldContext foldContext, + List shardContexts, + AnalysisRegistry analysisRegistry, + DataPartitioning defaultDataPartitioning + ) { super(foldContext, analysisRegistry); this.shardContexts = shardContexts; + this.defaultDataPartitioning = defaultDataPartitioning; } @Override @@ -199,7 +207,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, luceneFactory = new LuceneTopNSourceOperator.Factory( shardContexts, querySupplier(esQueryExec.query()), - context.queryPragmas().dataPartitioning(), + context.queryPragmas().dataPartitioning(defaultDataPartitioning), 
context.queryPragmas().taskConcurrency(), context.pageSize(rowEstimatedSize), limit, @@ -219,7 +227,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, luceneFactory = new LuceneSourceOperator.Factory( shardContexts, querySupplier(esQueryExec.query()), - context.queryPragmas().dataPartitioning(), + context.queryPragmas().dataPartitioning(defaultDataPartitioning), context.queryPragmas().taskConcurrency(), context.pageSize(rowEstimatedSize), limit, @@ -241,7 +249,7 @@ public LuceneCountOperator.Factory countSource(LocalExecutionPlannerContext cont return new LuceneCountOperator.Factory( shardContexts, querySupplier(queryBuilder), - context.queryPragmas().dataPartitioning(), + context.queryPragmas().dataPartitioning(defaultDataPartitioning), context.queryPragmas().taskConcurrency(), limit == null ? NO_LIMIT : (Integer) limit.fold(context.foldCtx()) ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 293be0eb3c2b0..67804c71255f7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.DriverTaskRunner; @@ -128,6 +129,8 @@ public class ComputeService { private final ClusterComputeHandler clusterComputeHandler; private final ExchangeService exchangeService; + private volatile DataPartitioning defaultDataPartitioning; + @SuppressWarnings("this-escape") public ComputeService( SearchService searchService, @@ -158,6 +161,7 @@ public ComputeService( dataNodeComputeHandler ); this.exchangeService = exchangeService; + clusterService.getClusterSettings().initializeAndWatch(EsqlPlugin.DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v); } public void execute( @@ -410,7 +414,12 @@ public SourceProvider createSourceProvider() { context.exchangeSinkSupplier(), enrichLookupService, lookupFromIndexService, - new EsPhysicalOperationProviders(context.foldCtx(), contexts, searchService.getIndicesService().getAnalysis()), + new EsPhysicalOperationProviders( + context.foldCtx(), + contexts, + searchService.getIndicesService().getAnalysis(), + defaultDataPartitioning + ), contexts ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 56773005edd5c..78d6411dc0630 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockFactoryProvider; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneOperator; import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; import 
org.elasticsearch.compute.operator.AbstractPageMappingOperator; @@ -153,6 +154,14 @@ public class EsqlPlugin extends Plugin implements ActionPlugin { Setting.Property.Dynamic ); + public static final Setting DEFAULT_DATA_PARTITIONING = Setting.enumSetting( + DataPartitioning.class, + "esql.default_data_partitioning", + DataPartitioning.AUTO, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + @Override public Collection createComponents(PluginServices services) { CircuitBreaker circuitBreaker = services.indicesService().getBigArrays().breakerService().getBreaker("request"); @@ -215,7 +224,8 @@ public List> getSettings() { ESQL_QUERYLOG_THRESHOLD_DEBUG_SETTING, ESQL_QUERYLOG_THRESHOLD_INFO_SETTING, ESQL_QUERYLOG_THRESHOLD_WARN_SETTING, - ESQL_QUERYLOG_INCLUDE_USER_SETTING + ESQL_QUERYLOG_INCLUDE_USER_SETTING, + DEFAULT_DATA_PARTITIONING ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java index f0e82bfdbc729..b8aa1a7badcab 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.lucene.DataPartitioning; +import org.elasticsearch.compute.lucene.LuceneSliceQueue; import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverStatus; import org.elasticsearch.core.TimeValue; @@ -23,6 +24,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expression; import java.io.IOException; +import java.util.Locale; import java.util.Objects; /** @@ -38,11 +40,14 @@ public final class QueryPragmas implements Writeable { ThreadPool.searchOrGetThreadPoolSize(EsExecutors.allocatedProcessors(Settings.EMPTY)) ); - public static final Setting DATA_PARTITIONING = Setting.enumSetting( - DataPartitioning.class, - "data_partitioning", - DataPartitioning.AUTO - ); + /** + * How to cut {@link LuceneSliceQueue slices} to cut each shard into. Is parsed to + * the enum {@link DataPartitioning} which has more documentation. Not an + * {@link Setting#enumSetting} because those can't have {@code null} defaults. + * {@code null} here means "use the default from the cluster setting + * named {@link EsqlPlugin#DEFAULT_DATA_PARTITIONING}." 
+ */ + public static final Setting DATA_PARTITIONING = Setting.simpleString("data_partitioning"); /** * Size of a page in entries with {@code 0} being a special value asking @@ -100,8 +105,12 @@ public int concurrentExchangeClients() { return EXCHANGE_CONCURRENT_CLIENTS.get(settings); } - public DataPartitioning dataPartitioning() { - return DATA_PARTITIONING.get(settings); + public DataPartitioning dataPartitioning(DataPartitioning defaultDataPartitioning) { + String partitioning = DATA_PARTITIONING.get(settings); + if (partitioning.isEmpty()) { + return defaultDataPartitioning; + } + return DataPartitioning.valueOf(partitioning.toUpperCase(Locale.ROOT)); } public int taskConcurrency() { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 2af859bfabc31..a7b1639993598 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.compute.aggregation.AggregatorMode; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.compute.test.TestBlockFactory; @@ -7673,7 +7674,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP () -> exchangeSinkHandler.createExchangeSink(() -> {}), null, null, - new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null), + new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null, DataPartitioning.AUTO), List.of() ); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index 6a7571d6964c9..90348cdbb62dd 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator; import org.elasticsearch.compute.operator.SourceOperator; @@ -255,7 +256,7 @@ private Configuration config() { } private EsPhysicalOperationProviders esPhysicalOperationProviders(List shardContexts) { - return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null); + return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null, DataPartitioning.AUTO); } private List createShardContexts() throws IOException { From 03f029c2c811e71c114ef3b67fa07ccdfacc75cd Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 1 Apr 2025 11:54:37 -0400 Subject: [PATCH 10/13] in --- .../xpack/esql/qa/single_node/EsqlPartitioningIT.java | 1 - 1 file changed, 1 deletion(-) diff --git 
a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java index 209cf709a50b4..5726fc866428f 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java @@ -32,7 +32,6 @@ import java.util.Map; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.in; @ThreadLeakFilters(filters = TestClustersThreadFilter.class) public class EsqlPartitioningIT extends ESRestTestCase { From 5ab70323b706487f2cd4c0437bf68478c4bd03af Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 1 Apr 2025 13:51:02 -0400 Subject: [PATCH 11/13] Fix rewrite --- .../org/elasticsearch/compute/lucene/LuceneSliceQueue.java | 3 ++- .../elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java index 9a8789f194a44..4a8847e2870aa 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java @@ -79,6 +79,7 @@ public static LuceneSliceQueue create( Map partitioningStrategies = new HashMap<>(contexts.size()); for (ShardContext ctx : contexts) { Query query = queryFunction.apply(ctx); + query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query); /* * Rewrite the query on the local index so things like fully * overlapping range queries become match all. It's important @@ -87,7 +88,7 @@ public static LuceneSliceQueue create( * into MatchAll. 
*/ try { - query = query.rewrite(ctx.searcher()); + query = ctx.searcher().rewrite(query); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 1b44536eed508..814cd8bfd9796 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -687,7 +687,8 @@ private String checkOperatorProfile(Map o) { .entry("pages_emitted", greaterThan(0)) .entry("rows_emitted", greaterThan(0)) .entry("process_nanos", greaterThan(0)) - .entry("processed_queries", List.of("*:*")); + .entry("processed_queries", List.of("*:*")) + .entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD")); case "ValuesSourceReaderOperator" -> basicProfile().entry("readers_built", matchesMap().extraOk()); case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0)) .entry("rows_received", greaterThan(0)) From c81693461fb554540e892cc383cdacd35d3ac471 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 9 Apr 2025 14:04:27 -0400 Subject: [PATCH 12/13] Fixup merge --- server/src/main/java/org/elasticsearch/TransportVersions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 0eda2f9661ebe..3581eb7331697 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -215,7 +215,7 @@ static TransportVersion def(int id) { public static final TransportVersion ADD_PROJECT_ID_TO_DSL_ERROR_INFO = def(9_046_0_00); public static final TransportVersion SEMANTIC_TEXT_CHUNKING_CONFIG = def(9_047_00_0); public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING = def(9_048_00_0); - public static final TransportVersion REPO_ANALYSIS_COPY_BLOB = def(9_048_00_0); + public static final TransportVersion REPO_ANALYSIS_COPY_BLOB = def(9_049_00_0); /* * STOP! READ THIS FIRST! 
No, really, From ad9e064116c6ccde54260da75460f5c5b8e5e01b Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 9 Apr 2025 15:34:47 -0400 Subject: [PATCH 13/13] Try and make this more consistent --- .../xpack/esql/qa/single_node/EsqlPartitioningIT.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java index 5726fc866428f..238f5aacce588 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java @@ -179,6 +179,16 @@ private void setupIndex(String index, int docs) throws IOException { if (exists.getStatusLine().getStatusCode() == 200) { return; } + Request create = new Request("PUT", index); + create.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 1 + } + } + }"""); // Use a single shard to get consistent results. + client().performRequest(create); StringBuilder bulk = new StringBuilder(); for (int d = 0; d < docs; d++) { bulk.append("{\"index\":{}}\n");
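[Editor's aside: finally, a hedged sketch of how the per-request pragma and the new cluster setting combine, following `QueryPragmas#dataPartitioning` from patch 09: a non-empty `data_partitioning` pragma wins, an unset pragma falls back to `esql.default_data_partitioning`, which itself defaults to AUTO. The names `Partitioning` and `resolve` are invented for this example.]

```
import java.util.Locale;

// Mirrors the DataPartitioning enum for illustration only.
enum Partitioning { AUTO, SHARD, SEGMENT, DOC }

final class PartitioningResolution {
    // pragma: the raw "data_partitioning" pragma string, "" when unset.
    // clusterDefault: the value of esql.default_data_partitioning (AUTO by default).
    static Partitioning resolve(String pragma, Partitioning clusterDefault) {
        if (pragma.isEmpty()) {
            return clusterDefault;
        }
        return Partitioning.valueOf(pragma.toUpperCase(Locale.ROOT));
    }
}
```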