Merged
@@ -22,6 +22,8 @@
* such as slice index and future max timestamp, to allow downstream operators to optimize processing.
*/
public final class TimeSeriesSourceOperator extends LuceneSourceOperator {
+private static final int MAX_TARGET_PAGE_SIZE = 2048;
+private static final int CHUNK_SIZE = 128;

public static final class Factory extends LuceneSourceOperator.Factory {
public Factory(
@@ -76,4 +78,17 @@ protected void buildMetadataBlocks(Block[] blocks, int offset, int currentPagePo
blocks[offset] = blockFactory.newConstantIntVector(currentSlice.slicePosition(), currentPagePos).asBlock();
blocks[offset + 1] = blockFactory.newConstantLongVector(Long.MAX_VALUE, currentPagePos).asBlock();
}
+
+/**
+ * For time-series queries, try to use a page size that is a multiple of CHUNK_SIZE (see NUMERIC_BLOCK_SIZE in the tsdb codec)
+ * to enable bulk loading of numeric or tsid fields. Avoid pages that are too large: holes in the doc IDs of a large page can
+ * disable bulk loading as well as constant-block optimizations. We therefore cap the page size at MAX_TARGET_PAGE_SIZE (2048),
+ * balancing per-page overhead against the benefits of bulk loading and constant blocks.
+ */
+public static int pageSize(long estimateRowSizeInBytes, long maxPageSizeInBytes) {
+long chunkSizeInBytes = CHUNK_SIZE * estimateRowSizeInBytes;
+long numChunks = Math.ceilDiv(maxPageSizeInBytes, chunkSizeInBytes);
+long pageSize = Math.clamp(numChunks * CHUNK_SIZE, CHUNK_SIZE, MAX_TARGET_PAGE_SIZE);
+return Math.toIntExact(pageSize);
+}
}
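A quick sanity check of the page-size arithmetic (a sketch, not part of the change; the 1 MB budget mirrors the valuesLoadingJumboSize used in testTimeSeries below):

// huge rows: a single 128-doc chunk (128 * 25_000 = 3_200_000 bytes) already exceeds 1 MiB,
// so ceilDiv(1_048_576, 3_200_000) = 1 chunk, yielding clamp(1 * 128, 128, 2048) = 128
int huge = TimeSeriesSourceOperator.pageSize(25_000, ByteSizeValue.ofMb(1).getBytes()); // 128

// small rows: ceilDiv(1_048_576, 128 * 50) = 164 chunks -> 164 * 128 = 20_992,
// clamped down to MAX_TARGET_PAGE_SIZE
int small = TimeSeriesSourceOperator.pageSize(50, ByteSizeValue.ofMb(1).getBytes()); // 2048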
@@ -181,7 +181,7 @@ else if (aggregatorMode.isOutputPartial()) {
groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
aggregatorMode,
aggregatorFactories,
-context.pageSize(aggregateExec.estimatedRowSize()),
+context.pageSize(aggregateExec, aggregateExec.estimatedRowSize()),
analysisRegistry
);
}
@@ -301,7 +301,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
querySupplier(esQueryExec.query()),
context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
context.queryPragmas().taskConcurrency(),
-context.pageSize(rowEstimatedSize),
+context.pageSize(esQueryExec, rowEstimatedSize),
limit,
sortBuilders,
estimatedPerRowSortSize,
@@ -312,17 +312,17 @@
shardContexts,
querySupplier(esQueryExec.queryBuilderAndTags()),
context.queryPragmas().taskConcurrency(),
-context.pageSize(rowEstimatedSize),
+context.pageSize(esQueryExec, rowEstimatedSize),
limit
);
} else {
luceneFactory = new LuceneSourceOperator.Factory(
shardContexts,
querySupplier(esQueryExec.queryBuilderAndTags()),
context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
-context.autoPartitioningStrategy().get(),
+context.autoPartitioningStrategy(),
context.queryPragmas().taskConcurrency(),
-context.pageSize(rowEstimatedSize),
+context.pageSize(esQueryExec, rowEstimatedSize),
limit,
scoring
);
@@ -408,7 +408,7 @@ public Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
groupSpecs,
aggregatorMode,
aggregatorFactories,
-context.pageSize(ts.estimatedRowSize())
+context.pageSize(ts, ts.estimatedRowSize())
);
}

@@ -21,6 +21,7 @@
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.lucene.DataPartitioning;
import org.elasticsearch.compute.lucene.LuceneOperator;
+import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator;
import org.elasticsearch.compute.operator.ChangePointOperator;
import org.elasticsearch.compute.operator.ColumnExtractOperator;
import org.elasticsearch.compute.operator.ColumnLoadOperator;
@@ -203,8 +204,13 @@ public LocalExecutionPlanner(
/**
* turn the given plan into a list of drivers to execute
*/
-public LocalExecutionPlan plan(String description, FoldContext foldCtx, PhysicalPlan localPhysicalPlan) {
-
+public LocalExecutionPlan plan(
+String description,
+FoldContext foldCtx,
+PlannerSettings plannerSettings,
+PhysicalPlan localPhysicalPlan
+) {
+final boolean timeSeries = localPhysicalPlan.anyMatch(p -> p instanceof TimeSeriesAggregateExec);
var context = new LocalExecutionPlannerContext(
description,
new ArrayList<>(),
@@ -213,12 +219,8 @@
bigArrays,
blockFactory,
foldCtx,
-settings,
-new Holder<>(
-localPhysicalPlan.anyMatch(p -> p instanceof TimeSeriesAggregateExec)
-? DataPartitioning.AutoStrategy.DEFAULT_TIME_SERIES
-: DataPartitioning.AutoStrategy.DEFAULT
-),
+plannerSettings,
+timeSeries,
shardContexts
);

@@ -503,7 +505,7 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte
throw new EsqlIllegalArgumentException("limit only supported with literal values");
}
return source.with(
-new TopNOperatorFactory(limit, asList(elementTypes), asList(encoders), orders, context.pageSize(rowSize)),
+new TopNOperatorFactory(limit, asList(elementTypes), asList(encoders), orders, context.pageSize(topNExec, rowSize)),
source.layout
);
}
@@ -1024,8 +1026,8 @@ public record LocalExecutionPlannerContext(
BigArrays bigArrays,
BlockFactory blockFactory,
FoldContext foldCtx,
-Settings settings,
-Holder<DataPartitioning.AutoStrategy> autoPartitioningStrategy,
+PlannerSettings plannerSettings,
+boolean timeSeries,
List<EsPhysicalOperationProviders.ShardContext> shardContexts
) {
void addDriverFactory(DriverFactory driverFactory) {
@@ -1036,7 +1038,11 @@ void driverParallelism(DriverParallelism parallelism) {
driverParallelism.set(parallelism);
}

-int pageSize(Integer estimatedRowSize) {
+DataPartitioning.AutoStrategy autoPartitioningStrategy() {
+return timeSeries ? DataPartitioning.AutoStrategy.DEFAULT_TIME_SERIES : DataPartitioning.AutoStrategy.DEFAULT;
+}
+
+int pageSize(PhysicalPlan node, Integer estimatedRowSize) {
if (estimatedRowSize == null) {
throw new IllegalStateException("estimated row size hasn't been set");
}
@@ -1046,7 +1052,11 @@ int pageSize(Integer estimatedRowSize) {
if (queryPragmas.pageSize() != 0) {
return queryPragmas.pageSize();
}
-return Math.max(SourceOperator.MIN_TARGET_PAGE_SIZE, SourceOperator.TARGET_PAGE_SIZE / estimatedRowSize);
+if (timeSeries && node instanceof EsQueryExec) {
+return TimeSeriesSourceOperator.pageSize(estimatedRowSize, plannerSettings.valuesLoadingJumboSize().getBytes());
+} else {
+return Math.max(SourceOperator.MIN_TARGET_PAGE_SIZE, SourceOperator.TARGET_PAGE_SIZE / estimatedRowSize);
+}
}
}

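The context record now derives both the auto-partitioning strategy and the page size from the single timeSeries flag. A sketch of the resulting call pattern (node names as in the surrounding hunks; jumboBytes stands in for plannerSettings.valuesLoadingJumboSize().getBytes()):

// Lucene source of a time-series plan: chunk-aligned pages, capped at 2048
// context.pageSize(esQueryExec, rowSize) -> TimeSeriesSourceOperator.pageSize(rowSize, jumboBytes)
// any other node (TopN, aggregations) or a non-time-series plan: existing byte-budget heuristic
// context.pageSize(node, rowSize) -> max(SourceOperator.MIN_TARGET_PAGE_SIZE, SourceOperator.TARGET_PAGE_SIZE / rowSize)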
@@ -648,7 +648,7 @@ public SourceProvider createSourceProvider(SourceFilter sourceFilter) {
// the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below)
// it's doing this in the planning of EsQueryExec (the source of the data)
// see also EsPhysicalOperationProviders.sourcePhysicalOperation
-LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.description(), context.foldCtx(), localPlan);
+var localExecutionPlan = planner.plan(context.description(), context.foldCtx(), plannerSettings, localPlan);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe());
}
@@ -724,6 +724,7 @@ void executeSubPlan(
LocalExecutionPlan coordinatorNodeExecutionPlan = executionPlanner.plan(
"final",
foldCtx,
+TEST_PLANNER_SETTINGS,
new OutputExec(coordinatorPlan, collectedPages::add)
);
drivers.addAll(coordinatorNodeExecutionPlan.createDrivers(getTestName()));
@@ -745,7 +746,12 @@
throw new AssertionError("expected no failure", e);
})
);
LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan("data", foldCtx, csvDataNodePhysicalPlan);
LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan(
"data",
foldCtx,
EsqlTestUtils.TEST_PLANNER_SETTINGS,
csvDataNodePhysicalPlan
);

drivers.addAll(dataNodeExecutionPlan.createDrivers(getTestName()));
Randomness.shuffle(drivers);
@@ -136,6 +136,7 @@
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
+import org.elasticsearch.xpack.esql.planner.PlannerSettings;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
@@ -247,6 +248,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
private TestDataSource countriesBboxWeb; // cartesian_shape field tests

private final Configuration config;
+private PlannerSettings plannerSettings;

private record TestDataSource(Map<String, EsField> mapping, EsIndex index, Analyzer analyzer, SearchStats stats) {}

@@ -360,6 +362,7 @@ public void init() {
functionRegistry,
enrichResolution
);
+this.plannerSettings = TEST_PLANNER_SETTINGS;
}

TestDataSource makeTestDataSource(
@@ -7878,7 +7881,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP
List.of()
);

return planner.plan("test", FoldContext.small(), plan);
return planner.plan("test", FoldContext.small(), plannerSettings, plan);
}

private List<Set<String>> findFieldNamesInLookupJoinDescription(LocalExecutionPlanner.LocalExecutionPlan physicalOperations) {
@@ -19,7 +19,10 @@
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.compute.aggregation.AggregatorMode;
+import org.elasticsearch.compute.lucene.DataPartitioning;
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator;
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
@@ -42,6 +45,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ContextIndexSearcher;
+import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.expression.Literal;
@@ -51,9 +55,11 @@
import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField;
import org.elasticsearch.xpack.esql.core.util.StringUtils;
import org.elasticsearch.xpack.esql.expression.Order;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
+import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.session.Configuration;
@@ -125,6 +131,7 @@ public void testLuceneSourceOperatorHugeRowSize() throws IOException {
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan(
"test",
FoldContext.small(),
+TEST_PLANNER_SETTINGS,
new EsQueryExec(
Source.EMPTY,
index().name(),
@@ -156,6 +163,7 @@ public void testLuceneTopNSourceOperator() throws IOException {
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan(
"test",
FoldContext.small(),
+TEST_PLANNER_SETTINGS,
new EsQueryExec(
Source.EMPTY,
index().name(),
@@ -187,6 +195,7 @@ public void testLuceneTopNSourceOperatorDistanceSort() throws IOException {
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan(
"test",
FoldContext.small(),
+TEST_PLANNER_SETTINGS,
new EsQueryExec(
Source.EMPTY,
index().name(),
@@ -211,6 +220,7 @@ public void testDriverClusterAndNodeName() throws IOException {
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan(
"test",
FoldContext.small(),
+TEST_PLANNER_SETTINGS,
new EsQueryExec(
Source.EMPTY,
index().name(),
@@ -244,6 +254,41 @@ public void testPlanUnmappedFieldExtractSyntheticSource() throws Exception {
assertThat(blockLoader, instanceOf(FallbackSyntheticSourceBlockLoader.class));
}

+public void testTimeSeries() throws IOException {
+int estimatedRowSize = estimatedRowSizeIsHuge ? randomIntBetween(20000, Integer.MAX_VALUE) : randomIntBetween(1, 50);
+EsQueryExec queryExec = new EsQueryExec(
+Source.EMPTY,
+index().name(),
+IndexMode.STANDARD,
+index().indexNameWithModes(),
+List.of(),
+new Literal(Source.EMPTY, 10, DataType.INTEGER),
+List.of(),
+estimatedRowSize,
+List.of(new EsQueryExec.QueryBuilderAndTags(null, List.of()))
+);
+TimeSeriesAggregateExec aggExec = new TimeSeriesAggregateExec(
+Source.EMPTY,
+queryExec,
+List.of(),
+List.of(new Alias(Source.EMPTY, "count(*)", new Count(Source.EMPTY, Literal.keyword(Source.EMPTY, "*")))),
+AggregatorMode.SINGLE,
+List.of(),
+10,
+null
+);
+PlannerSettings plannerSettings = new PlannerSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000, ByteSizeValue.ofMb(1));
+LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan("test", FoldContext.small(), plannerSettings, aggExec);
+assertThat(plan.driverFactories.size(), lessThanOrEqualTo(pragmas.taskConcurrency()));
+LocalExecutionPlanner.DriverSupplier supplier = plan.driverFactories.get(0).driverSupplier();
+var factory = (LuceneSourceOperator.Factory) supplier.physicalOperation().sourceOperatorFactory;
+if (estimatedRowSizeIsHuge) {
+assertThat(factory.maxPageSize(), equalTo(128));
+} else {
+assertThat(factory.maxPageSize(), equalTo(2048));
+}
+}
+
private BlockLoader constructBlockLoader() throws IOException {
EsQueryExec queryExec = new EsQueryExec(
Source.EMPTY,
@@ -264,7 +309,7 @@ private BlockLoader constructBlockLoader() throws IOException {
),
MappedFieldType.FieldExtractPreference.NONE
);
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan("test", FoldContext.small(), fieldExtractExec);
var plan = planner().plan("test", FoldContext.small(), TEST_PLANNER_SETTINGS, fieldExtractExec);
var p = plan.driverFactories.get(0).driverSupplier().physicalOperation();
var fieldInfo = ((ValuesSourceReaderOperator.Factory) p.intermediateOperatorFactories.get(0)).fields().get(0);
return fieldInfo.blockLoader().apply(0);
@@ -137,7 +137,7 @@ public Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
groupSpecs,
aggregatorMode,
aggregatorFactories,
-context.pageSize(ts.estimatedRowSize())
+context.pageSize(ts, ts.estimatedRowSize())
);
}
