Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFieldExtractionToTimeSeriesSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushLimitToSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushSampleToSource;
Expand Down Expand Up @@ -79,7 +80,8 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
Limiter.ONCE,
new InsertFieldExtraction(),
new SpatialDocValuesExtraction(),
new SpatialShapeBoundsExtraction()
new SpatialShapeBoundsExtraction(),
new PushFieldExtractionToTimeSeriesSource()
);
return List.of(pushdown, fieldExtraction);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;

import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;

/**
 * Local physical optimizer rule that replaces an {@link EsQueryExec} whose index mode is
 * {@link IndexMode#TIME_SERIES} with a {@link TimeSeriesSourceExec}.
 * <p>
 * The stated intent of the rule is to let field extraction happen in the time-series source,
 * before the time-series aggregation. As written, the rule itself only performs the node
 * substitution; any extraction work is left to whatever consumes the {@link TimeSeriesSourceExec}.
 */
public class PushFieldExtractionToTimeSeriesSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
    EsQueryExec,
    LocalPhysicalOptimizerContext> {

    /**
     * Creates the rule, configured with {@link OptimizerRules.TransformDirection#UP}.
     */
    public PushFieldExtractionToTimeSeriesSource() {
        super(OptimizerRules.TransformDirection.UP);
    }

    /**
     * Rewrites a single {@link EsQueryExec} node.
     *
     * @param plan    the query node being visited
     * @param context the local optimizer context (not consulted by this rule)
     * @return {@code plan} itself when it does not target a time-series index; otherwise a new
     *         {@link TimeSeriesSourceExec} carrying over the source, output attributes, query,
     *         limit and estimated row size of {@code plan}
     */
    @Override
    public PhysicalPlan rule(EsQueryExec plan, LocalPhysicalOptimizerContext context) {
        // Anything that is not a time-series source is left untouched.
        if (plan.indexMode() != IndexMode.TIME_SERIES) {
            return plan;
        }
        // Only these five properties are transferred to the replacement node.
        return new TimeSeriesSourceExec(plan.source(), plan.output(), plan.query(), plan.limit(), plan.estimatedRowSize());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plan.physical;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
import org.elasticsearch.xpack.esql.core.tree.Source;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
 * Similar to {@link EsQueryExec}, but this is a physical plan specifically for time series indices.
 * This plan is forked from {@link EsQueryExec} to allow field extractions, leveraging caching
 * and avoiding the cost of sorting and rebuilding blocks.
 * <p>
 * The node is a leaf of a local plan only: {@link #writeTo(StreamOutput)} and
 * {@link #getWriteableName()} always throw {@link UnsupportedOperationException}.
 */
public class TimeSeriesSourceExec extends LeafExec implements EstimatesRowSize {
    /** Attributes produced by this source; also returned as the node's {@link #output()}. */
    private final List<Attribute> attrs;
    /** Query used to select documents; may be {@code null} (see {@link #nodeString()}). */
    private final QueryBuilder query;
    /** Optional limit expression; callers treat {@code null} as "no limit". */
    private final Expression limit;

    /**
     * Estimate of the number of bytes that'll be loaded per position before
     * the stream of pages is consumed.
     */
    private final Integer estimatedRowSize;

    /**
     * @param source           source location of the node in the original query
     * @param attrs            attributes this node outputs
     * @param query            query to run against the index, or {@code null}
     * @param limit            limit expression, or {@code null}
     * @param estimatedRowSize estimated bytes per position, or {@code null} if not yet estimated
     */
    public TimeSeriesSourceExec(Source source, List<Attribute> attrs, QueryBuilder query, Expression limit, Integer estimatedRowSize) {
        super(source);
        this.attrs = attrs;
        this.query = query;
        this.limit = limit;
        this.estimatedRowSize = estimatedRowSize;
    }

    /**
     * Not supported: this node is never serialized.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        throw new UnsupportedOperationException("local plan");
    }

    /**
     * Not supported: this node is never serialized.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public String getWriteableName() {
        throw new UnsupportedOperationException("local plan");
    }

    @Override
    protected NodeInfo<TimeSeriesSourceExec> info() {
        return NodeInfo.create(this, TimeSeriesSourceExec::new, attrs, query, limit, estimatedRowSize);
    }

    /** @return the query used to select documents, possibly {@code null} */
    public QueryBuilder query() {
        return query;
    }

    /** @return the attributes produced by this source (same list as {@link #attrs()}) */
    @Override
    public List<Attribute> output() {
        return attrs;
    }

    /** @return the attributes produced by this source (same list as {@link #output()}) */
    public List<Attribute> attrs() {
        return attrs;
    }

    /** @return the limit expression, possibly {@code null} */
    public Expression limit() {
        return limit;
    }

    /**
     * Estimate of the number of bytes that'll be loaded per position before
     * the stream of pages is consumed.
     */
    public Integer estimatedRowSize() {
        return estimatedRowSize;
    }

    /**
     * Registers this node's fixed per-row contributions with {@code state}, consumes the
     * accumulated field sizes and stores the result as the estimated row size.
     *
     * @param state accumulator for the row size estimate
     * @return this node when the estimate is unchanged, otherwise a copy with the new estimate
     */
    @Override
    public PhysicalPlan estimateRowSize(State state) {
        // NOTE(review): two ints of fixed overhead; what they represent is not stated here - confirm.
        state.add(false, Integer.BYTES * 2);
        state.add(false, 22); // tsid
        state.add(false, 8); // timestamp
        int size = state.consumeAllFields(false);
        return Objects.equals(this.estimatedRowSize, size) ? this : new TimeSeriesSourceExec(source(), attrs, query, limit, size);
    }

    @Override
    public int hashCode() {
        return Objects.hash(attrs, query, limit, estimatedRowSize);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }

        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }

        TimeSeriesSourceExec other = (TimeSeriesSourceExec) obj;
        return Objects.equals(attrs, other.attrs)
            && Objects.equals(query, other.query)
            && Objects.equals(limit, other.limit)
            && Objects.equals(estimatedRowSize, other.estimatedRowSize);
    }

    /**
     * Renders the node as {@code Name[query[...] attributes: [...], estimatedRowSize[...]]}.
     * A {@code null} query is rendered as an empty string.
     */
    @Override
    public String nodeString() {
        return nodeName()
            + "["
            + "query["
            + (query != null ? Strings.toString(query, false, true) : "")
            + "] attributes: ["
            + NodeUtils.limitedToString(attrs)
            + "], estimatedRowSize["
            + estimatedRowSize
            // The second bracket closes the one opened right after nodeName().
            + "]]";
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec.Sort;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.DriverParallelism;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;
Expand Down Expand Up @@ -191,6 +192,10 @@ public Function<org.elasticsearch.compute.lucene.ShardContext, Query> querySuppl

@Override
public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, LocalExecutionPlannerContext context) {
if (esQueryExec.indexMode() == IndexMode.TIME_SERIES) {
assert false : "Time series source should be translated to TimeSeriesSourceExec";
throw new IllegalStateException("Time series source should be translated to TimeSeriesSourceExec");
}
final LuceneOperator.Factory luceneFactory;
logger.trace("Query Exec is {}", esQueryExec);

Expand All @@ -215,25 +220,15 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
scoring
);
} else {
if (esQueryExec.indexMode() == IndexMode.TIME_SERIES) {
luceneFactory = TimeSeriesSortedSourceOperatorFactory.create(
limit,
context.pageSize(rowEstimatedSize),
context.queryPragmas().taskConcurrency(),
shardContexts,
querySupplier(esQueryExec.query())
);
} else {
luceneFactory = new LuceneSourceOperator.Factory(
shardContexts,
querySupplier(esQueryExec.query()),
context.queryPragmas().dataPartitioning(defaultDataPartitioning),
context.queryPragmas().taskConcurrency(),
context.pageSize(rowEstimatedSize),
limit,
scoring
);
}
luceneFactory = new LuceneSourceOperator.Factory(
shardContexts,
querySupplier(esQueryExec.query()),
context.queryPragmas().dataPartitioning(defaultDataPartitioning),
context.queryPragmas().taskConcurrency(),
context.pageSize(rowEstimatedSize),
limit,
scoring
);
}
Layout.Builder layout = new Layout.Builder();
layout.append(esQueryExec.output());
Expand All @@ -242,6 +237,23 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
return PhysicalOperation.fromSource(luceneFactory, layout.build());
}

/**
 * Builds the source {@link PhysicalOperation} for a {@link TimeSeriesSourceExec} node.
 * <p>
 * The node's limit expression is folded to an integer ({@code NO_LIMIT} when the node has no limit),
 * a factory is obtained from {@code TimeSeriesSortedSourceOperatorFactory.create}, the output layout
 * is taken from the node's output attributes, and the driver parallelism is recorded on the context.
 *
 * @param ts      the time-series source node to plan
 * @param context planner context supplying the fold context, page size and query pragmas
 * @return a source operation backed by the created factory and a layout of {@code ts.output()}
 */
@Override
public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context) {
// A missing limit expression means "no limit"; otherwise the folded value is cast to Integer.
final int limit = ts.limit() != null ? (Integer) ts.limit().fold(context.foldCtx()) : NO_LIMIT;
LuceneOperator.Factory luceneFactory = TimeSeriesSortedSourceOperatorFactory.create(
limit,
context.pageSize(ts.estimatedRowSize()),
context.queryPragmas().taskConcurrency(),
shardContexts,
querySupplier(ts.query())
);
Layout.Builder layout = new Layout.Builder();
layout.append(ts.output());
// Always request at least one driver instance, even if the factory reports a lower concurrency.
int instanceCount = Math.max(1, luceneFactory.taskConcurrency());
context.driverParallelism(new DriverParallelism(DriverParallelism.Type.DATA_PARALLELISM, instanceCount));
return PhysicalOperation.fromSource(luceneFactory, layout.build());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nit: this should be eventually move to a helper function to avoid duplication.

}

