Skip to content

Commit 152b994

Browse files
committed
Enable doc partitioning by default for time-series queries
1 parent ee94a49 commit 152b994

File tree

10 files changed

+41
-4
lines changed

10 files changed

+41
-4
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77

88
package org.elasticsearch.compute.lucene;
99

10+
import org.apache.lucene.search.Query;
1011
import org.elasticsearch.compute.operator.Driver;
1112

1213
import java.util.List;
14+
import java.util.function.Function;
1315

1416
/**
1517
* How we partition the data across {@link Driver}s. Each request forks into
@@ -54,5 +56,19 @@ public enum DataPartitioning {
5456
* their own tasks. See {@link LuceneSliceQueue#nextSlice(LuceneSlice)}.</li>
5557
* </ol>
5658
*/
57-
DOC
59+
DOC;
60+
61+
@FunctionalInterface
62+
public interface AutoStrategy {
63+
Function<Query, LuceneSliceQueue.PartitioningStrategy> pickStrategy(int limit);
64+
65+
AutoStrategy DEFAULT = LuceneSourceOperator.Factory::autoStrategy;
66+
AutoStrategy DEFAULT_TIME_SERIES = limit -> {
67+
if (limit == LuceneOperator.NO_LIMIT) {
68+
return q -> LuceneSliceQueue.PartitioningStrategy.DOC;
69+
} else {
70+
return DEFAULT.pickStrategy(limit);
71+
}
72+
};
73+
}
5874
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public Factory(
6868
List<? extends ShardContext> contexts,
6969
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
7070
DataPartitioning dataPartitioning,
71+
DataPartitioning.AutoStrategy autoStrategy,
7172
int taskConcurrency,
7273
int maxPageSize,
7374
int limit,
@@ -77,7 +78,7 @@ public Factory(
7778
contexts,
7879
queryFunction,
7980
dataPartitioning,
80-
autoStrategy(limit),
81+
autoStrategy.pickStrategy(limit),
8182
taskConcurrency,
8283
limit,
8384
needsScore,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader, List<Luc
381381
List.of(searchContext),
382382
ctx -> queryAndTags,
383383
randomFrom(DataPartitioning.values()),
384+
DataPartitioning.AutoStrategy.DEFAULT,
384385
randomIntBetween(1, 10),
385386
randomPageSize(),
386387
limit,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ private static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader,
279279
List.of(searchContext),
280280
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())),
281281
randomFrom(DataPartitioning.values()),
282+
DataPartitioning.AutoStrategy.DEFAULT,
282283
randomIntBetween(1, 10),
283284
randomPageSize(),
284285
LuceneOperator.NO_LIMIT,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ private LuceneSourceOperator.Factory simple(DataPartitioning dataPartitioning, i
224224
List.of(ctx),
225225
queryFunction,
226226
dataPartitioning,
227+
DataPartitioning.AutoStrategy.DEFAULT,
227228
taskConcurrency,
228229
maxPageSize,
229230
limit,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ private SourceOperator simpleInput(DriverContext context, int size, int commitEv
277277
shardContexts,
278278
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
279279
DataPartitioning.SHARD,
280+
DataPartitioning.AutoStrategy.DEFAULT,
280281
1,// randomIntBetween(1, 10),
281282
pageSize,
282283
LuceneOperator.NO_LIMIT,
@@ -1312,6 +1313,7 @@ public void testWithNulls() throws IOException {
13121313
List.of(shardContext),
13131314
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
13141315
randomFrom(DataPartitioning.values()),
1316+
DataPartitioning.AutoStrategy.DEFAULT,
13151317
randomIntBetween(1, 10),
13161318
randomPageSize(),
13171319
LuceneOperator.NO_LIMIT,
@@ -1473,6 +1475,7 @@ public void testManyShards() throws IOException {
14731475
contexts,
14741476
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
14751477
DataPartitioning.SHARD,
1478+
DataPartitioning.AutoStrategy.DEFAULT,
14761479
randomIntBetween(1, 10),
14771480
1000,
14781481
LuceneOperator.NO_LIMIT,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ private SourceOperator sourceOperator(DriverContext context, int pageSize) {
195195
List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)),
196196
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
197197
DataPartitioning.SHARD,
198+
DataPartitioning.AutoStrategy.DEFAULT,
198199
randomIntBetween(1, 10),
199200
pageSize,
200201
LuceneOperator.NO_LIMIT,
@@ -1506,6 +1507,7 @@ public void testWithNulls() throws IOException {
15061507
List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)),
15071508
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
15081509
randomFrom(DataPartitioning.values()),
1510+
DataPartitioning.AutoStrategy.DEFAULT,
15091511
randomIntBetween(1, 10),
15101512
randomPageSize(),
15111513
LuceneOperator.NO_LIMIT,
@@ -1755,6 +1757,7 @@ public void testManyShards() throws IOException {
17551757
contexts,
17561758
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
17571759
DataPartitioning.SHARD,
1760+
DataPartitioning.AutoStrategy.DEFAULT,
17581761
randomIntBetween(1, 10),
17591762
1000,
17601763
LuceneOperator.NO_LIMIT,

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.esql.action;
99

10-
import org.apache.lucene.search.DocIdSetIterator;
1110
import org.apache.lucene.search.MatchAllDocsQuery;
1211
import org.apache.lucene.util.BytesRef;
1312
import org.elasticsearch.action.ActionListener;
@@ -25,6 +24,7 @@
2524
import org.elasticsearch.compute.data.LongBlock;
2625
import org.elasticsearch.compute.data.LongVector;
2726
import org.elasticsearch.compute.lucene.DataPartitioning;
27+
import org.elasticsearch.compute.lucene.LuceneOperator;
2828
import org.elasticsearch.compute.lucene.LuceneSliceQueue;
2929
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
3030
import org.elasticsearch.compute.lucene.ShardContext;
@@ -280,9 +280,10 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices)
280280
List.of(esqlContext),
281281
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
282282
DataPartitioning.SEGMENT,
283+
DataPartitioning.AutoStrategy.DEFAULT,
283284
1,
284285
10000,
285-
DocIdSetIterator.NO_MORE_DOCS,
286+
LuceneOperator.NO_LIMIT,
286287
false // no scoring
287288
);
288289
List<ValuesSourceReaderOperator.FieldInfo> fieldInfos = new ArrayList<>();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
292292
shardContexts,
293293
querySupplier(esQueryExec.query()),
294294
context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()),
295+
context.autoPartitioningStrategy().get(),
295296
context.queryPragmas().taskConcurrency(),
296297
context.pageSize(rowEstimatedSize),
297298
limit,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.compute.data.ElementType;
2121
import org.elasticsearch.compute.data.LocalCircuitBreaker;
2222
import org.elasticsearch.compute.data.Page;
23+
import org.elasticsearch.compute.lucene.DataPartitioning;
2324
import org.elasticsearch.compute.lucene.LuceneOperator;
2425
import org.elasticsearch.compute.operator.ChangePointOperator;
2526
import org.elasticsearch.compute.operator.ColumnExtractOperator;
@@ -117,6 +118,7 @@
117118
import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec;
118119
import org.elasticsearch.xpack.esql.plan.physical.SampleExec;
119120
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
121+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
120122
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
121123
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
122124
import org.elasticsearch.xpack.esql.plan.physical.inference.CompletionExec;
@@ -201,6 +203,7 @@ public LocalExecutionPlanner(
201203
* turn the given plan into a list of drivers to execute
202204
*/
203205
public LocalExecutionPlan plan(String description, FoldContext foldCtx, PhysicalPlan localPhysicalPlan) {
206+
204207
var context = new LocalExecutionPlannerContext(
205208
description,
206209
new ArrayList<>(),
@@ -210,6 +213,11 @@ public LocalExecutionPlan plan(String description, FoldContext foldCtx, Physical
210213
blockFactory,
211214
foldCtx,
212215
settings,
216+
new Holder<>(
217+
localPhysicalPlan.anyMatch(p -> p instanceof TimeSeriesAggregateExec)
218+
? DataPartitioning.AutoStrategy.DEFAULT_TIME_SERIES
219+
: DataPartitioning.AutoStrategy.DEFAULT
220+
),
213221
shardContexts
214222
);
215223

@@ -1012,6 +1020,7 @@ public record LocalExecutionPlannerContext(
10121020
BlockFactory blockFactory,
10131021
FoldContext foldCtx,
10141022
Settings settings,
1023+
Holder<DataPartitioning.AutoStrategy> autoPartitioningStrategy,
10151024
List<EsPhysicalOperationProviders.ShardContext> shardContexts
10161025
) {
10171026
void addDriverFactory(DriverFactory driverFactory) {

0 commit comments

Comments
 (0)