Skip to content

Commit a5c0852

Browse files
authored
Enable doc partitioning by default for time-series queries (#133038)
With #132774, the overhead of running queries with DOC partitioning is small. While we might switch the default data partitioning to DOC for all queries in the future, this PR makes DOC the default data partitioning for time-series queries only, to minimize any unexpected impact. Relates to #132774.
1 parent f8a72d9 commit a5c0852

File tree

10 files changed

+41
-4
lines changed

10 files changed

+41
-4
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77

88
package org.elasticsearch.compute.lucene;
99

10+
import org.apache.lucene.search.Query;
1011
import org.elasticsearch.compute.operator.Driver;
1112

1213
import java.util.List;
14+
import java.util.function.Function;
1315

1416
/**
1517
* How we partition the data across {@link Driver}s. Each request forks into
@@ -54,5 +56,19 @@ public enum DataPartitioning {
5456
* their own tasks. See {@link LuceneSliceQueue#nextSlice(LuceneSlice)}.</li>
5557
* </ol>
5658
*/
57-
DOC
59+
DOC;
60+
61+
/**
 * Chooses how a query is partitioned, given the limit of the source operator.
 * The result of {@link #pickStrategy(int)} is what {@code LuceneSourceOperator.Factory}
 * passes on in place of its previous direct call to {@code autoStrategy(limit)}.
 */
@FunctionalInterface
62+
public interface AutoStrategy {
63+
/**
 * Builds the per-query strategy chooser for the given limit.
 *
 * @param limit the {@code limit} value handed to the source operator factory
 * @return a function mapping a Lucene {@link Query} to the
 *         {@link LuceneSliceQueue.PartitioningStrategy} to use for it
 */
Function<Query, LuceneSliceQueue.PartitioningStrategy> pickStrategy(int limit);
64+
65+
/** Default behavior: delegates to {@code LuceneSourceOperator.Factory.autoStrategy} (defined outside this view). */
AutoStrategy DEFAULT = LuceneSourceOperator.Factory::autoStrategy;
66+
/**
 * Variant selected for time-series queries: picks {@code DOC} for every query when
 * {@code limit} equals {@link LuceneOperator#NO_LIMIT}, and otherwise falls back to
 * {@link #DEFAULT}. Restricting this to time-series queries is meant to minimize
 * unexpected impact on other queries.
 */
AutoStrategy DEFAULT_TIME_SERIES = limit -> {
67+
if (limit == LuceneOperator.NO_LIMIT) {
68+
// NOTE(review): NO_LIMIT presumably means the query has no row limit - confirm in LuceneOperator.
return q -> LuceneSliceQueue.PartitioningStrategy.DOC;
69+
} else {
70+
return DEFAULT.pickStrategy(limit);
71+
}
72+
};
73+
}
5874
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public Factory(
6868
List<? extends ShardContext> contexts,
6969
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
7070
DataPartitioning dataPartitioning,
71+
DataPartitioning.AutoStrategy autoStrategy,
7172
int taskConcurrency,
7273
int maxPageSize,
7374
int limit,
@@ -77,7 +78,7 @@ public Factory(
7778
contexts,
7879
queryFunction,
7980
dataPartitioning,
80-
autoStrategy(limit),
81+
autoStrategy.pickStrategy(limit),
8182
taskConcurrency,
8283
limit,
8384
needsScore,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader, List<Luc
381381
List.of(searchContext),
382382
ctx -> queryAndTags,
383383
randomFrom(DataPartitioning.values()),
384+
DataPartitioning.AutoStrategy.DEFAULT,
384385
randomIntBetween(1, 10),
385386
randomPageSize(),
386387
limit,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ private static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader,
279279
List.of(searchContext),
280280
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())),
281281
randomFrom(DataPartitioning.values()),
282+
DataPartitioning.AutoStrategy.DEFAULT,
282283
randomIntBetween(1, 10),
283284
randomPageSize(),
284285
LuceneOperator.NO_LIMIT,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ private LuceneSourceOperator.Factory simple(DataPartitioning dataPartitioning, i
224224
List.of(ctx),
225225
queryFunction,
226226
dataPartitioning,
227+
DataPartitioning.AutoStrategy.DEFAULT,
227228
taskConcurrency,
228229
maxPageSize,
229230
limit,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ private SourceOperator simpleInput(DriverContext context, int size, int commitEv
277277
shardContexts,
278278
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
279279
DataPartitioning.SHARD,
280+
DataPartitioning.AutoStrategy.DEFAULT,
280281
1,// randomIntBetween(1, 10),
281282
pageSize,
282283
LuceneOperator.NO_LIMIT,
@@ -1312,6 +1313,7 @@ public void testWithNulls() throws IOException {
13121313
List.of(shardContext),
13131314
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
13141315
randomFrom(DataPartitioning.values()),
1316+
DataPartitioning.AutoStrategy.DEFAULT,
13151317
randomIntBetween(1, 10),
13161318
randomPageSize(),
13171319
LuceneOperator.NO_LIMIT,
@@ -1473,6 +1475,7 @@ public void testManyShards() throws IOException {
14731475
contexts,
14741476
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
14751477
DataPartitioning.SHARD,
1478+
DataPartitioning.AutoStrategy.DEFAULT,
14761479
randomIntBetween(1, 10),
14771480
1000,
14781481
LuceneOperator.NO_LIMIT,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ private SourceOperator sourceOperator(DriverContext context, int pageSize) {
195195
List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)),
196196
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
197197
DataPartitioning.SHARD,
198+
DataPartitioning.AutoStrategy.DEFAULT,
198199
randomIntBetween(1, 10),
199200
pageSize,
200201
LuceneOperator.NO_LIMIT,
@@ -1506,6 +1507,7 @@ public void testWithNulls() throws IOException {
15061507
List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)),
15071508
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
15081509
randomFrom(DataPartitioning.values()),
1510+
DataPartitioning.AutoStrategy.DEFAULT,
15091511
randomIntBetween(1, 10),
15101512
randomPageSize(),
15111513
LuceneOperator.NO_LIMIT,
@@ -1755,6 +1757,7 @@ public void testManyShards() throws IOException {
17551757
contexts,
17561758
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
17571759
DataPartitioning.SHARD,
1760+
DataPartitioning.AutoStrategy.DEFAULT,
17581761
randomIntBetween(1, 10),
17591762
1000,
17601763
LuceneOperator.NO_LIMIT,

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.esql.action;
99

10-
import org.apache.lucene.search.DocIdSetIterator;
1110
import org.apache.lucene.search.MatchAllDocsQuery;
1211
import org.apache.lucene.util.BytesRef;
1312
import org.elasticsearch.action.ActionListener;
@@ -25,6 +24,7 @@
2524
import org.elasticsearch.compute.data.LongBlock;
2625
import org.elasticsearch.compute.data.LongVector;
2726
import org.elasticsearch.compute.lucene.DataPartitioning;
27+
import org.elasticsearch.compute.lucene.LuceneOperator;
2828
import org.elasticsearch.compute.lucene.LuceneSliceQueue;
2929
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
3030
import org.elasticsearch.compute.lucene.ShardContext;
@@ -280,9 +280,10 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices)
280280
List.of(esqlContext),
281281
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
282282
DataPartitioning.SEGMENT,
283+
DataPartitioning.AutoStrategy.DEFAULT,
283284
1,
284285
10000,
285-
DocIdSetIterator.NO_MORE_DOCS,
286+
LuceneOperator.NO_LIMIT,
286287
false // no scoring
287288
);
288289
List<ValuesSourceReaderOperator.FieldInfo> fieldInfos = new ArrayList<>();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
292292
shardContexts,
293293
querySupplier(esQueryExec.query()),
294294
context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()),
295+
context.autoPartitioningStrategy().get(),
295296
context.queryPragmas().taskConcurrency(),
296297
context.pageSize(rowEstimatedSize),
297298
limit,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.compute.data.ElementType;
2121
import org.elasticsearch.compute.data.LocalCircuitBreaker;
2222
import org.elasticsearch.compute.data.Page;
23+
import org.elasticsearch.compute.lucene.DataPartitioning;
2324
import org.elasticsearch.compute.lucene.LuceneOperator;
2425
import org.elasticsearch.compute.operator.ChangePointOperator;
2526
import org.elasticsearch.compute.operator.ColumnExtractOperator;
@@ -117,6 +118,7 @@
117118
import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec;
118119
import org.elasticsearch.xpack.esql.plan.physical.SampleExec;
119120
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
121+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
120122
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
121123
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
122124
import org.elasticsearch.xpack.esql.plan.physical.inference.CompletionExec;
@@ -201,6 +203,7 @@ public LocalExecutionPlanner(
201203
* turn the given plan into a list of drivers to execute
202204
*/
203205
public LocalExecutionPlan plan(String description, FoldContext foldCtx, PhysicalPlan localPhysicalPlan) {
206+
204207
var context = new LocalExecutionPlannerContext(
205208
description,
206209
new ArrayList<>(),
@@ -210,6 +213,11 @@ public LocalExecutionPlan plan(String description, FoldContext foldCtx, Physical
210213
blockFactory,
211214
foldCtx,
212215
settings,
216+
new Holder<>(
217+
localPhysicalPlan.anyMatch(p -> p instanceof TimeSeriesAggregateExec)
218+
? DataPartitioning.AutoStrategy.DEFAULT_TIME_SERIES
219+
: DataPartitioning.AutoStrategy.DEFAULT
220+
),
213221
shardContexts
214222
);
215223

@@ -1012,6 +1020,7 @@ public record LocalExecutionPlannerContext(
10121020
BlockFactory blockFactory,
10131021
FoldContext foldCtx,
10141022
Settings settings,
1023+
Holder<DataPartitioning.AutoStrategy> autoPartitioningStrategy,
10151024
List<EsPhysicalOperationProviders.ShardContext> shardContexts
10161025
) {
10171026
void addDriverFactory(DriverFactory driverFactory) {

0 commit comments

Comments
 (0)