/**
* Build a {@link SourceOperator.SourceOperatorFactory} that counts documents in the search index.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec;
import org.elasticsearch.xpack.esql.plan.physical.SampleExec;
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders.ShardContext;
Expand Down Expand Up @@ -271,6 +272,8 @@ else if (node instanceof EsQueryExec esQuery) {
return planShow(show);
} else if (node instanceof ExchangeSourceExec exchangeSource) {
return planExchangeSource(exchangeSource, context);
} else if (node instanceof TimeSeriesSourceExec ts) {
return planTimeSeriesNode(ts, context);
}
// lookups and joins
else if (node instanceof EnrichExec enrich) {
Expand Down Expand Up @@ -328,6 +331,10 @@ private PhysicalOperation planEsQueryNode(EsQueryExec esQueryExec, LocalExecutio
return physicalOperationProviders.sourcePhysicalOperation(esQueryExec, context);
}

/**
 * Plans a {@link TimeSeriesSourceExec} node by delegating to
 * {@code physicalOperationProviders.timeSeriesSourceOperation}.
 * <p>
 * NOTE(review): despite its name, {@code esQueryExec} is a {@link TimeSeriesSourceExec}.
 *
 * @param esQueryExec the time-series source node to plan
 * @param context     the planner context passed through to the provider
 * @return the operation produced by the provider
 */
