Skip to content

Commit 0c90de5

Browse files
authored
Fork time-series source to allow field extractions (#127375)
This change prepares for pushing down field extractions to the time-series source for performance reasons. It is a non-issue (no user-visible change on its own), as the actual pushdown will occur in a follow-up.
1 parent 966d722 commit 0c90de5

File tree

7 files changed

+217
-20
lines changed

7 files changed

+217
-20
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
1313
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
14+
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFieldExtractionToTimeSeriesSource;
1415
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource;
1516
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushLimitToSource;
1617
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushSampleToSource;
@@ -79,7 +80,8 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
7980
Limiter.ONCE,
8081
new InsertFieldExtraction(),
8182
new SpatialDocValuesExtraction(),
82-
new SpatialShapeBoundsExtraction()
83+
new SpatialShapeBoundsExtraction(),
84+
new PushFieldExtractionToTimeSeriesSource()
8385
);
8486
return List.of(pushdown, fieldExtraction);
8587
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;
9+
10+
import org.elasticsearch.index.IndexMode;
11+
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
12+
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
13+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
14+
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
15+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
16+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
17+
18+
/**
19+
* A rule that moves field extractions to occur before the time-series aggregation in the time-series source plan.
20+
*/
21+
public class PushFieldExtractionToTimeSeriesSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
22+
EsQueryExec,
23+
LocalPhysicalOptimizerContext> {
24+
25+
public PushFieldExtractionToTimeSeriesSource() {
26+
super(OptimizerRules.TransformDirection.UP);
27+
}
28+
29+
@Override
30+
public PhysicalPlan rule(EsQueryExec plan, LocalPhysicalOptimizerContext context) {
31+
if (plan.indexMode() == IndexMode.TIME_SERIES) {
32+
return new TimeSeriesSourceExec(plan.source(), plan.output(), plan.query(), plan.limit(), plan.estimatedRowSize());
33+
} else {
34+
return plan;
35+
}
36+
}
37+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plan.physical;
9+
10+
import org.elasticsearch.common.Strings;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.index.query.QueryBuilder;
13+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
14+
import org.elasticsearch.xpack.esql.core.expression.Expression;
15+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
16+
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
17+
import org.elasticsearch.xpack.esql.core.tree.Source;
18+
19+
import java.io.IOException;
20+
import java.util.List;
21+
import java.util.Objects;
22+
23+
/**
24+
* Similar to {@link EsQueryExec}, but this is a physical plan specifically for time series indices.
25+
* This plan is forked from {@link EsQueryExec} to allow field extractions, leveraging caching
26+
* and avoiding the cost of sorting and rebuilding blocks.
27+
*/
28+
public class TimeSeriesSourceExec extends LeafExec implements EstimatesRowSize {
29+
private final List<Attribute> attrs;
30+
private final QueryBuilder query;
31+
private final Expression limit;
32+
33+
/**
34+
* Estimate of the number of bytes that'll be loaded per position before
35+
* the stream of pages is consumed.
36+
*/
37+
private final Integer estimatedRowSize;
38+
39+
public TimeSeriesSourceExec(Source source, List<Attribute> attrs, QueryBuilder query, Expression limit, Integer estimatedRowSize) {
40+
super(source);
41+
this.attrs = attrs;
42+
this.query = query;
43+
this.limit = limit;
44+
this.estimatedRowSize = estimatedRowSize;
45+
}
46+
47+
@Override
48+
public void writeTo(StreamOutput out) throws IOException {
49+
throw new UnsupportedOperationException("local plan");
50+
}
51+
52+
@Override
53+
public String getWriteableName() {
54+
throw new UnsupportedOperationException("local plan");
55+
}
56+
57+
@Override
58+
protected NodeInfo<TimeSeriesSourceExec> info() {
59+
return NodeInfo.create(this, TimeSeriesSourceExec::new, attrs, query, limit, estimatedRowSize);
60+
}
61+
62+
public QueryBuilder query() {
63+
return query;
64+
}
65+
66+
@Override
67+
public List<Attribute> output() {
68+
return attrs;
69+
}
70+
71+
public List<Attribute> attrs() {
72+
return attrs;
73+
}
74+
75+
public Expression limit() {
76+
return limit;
77+
}
78+
79+
/**
80+
* Estimate of the number of bytes that'll be loaded per position before
81+
* the stream of pages is consumed.
82+
*/
83+
public Integer estimatedRowSize() {
84+
return estimatedRowSize;
85+
}
86+
87+
@Override
88+
public PhysicalPlan estimateRowSize(State state) {
89+
state.add(false, Integer.BYTES * 2);
90+
state.add(false, 22); // tsid
91+
state.add(false, 8); // timestamp
92+
int size = state.consumeAllFields(false);
93+
return Objects.equals(this.estimatedRowSize, size) ? this : new TimeSeriesSourceExec(source(), attrs, query, limit, size);
94+
}
95+
96+
@Override
97+
public int hashCode() {
98+
return Objects.hash(attrs, query, limit, estimatedRowSize);
99+
}
100+
101+
@Override
102+
public boolean equals(Object obj) {
103+
if (this == obj) {
104+
return true;
105+
}
106+
107+
if (obj == null || getClass() != obj.getClass()) {
108+
return false;
109+
}
110+
111+
TimeSeriesSourceExec other = (TimeSeriesSourceExec) obj;
112+
return Objects.equals(attrs, other.attrs)
113+
&& Objects.equals(query, other.query)
114+
&& Objects.equals(limit, other.limit)
115+
&& Objects.equals(estimatedRowSize, other.estimatedRowSize);
116+
}
117+
118+
@Override
119+
public String nodeString() {
120+
return nodeName()
121+
+ "["
122+
+ "query["
123+
+ (query != null ? Strings.toString(query, false, true) : "")
124+
+ "] attributes: ["
125+
+ NodeUtils.limitedToString(attrs)
126+
+ "], estimatedRowSize["
127+
+ estimatedRowSize
128+
+ "]";
129+
}
130+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
6262
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec.Sort;
6363
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
64+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
6465
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.DriverParallelism;
6566
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
6667
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;
@@ -191,6 +192,10 @@ public Function<org.elasticsearch.compute.lucene.ShardContext, Query> querySuppl
191192

192193
@Override
193194
public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, LocalExecutionPlannerContext context) {
195+
if (esQueryExec.indexMode() == IndexMode.TIME_SERIES) {
196+
assert false : "Time series source should be translated to TimeSeriesSourceExec";
197+
throw new IllegalStateException("Time series source should be translated to TimeSeriesSourceExec");
198+
}
194199
final LuceneOperator.Factory luceneFactory;
195200
logger.trace("Query Exec is {}", esQueryExec);
196201

@@ -215,25 +220,15 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
215220
scoring
216221
);
217222
} else {
218-
if (esQueryExec.indexMode() == IndexMode.TIME_SERIES) {
219-
luceneFactory = TimeSeriesSortedSourceOperatorFactory.create(
220-
limit,
221-
context.pageSize(rowEstimatedSize),
222-
context.queryPragmas().taskConcurrency(),
223-
shardContexts,
224-
querySupplier(esQueryExec.query())
225-
);
226-
} else {
227-
luceneFactory = new LuceneSourceOperator.Factory(
228-
shardContexts,
229-
querySupplier(esQueryExec.query()),
230-
context.queryPragmas().dataPartitioning(defaultDataPartitioning),
231-
context.queryPragmas().taskConcurrency(),
232-
context.pageSize(rowEstimatedSize),
233-
limit,
234-
scoring
235-
);
236-
}
223+
luceneFactory = new LuceneSourceOperator.Factory(
224+
shardContexts,
225+
querySupplier(esQueryExec.query()),
226+
context.queryPragmas().dataPartitioning(defaultDataPartitioning),
227+
context.queryPragmas().taskConcurrency(),
228+
context.pageSize(rowEstimatedSize),
229+
limit,
230+
scoring
231+
);
237232
}
238233
Layout.Builder layout = new Layout.Builder();
239234
layout.append(esQueryExec.output());
@@ -242,6 +237,23 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
242237
return PhysicalOperation.fromSource(luceneFactory, layout.build());
243238
}
244239

