diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java
index 57aa49911146b..216cd895531b3 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java
@@ -7,9 +7,11 @@
 
 package org.elasticsearch.compute.lucene;
 
+import org.apache.lucene.search.Query;
 import org.elasticsearch.compute.operator.Driver;
 
 import java.util.List;
+import java.util.function.Function;
 
 /**
  * How we partition the data across {@link Driver}s. Each request forks into
@@ -54,5 +56,19 @@ public enum DataPartitioning {
      * their own tasks. See {@link LuceneSliceQueue#nextSlice(LuceneSlice)}.
      *
      */
-    DOC
+    DOC;
+
+    @FunctionalInterface
+    public interface AutoStrategy {
+        Function<Query, LuceneSliceQueue.PartitioningStrategy> pickStrategy(int limit);
+
+        AutoStrategy DEFAULT = LuceneSourceOperator.Factory::autoStrategy;
+        AutoStrategy DEFAULT_TIME_SERIES = limit -> {
+            if (limit == LuceneOperator.NO_LIMIT) {
+                return q -> LuceneSliceQueue.PartitioningStrategy.DOC;
+            } else {
+                return DEFAULT.pickStrategy(limit);
+            }
+        };
+    }
 }
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java
index 5201eede502df..5d0fc84727fea 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java
@@ -68,6 +68,7 @@ public Factory(
             List<? extends ShardContext> contexts,
             Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
             DataPartitioning dataPartitioning,
+            DataPartitioning.AutoStrategy autoStrategy,
             int taskConcurrency,
             int maxPageSize,
             int limit,
@@ -77,7 +78,7 @@ public Factory(
                 contexts,
                 queryFunction,
                 dataPartitioning,
-                autoStrategy(limit),
+                autoStrategy.pickStrategy(limit),
                 taskConcurrency,
                 limit,
                 needsScore,
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java
index 8185b045029b3..c247b4765548a 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java
@@ -381,6 +381,7 @@ static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader, List<Lu
             List.of(searchContext),
             ctx -> queryAndTags,
             randomFrom(DataPartitioning.values()),
+            DataPartitioning.AutoStrategy.DEFAULT,
             randomIntBetween(1, 10),
             randomPageSize(),
             limit,
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java
index 2ef64623daa74..3879ab117d734 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java
@@ -279,6 +279,7 @@ private static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader,
             List.of(searchContext),
             ctx -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())),
             randomFrom(DataPartitioning.values()),
+            DataPartitioning.AutoStrategy.DEFAULT,
             randomIntBetween(1, 10),
             randomPageSize(),
             LuceneOperator.NO_LIMIT,
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java
index 91b8de1a08573..cc21d180113a0 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java
@@ -224,6 +224,7 @@ private LuceneSourceOperator.Factory simple(DataPartitioning dataPartitioning, i
             List.of(ctx),
             queryFunction,
             dataPartitioning,
+            DataPartitioning.AutoStrategy.DEFAULT,
             taskConcurrency,
             maxPageSize,
             limit,
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java
index 5a1f2ee7cc949..6de494fe26b6a 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java
@@ -277,6 +277,7 @@ private SourceOperator simpleInput(DriverContext context, int size, int commitEv
             shardContexts,
             ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
             DataPartitioning.SHARD,
+            DataPartitioning.AutoStrategy.DEFAULT,
             1,// randomIntBetween(1, 10),
             pageSize,
             LuceneOperator.NO_LIMIT,
@@ -1312,6 +1313,7 @@ public void testWithNulls() throws IOException {
             List.of(shardContext),
             ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
             randomFrom(DataPartitioning.values()),
+            DataPartitioning.AutoStrategy.DEFAULT,
             randomIntBetween(1, 10),
             randomPageSize(),
             LuceneOperator.NO_LIMIT,
@@ -1473,6 +1475,7 @@ public void testManyShards() throws IOException {
             contexts,
             ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
             DataPartitioning.SHARD,
+            DataPartitioning.AutoStrategy.DEFAULT,
             randomIntBetween(1, 10),
             1000,
             LuceneOperator.NO_LIMIT,
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java
index 19a645c146242..2dd0e7c4de41b 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java
@@ -195,6 +195,7 @@ private SourceOperator sourceOperator(DriverContext context, int pageSize) {
             List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)),
             ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
             DataPartitioning.SHARD,
+            DataPartitioning.AutoStrategy.DEFAULT,
             randomIntBetween(1, 10),
             pageSize,
             LuceneOperator.NO_LIMIT,
@@ -1506,6 +1507,7 @@ public void testWithNulls() throws IOException {
             List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)),
             ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
             randomFrom(DataPartitioning.values()),
+            DataPartitioning.AutoStrategy.DEFAULT,
             randomIntBetween(1, 10),
             randomPageSize(),
             LuceneOperator.NO_LIMIT,
@@ -1755,6 +1757,7 @@ public void testManyShards() throws IOException {
             contexts,
             ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
             DataPartitioning.SHARD,
+            DataPartitioning.AutoStrategy.DEFAULT,
             randomIntBetween(1, 10),
             1000,
             LuceneOperator.NO_LIMIT,
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java
index dc5e815a85697..72427a5afc9d6 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java
@@ -7,7 +7,6 @@
 
 package org.elasticsearch.xpack.esql.action;
 
-import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.action.ActionListener;
@@ -25,6 +24,7 @@
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.lucene.DataPartitioning;
+import org.elasticsearch.compute.lucene.LuceneOperator;
 import org.elasticsearch.compute.lucene.LuceneSliceQueue;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.ShardContext;
@@ -280,9 +280,10 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices)
             List.of(esqlContext),
             ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
             DataPartitioning.SEGMENT,
+            DataPartitioning.AutoStrategy.DEFAULT,
             1,
             10000,
-            DocIdSetIterator.NO_MORE_DOCS,
+            LuceneOperator.NO_LIMIT,
             false // no scoring
         );
         List fieldInfos = new ArrayList<>();
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
index e0b570267899b..d5101e6b8be7c 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
@@ -292,6 +292,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
             shardContexts,
             querySupplier(esQueryExec.query()),
             context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()),
+            context.autoPartitioningStrategy().get(),
             context.queryPragmas().taskConcurrency(),
             context.pageSize(rowEstimatedSize),
             limit,
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
index ec1539fa3ae38..5ae15f4c0e844 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
@@ -20,6 +20,7 @@
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.LocalCircuitBreaker;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.lucene.LuceneOperator;
 import org.elasticsearch.compute.operator.ChangePointOperator;
 import org.elasticsearch.compute.operator.ColumnExtractOperator;
@@ -117,6 +118,7 @@
 import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec;
 import org.elasticsearch.xpack.esql.plan.physical.SampleExec;
 import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
+import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
 import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
 import org.elasticsearch.xpack.esql.plan.physical.inference.CompletionExec;
@@ -201,6 +203,7 @@ public LocalExecutionPlanner(
      * turn the given plan into a list of drivers to execute
      */
    public LocalExecutionPlan plan(String description, FoldContext foldCtx, PhysicalPlan localPhysicalPlan) {
+
         var context = new LocalExecutionPlannerContext(
             description,
             new ArrayList<>(),
@@ -210,6 +213,11 @@ public LocalExecutionPlan plan(String description, FoldContext foldCtx, Physical
             blockFactory,
             foldCtx,
             settings,
+            new Holder<>(
+                localPhysicalPlan.anyMatch(p -> p instanceof TimeSeriesAggregateExec)
+                    ? DataPartitioning.AutoStrategy.DEFAULT_TIME_SERIES
+                    : DataPartitioning.AutoStrategy.DEFAULT
+            ),
             shardContexts
         );
 
@@ -1012,6 +1020,7 @@ public record LocalExecutionPlannerContext(
         BlockFactory blockFactory,
         FoldContext foldCtx,
         Settings settings,
+        Holder<DataPartitioning.AutoStrategy> autoPartitioningStrategy,
         List<EsPhysicalOperationProviders.ShardContext> shardContexts
     ) {
         void addDriverFactory(DriverFactory driverFactory) {
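
Reviewer note: a minimal sketch of how the new hook composes end to end, not part of the change itself. The class name AutoStrategyUsageSketch, the pick(...) helper, the main method, and its placement in the org.elasticsearch.compute.lucene package are made up for illustration; it also assumes the pickStrategy signature shown in the DataPartitioning hunk above.

package org.elasticsearch.compute.lucene;

import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;

import java.util.function.Function;

class AutoStrategyUsageSketch {
    // Mirrors the planner-side selection added in LocalExecutionPlanner: time-series
    // plans get DEFAULT_TIME_SERIES, everything else keeps the existing DEFAULT.
    static LuceneSliceQueue.PartitioningStrategy pick(boolean timeSeries, int limit, Query query) {
        DataPartitioning.AutoStrategy auto = timeSeries
            ? DataPartitioning.AutoStrategy.DEFAULT_TIME_SERIES
            : DataPartitioning.AutoStrategy.DEFAULT;
        // pickStrategy(limit) yields a per-query resolver, which is what
        // LuceneSourceOperator.Factory now passes to its super constructor.
        Function<Query, LuceneSliceQueue.PartitioningStrategy> resolver = auto.pickStrategy(limit);
        return resolver.apply(query);
    }

    public static void main(String[] args) {
        // An unlimited time-series scan always resolves to DOC partitioning.
        System.out.println(pick(true, LuceneOperator.NO_LIMIT, new MatchAllDocsQuery()));
    }
}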