private PhysicalOperation planTimeSeriesNode(TimeSeriesSourceExec esQueryExec, LocalExecutionPlannerContext context) {
    final PhysicalOperation timeSeriesSource = physicalOperationProviders.timeSeriesSourceOperation(esQueryExec, context);
    return timeSeriesSource;
}

private PhysicalOperation planEsStats(EsStatsQueryExec statsQuery, LocalExecutionPlannerContext context) {
if (physicalOperationProviders instanceof EsPhysicalOperationProviders == false) {
throw new EsqlIllegalArgumentException("EsStatsQuery should only occur against a Lucene backend");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;

Expand All @@ -18,6 +19,8 @@ interface PhysicalOperationProviders {

/**
 * Produces the source {@link PhysicalOperation} for an {@link EsQueryExec} plan node.
 *
 * @param esQuery the query node to plan
 * @param context the local execution planner context
 * @return the physical operation acting as the source for {@code esQuery}
 */
PhysicalOperation sourcePhysicalOperation(EsQueryExec esQuery, LocalExecutionPlannerContext context);

/**
 * Produces the source {@link PhysicalOperation} for a {@link TimeSeriesSourceExec} plan node.
 * Implementations may not support this node type and throw instead.
 *
 * @param ts      the time-series source node to plan
 * @param context the local execution planner context
 * @return the physical operation acting as the source for {@code ts}
 */
PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context);

PhysicalOperation groupingPhysicalOperation(
AggregateExec aggregateExec,
PhysicalOperation source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;
import org.elasticsearch.xpack.ml.MachineLearning;
Expand Down Expand Up @@ -128,6 +129,11 @@ public PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, LocalE
return PhysicalOperation.fromSource(new TestSourceOperatorFactory(), layout.build());
}

/**
 * Time-series sources cannot be planned by this test provider; the call always fails.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context) {
    final String reason = "time-series source is not supported in CSV tests";
    throw new UnsupportedOperationException(reason);
}

@Override
public Operator.OperatorFactory ordinalGroupingOperatorFactory(
PhysicalOperation source,
Expand Down