240+
@Override
241+
public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context) {
242+
final int limit = ts.limit() != null ? (Integer) ts.limit().fold(context.foldCtx()) : NO_LIMIT;
243+
LuceneOperator.Factory luceneFactory = TimeSeriesSortedSourceOperatorFactory.create(
244+
limit,
245+
context.pageSize(ts.estimatedRowSize()),
246+
context.queryPragmas().taskConcurrency(),
247+
shardContexts,
248+
querySupplier(ts.query())
249+
);
250+
Layout.Builder layout = new Layout.Builder();
251+
layout.append(ts.output());
252+
int instanceCount = Math.max(1, luceneFactory.taskConcurrency());
253+
context.driverParallelism(new DriverParallelism(DriverParallelism.Type.DATA_PARALLELISM, instanceCount));
254+
return PhysicalOperation.fromSource(luceneFactory, layout.build());
255+
}
256+
245257
/**
246258
* Build a {@link SourceOperator.SourceOperatorFactory} that counts documents in the search index.
247259
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@
112112
import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec;
113113
import org.elasticsearch.xpack.esql.plan.physical.SampleExec;
114114
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
115+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
115116
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
116117
import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec;
117118
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders.ShardContext;
@@ -271,6 +272,8 @@ else if (node instanceof EsQueryExec esQuery) {
271272
return planShow(show);
272273
} else if (node instanceof ExchangeSourceExec exchangeSource) {
273274
return planExchangeSource(exchangeSource, context);
275+
} else if (node instanceof TimeSeriesSourceExec ts) {
276+
return planTimeSeriesNode(ts, context);
274277
}
275278
// lookups and joins
276279
else if (node instanceof EnrichExec enrich) {
@@ -328,6 +331,10 @@ private PhysicalOperation planEsQueryNode(EsQueryExec esQueryExec, LocalExecutio
328331
return physicalOperationProviders.sourcePhysicalOperation(esQueryExec, context);
329332
}
330333

334+
private PhysicalOperation planTimeSeriesNode(TimeSeriesSourceExec esQueryExec, LocalExecutionPlannerContext context) {
335+
return physicalOperationProviders.timeSeriesSourceOperation(esQueryExec, context);
336+
}
337+
331338
private PhysicalOperation planEsStats(EsStatsQueryExec statsQuery, LocalExecutionPlannerContext context) {
332339
if (physicalOperationProviders instanceof EsPhysicalOperationProviders == false) {
333340
throw new EsqlIllegalArgumentException("EsStatsQuery should only occur against a Lucene backend");

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalOperationProviders.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
1111
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
1212
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
13+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
1314
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
1415
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;
1516

@@ -18,6 +19,8 @@ interface PhysicalOperationProviders {
1819

1920
PhysicalOperation sourcePhysicalOperation(EsQueryExec esQuery, LocalExecutionPlannerContext context);
2021

22+
PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context);
23+
2124
PhysicalOperation groupingPhysicalOperation(
2225
AggregateExec aggregateExec,
2326
PhysicalOperation source,

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
5959
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
6060
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
61+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
6162
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
6263
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;
6364
import org.elasticsearch.xpack.ml.MachineLearning;
@@ -128,6 +129,11 @@ public PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, LocalE
128129
return PhysicalOperation.fromSource(new TestSourceOperatorFactory(), layout.build());
129130
}
130131

132+
@Override
133+
public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context) {
134+
throw new UnsupportedOperationException("time-series source is not supported in CSV tests");
135+
}
136+
131137
@Override
132138
public Operator.OperatorFactory ordinalGroupingOperatorFactory(
133139
PhysicalOperation source,

0 commit comments

Comments
 (0)