Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

package org.elasticsearch.compute.lucene;

import org.apache.lucene.search.Query;
import org.elasticsearch.compute.operator.Driver;

import java.util.List;
import java.util.function.Function;

/**
* How we partition the data across {@link Driver}s. Each request forks into
Expand Down Expand Up @@ -54,5 +56,19 @@ public enum DataPartitioning {
* their own tasks. See {@link LuceneSliceQueue#nextSlice(LuceneSlice)}.</li>
* </ol>
*/
DOC
DOC;

/**
 * Chooses how {@link DataPartitioning#AUTO} resolves to a concrete
 * {@link LuceneSliceQueue.PartitioningStrategy} for a given query and limit.
 */
@FunctionalInterface
public interface AutoStrategy {
    /**
     * Returns a function mapping a {@link Query} to the partitioning strategy
     * to use when at most {@code limit} documents will be collected.
     */
    Function<Query, LuceneSliceQueue.PartitioningStrategy> pickStrategy(int limit);

    /** Standard heuristic used by {@link LuceneSourceOperator.Factory}. */
    AutoStrategy DEFAULT = LuceneSourceOperator.Factory::autoStrategy;

    /**
     * Time-series variant: with no limit, always partition by document;
     * otherwise fall back to {@link #DEFAULT}.
     */
    AutoStrategy DEFAULT_TIME_SERIES = limit -> limit == LuceneOperator.NO_LIMIT
        ? q -> LuceneSliceQueue.PartitioningStrategy.DOC
        : DEFAULT.pickStrategy(limit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public Factory(
List<? extends ShardContext> contexts,
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
DataPartitioning.AutoStrategy autoStrategy,
int taskConcurrency,
int maxPageSize,
int limit,
Expand All @@ -77,7 +78,7 @@ public Factory(
contexts,
queryFunction,
dataPartitioning,
autoStrategy(limit),
autoStrategy.pickStrategy(limit),
taskConcurrency,
limit,
needsScore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader, List<Luc
List.of(searchContext),
ctx -> queryAndTags,
randomFrom(DataPartitioning.values()),
DataPartitioning.AutoStrategy.DEFAULT,
randomIntBetween(1, 10),
randomPageSize(),
limit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ private static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader,
List.of(searchContext),
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())),
randomFrom(DataPartitioning.values()),
DataPartitioning.AutoStrategy.DEFAULT,
randomIntBetween(1, 10),
randomPageSize(),
LuceneOperator.NO_LIMIT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ private LuceneSourceOperator.Factory simple(DataPartitioning dataPartitioning, i
List.of(ctx),
queryFunction,
dataPartitioning,
DataPartitioning.AutoStrategy.DEFAULT,
taskConcurrency,
maxPageSize,
limit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ private SourceOperator simpleInput(DriverContext context, int size, int commitEv
shardContexts,
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
DataPartitioning.SHARD,
DataPartitioning.AutoStrategy.DEFAULT,
1,// randomIntBetween(1, 10),
pageSize,
LuceneOperator.NO_LIMIT,
Expand Down Expand Up @@ -1312,6 +1313,7 @@ public void testWithNulls() throws IOException {
List.of(shardContext),
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
randomFrom(DataPartitioning.values()),
DataPartitioning.AutoStrategy.DEFAULT,
randomIntBetween(1, 10),
randomPageSize(),
LuceneOperator.NO_LIMIT,
Expand Down Expand Up @@ -1473,6 +1475,7 @@ public void testManyShards() throws IOException {
contexts,
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
DataPartitioning.SHARD,
DataPartitioning.AutoStrategy.DEFAULT,
randomIntBetween(1, 10),
1000,
LuceneOperator.NO_LIMIT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ private SourceOperator sourceOperator(DriverContext context, int pageSize) {
List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)),
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
DataPartitioning.SHARD,
DataPartitioning.AutoStrategy.DEFAULT,
randomIntBetween(1, 10),
pageSize,
LuceneOperator.NO_LIMIT,
Expand Down Expand Up @@ -1506,6 +1507,7 @@ public void testWithNulls() throws IOException {
List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)),
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
randomFrom(DataPartitioning.values()),
DataPartitioning.AutoStrategy.DEFAULT,
randomIntBetween(1, 10),
randomPageSize(),
LuceneOperator.NO_LIMIT,
Expand Down Expand Up @@ -1755,6 +1757,7 @@ public void testManyShards() throws IOException {
contexts,
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
DataPartitioning.SHARD,
DataPartitioning.AutoStrategy.DEFAULT,
randomIntBetween(1, 10),
1000,
LuceneOperator.NO_LIMIT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.esql.action;

import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionListener;
Expand All @@ -25,6 +24,7 @@
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.lucene.DataPartitioning;
import org.elasticsearch.compute.lucene.LuceneOperator;
import org.elasticsearch.compute.lucene.LuceneSliceQueue;
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
import org.elasticsearch.compute.lucene.ShardContext;
Expand Down Expand Up @@ -280,9 +280,10 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices)
List.of(esqlContext),
ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
DataPartitioning.SEGMENT,
DataPartitioning.AutoStrategy.DEFAULT,
1,
10000,
DocIdSetIterator.NO_MORE_DOCS,
LuceneOperator.NO_LIMIT,
false // no scoring
);
List<ValuesSourceReaderOperator.FieldInfo> fieldInfos = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
shardContexts,
querySupplier(esQueryExec.query()),
context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()),
context.autoPartitioningStrategy().get(),
context.queryPragmas().taskConcurrency(),
context.pageSize(rowEstimatedSize),
limit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.LocalCircuitBreaker;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.lucene.DataPartitioning;
import org.elasticsearch.compute.lucene.LuceneOperator;
import org.elasticsearch.compute.operator.ChangePointOperator;
import org.elasticsearch.compute.operator.ColumnExtractOperator;
Expand Down Expand Up @@ -117,6 +118,7 @@
import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec;
import org.elasticsearch.xpack.esql.plan.physical.SampleExec;
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
import org.elasticsearch.xpack.esql.plan.physical.inference.CompletionExec;
Expand Down Expand Up @@ -201,6 +203,7 @@ public LocalExecutionPlanner(
* turn the given plan into a list of drivers to execute
*/
public LocalExecutionPlan plan(String description, FoldContext foldCtx, PhysicalPlan localPhysicalPlan) {

var context = new LocalExecutionPlannerContext(
description,
new ArrayList<>(),
Expand All @@ -210,6 +213,11 @@ public LocalExecutionPlan plan(String description, FoldContext foldCtx, Physical
blockFactory,
foldCtx,
settings,
new Holder<>(
localPhysicalPlan.anyMatch(p -> p instanceof TimeSeriesAggregateExec)
? DataPartitioning.AutoStrategy.DEFAULT_TIME_SERIES
: DataPartitioning.AutoStrategy.DEFAULT
),
shardContexts
);

Expand Down Expand Up @@ -1012,6 +1020,7 @@ public record LocalExecutionPlannerContext(
BlockFactory blockFactory,
FoldContext foldCtx,
Settings settings,
Holder<DataPartitioning.AutoStrategy> autoPartitioningStrategy,
List<EsPhysicalOperationProviders.ShardContext> shardContexts
) {
void addDriverFactory(DriverFactory driverFactory) {
Expand Down