diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java index 74e9e7411aa7b..c837f7d041bba 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFieldExtractionToTimeSeriesSource; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushLimitToSource; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushSampleToSource; @@ -79,7 +80,8 @@ protected static List> rules(boolean optimizeForEsSource) { Limiter.ONCE, new InsertFieldExtraction(), new SpatialDocValuesExtraction(), - new SpatialShapeBoundsExtraction() + new SpatialShapeBoundsExtraction(), + new PushFieldExtractionToTimeSeriesSource() ); return List.of(pushdown, fieldExtraction); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFieldExtractionToTimeSeriesSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFieldExtractionToTimeSeriesSource.java new file mode 100644 index 0000000000000..544c5333e8c29 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFieldExtractionToTimeSeriesSource.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; + +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext; +import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules; +import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec; + +/** + * A rule that moves field extractions to occur before the time-series aggregation in the time-series source plan. + */ +public class PushFieldExtractionToTimeSeriesSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule< + EsQueryExec, + LocalPhysicalOptimizerContext> { + + public PushFieldExtractionToTimeSeriesSource() { + super(OptimizerRules.TransformDirection.UP); + } + + @Override + public PhysicalPlan rule(EsQueryExec plan, LocalPhysicalOptimizerContext context) { + if (plan.indexMode() == IndexMode.TIME_SERIES) { + return new TimeSeriesSourceExec(plan.source(), plan.output(), plan.query(), plan.limit(), plan.estimatedRowSize()); + } else { + return plan; + } + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesSourceExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesSourceExec.java new file mode 100644 index 0000000000000..b83c3c7ab2ed4 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesSourceExec.java @@ -0,0 +1,130 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.physical; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.NodeUtils; +import org.elasticsearch.xpack.esql.core.tree.Source; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * Similar to {@link EsQueryExec}, but this is a physical plan specifically for time series indices. + * This plan is forked from {@link EsQueryExec} to allow field extractions, leveraging caching + * and avoiding the cost of sorting and rebuilding blocks. + */ +public class TimeSeriesSourceExec extends LeafExec implements EstimatesRowSize { + private final List attrs; + private final QueryBuilder query; + private final Expression limit; + + /** + * Estimate of the number of bytes that'll be loaded per position before + * the stream of pages is consumed. + */ + private final Integer estimatedRowSize; + + public TimeSeriesSourceExec(Source source, List attrs, QueryBuilder query, Expression limit, Integer estimatedRowSize) { + super(source); + this.attrs = attrs; + this.query = query; + this.limit = limit; + this.estimatedRowSize = estimatedRowSize; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new UnsupportedOperationException("local plan"); + } + + @Override + public String getWriteableName() { + throw new UnsupportedOperationException("local plan"); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, TimeSeriesSourceExec::new, attrs, query, limit, estimatedRowSize); + } + + public QueryBuilder query() { + return query; + } + + @Override + public List output() { + return attrs; + } + + public List attrs() { + return attrs; + } + + public Expression limit() { + return limit; + } + + /** + * Estimate of the number of bytes that'll be loaded per position before + * the stream of pages is consumed. + */ + public Integer estimatedRowSize() { + return estimatedRowSize; + } + + @Override + public PhysicalPlan estimateRowSize(State state) { + state.add(false, Integer.BYTES * 2); + state.add(false, 22); // tsid + state.add(false, 8); // timestamp + int size = state.consumeAllFields(false); + return Objects.equals(this.estimatedRowSize, size) ? this : new TimeSeriesSourceExec(source(), attrs, query, limit, size); + } + + @Override + public int hashCode() { + return Objects.hash(attrs, query, limit, estimatedRowSize); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + TimeSeriesSourceExec other = (TimeSeriesSourceExec) obj; + return Objects.equals(attrs, other.attrs) + && Objects.equals(query, other.query) + && Objects.equals(limit, other.limit) + && Objects.equals(estimatedRowSize, other.estimatedRowSize); + } + + @Override + public String nodeString() { + return nodeName() + + "[" + + "query[" + + (query != null ? Strings.toString(query, false, true) : "") + + "] attributes: [" + + NodeUtils.limitedToString(attrs) + + "], estimatedRowSize[" + + estimatedRowSize + + "]"; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index a93e2f16c075b..ed5a499519f9e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -61,6 +61,7 @@ import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec.Sort; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; +import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.DriverParallelism; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation; @@ -191,6 +192,10 @@ public Function querySuppl @Override public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, LocalExecutionPlannerContext context) { + if (esQueryExec.indexMode() == IndexMode.TIME_SERIES) { + assert false : "Time series source should be translated to TimeSeriesSourceExec"; + throw new IllegalStateException("Time series source should be translated to TimeSeriesSourceExec"); + } final LuceneOperator.Factory luceneFactory; logger.trace("Query Exec is {}", esQueryExec); @@ -215,25 +220,15 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, scoring ); } else { - if (esQueryExec.indexMode() == IndexMode.TIME_SERIES) { - luceneFactory = TimeSeriesSortedSourceOperatorFactory.create( - limit, - context.pageSize(rowEstimatedSize), - context.queryPragmas().taskConcurrency(), - shardContexts, - querySupplier(esQueryExec.query()) - ); - } else { - luceneFactory = new LuceneSourceOperator.Factory( - shardContexts, - querySupplier(esQueryExec.query()), - context.queryPragmas().dataPartitioning(defaultDataPartitioning), - context.queryPragmas().taskConcurrency(), - context.pageSize(rowEstimatedSize), - limit, - scoring - ); - } + luceneFactory = new LuceneSourceOperator.Factory( + shardContexts, + querySupplier(esQueryExec.query()), + context.queryPragmas().dataPartitioning(defaultDataPartitioning), + context.queryPragmas().taskConcurrency(), + context.pageSize(rowEstimatedSize), + limit, + scoring + ); } Layout.Builder layout = new Layout.Builder(); layout.append(esQueryExec.output()); @@ -242,6 +237,23 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, return PhysicalOperation.fromSource(luceneFactory, layout.build()); } + @Override + public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context) { + final int limit = ts.limit() != null ? (Integer) ts.limit().fold(context.foldCtx()) : NO_LIMIT; + LuceneOperator.Factory luceneFactory = TimeSeriesSortedSourceOperatorFactory.create( + limit, + context.pageSize(ts.estimatedRowSize()), + context.queryPragmas().taskConcurrency(), + shardContexts, + querySupplier(ts.query()) + ); + Layout.Builder layout = new Layout.Builder(); + layout.append(ts.output()); + int instanceCount = Math.max(1, luceneFactory.taskConcurrency()); + context.driverParallelism(new DriverParallelism(DriverParallelism.Type.DATA_PARALLELISM, instanceCount)); + return PhysicalOperation.fromSource(luceneFactory, layout.build()); + } + /** * Build a {@link SourceOperator.SourceOperatorFactory} that counts documents in the search index. */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 8ef7d43b28d4b..abcd0ec1318ed 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -112,6 +112,7 @@ import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec; import org.elasticsearch.xpack.esql.plan.physical.SampleExec; import org.elasticsearch.xpack.esql.plan.physical.ShowExec; +import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders.ShardContext; @@ -271,6 +272,8 @@ else if (node instanceof EsQueryExec esQuery) { return planShow(show); } else if (node instanceof ExchangeSourceExec exchangeSource) { return planExchangeSource(exchangeSource, context); + } else if (node instanceof TimeSeriesSourceExec ts) { + return planTimeSeriesNode(ts, context); } // lookups and joins else if (node instanceof EnrichExec enrich) { @@ -328,6 +331,10 @@ private PhysicalOperation planEsQueryNode(EsQueryExec esQueryExec, LocalExecutio return physicalOperationProviders.sourcePhysicalOperation(esQueryExec, context); } + private PhysicalOperation planTimeSeriesNode(TimeSeriesSourceExec esQueryExec, LocalExecutionPlannerContext context) { + return physicalOperationProviders.timeSeriesSourceOperation(esQueryExec, context); + } + private PhysicalOperation planEsStats(EsStatsQueryExec statsQuery, LocalExecutionPlannerContext context) { if (physicalOperationProviders instanceof EsPhysicalOperationProviders == false) { throw new EsqlIllegalArgumentException("EsStatsQuery should only occur against a Lucene backend"); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalOperationProviders.java index 6353005f44ace..792bdd1c2b6d1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalOperationProviders.java @@ -10,6 +10,7 @@ import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; +import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation; @@ -18,6 +19,8 @@ interface PhysicalOperationProviders { PhysicalOperation sourcePhysicalOperation(EsQueryExec esQuery, LocalExecutionPlannerContext context); + PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context); + PhysicalOperation groupingPhysicalOperation( AggregateExec aggregateExec, PhysicalOperation source, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java index cf2c5735310ae..ae07d8b1b5975 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java @@ -58,6 +58,7 @@ import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; +import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation; import org.elasticsearch.xpack.ml.MachineLearning; @@ -128,6 +129,11 @@ public PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, LocalE return PhysicalOperation.fromSource(new TestSourceOperatorFactory(), layout.build()); } + @Override + public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context) { + throw new UnsupportedOperationException("time-series source is not supported in CSV tests"); + } + @Override public Operator.OperatorFactory ordinalGroupingOperatorFactory( PhysicalOperation source,