Skip to content

Commit d0677c0

Browse files
committed
Updated Physicial rules and fixed Spatial rule for time series
1 parent 31a2300 commit d0677c0

File tree

6 files changed

+50
-13
lines changed

6 files changed

+50
-13
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
1515
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
1616
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
17+
import org.elasticsearch.xpack.esql.plan.physical.AbstractAggregateExec;
1718
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
1819
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
1920
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
@@ -53,7 +54,7 @@ public PhysicalPlan rule(PhysicalPlan plan, LocalPhysicalOptimizerContext contex
5354
* it loads the field lazily. If we have more than one field we need to
5455
* make sure the fields are loaded for the standard hash aggregator.
5556
*/
56-
if (p instanceof AggregateExec agg) {
57+
if (p instanceof AbstractAggregateExec agg) {
5758
var ordinalAttributes = agg.ordinalAttributes();
5859
missing.removeAll(Expressions.references(ordinalAttributes));
5960
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SpatialDocValuesExtraction.java

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020
import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.SpatialRelatesFunction;
2121
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
2222
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
23+
import org.elasticsearch.xpack.esql.plan.physical.AbstractAggregateExec;
2324
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
2425
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
2526
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
2627
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
2728
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
29+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
30+
import org.elasticsearch.xpack.esql.plan.physical.TopNAggregateExec;
2831
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
2932
import org.elasticsearch.xpack.esql.stats.SearchStats;
3033

@@ -76,7 +79,7 @@ protected PhysicalPlan rule(AggregateExec aggregate, LocalPhysicalOptimizerConte
7679
var foundAttributes = new HashSet<FieldAttribute>();
7780

7881
PhysicalPlan plan = aggregate.transformDown(UnaryExec.class, exec -> {
79-
if (exec instanceof AggregateExec agg) {
82+
if (exec instanceof AbstractAggregateExec agg) {
8083
var orderedAggregates = new ArrayList<NamedExpression>();
8184
var changedAggregates = false;
8285
for (NamedExpression aggExpr : agg.aggregates()) {
@@ -97,15 +100,39 @@ && allowedForDocValues(fieldAttribute, ctx.searchStats(), agg, foundAttributes))
97100
}
98101
}
99102
if (changedAggregates) {
100-
exec = new AggregateExec(
101-
agg.source(),
102-
agg.child(),
103-
agg.groupings(),
104-
orderedAggregates,
105-
agg.getMode(),
106-
agg.intermediateAttributes(),
107-
agg.estimatedRowSize()
108-
);
103+
exec = switch (agg) {
104+
case TimeSeriesAggregateExec tsAggExec -> new TimeSeriesAggregateExec(
105+
agg.source(),
106+
agg.child(),
107+
agg.groupings(),
108+
orderedAggregates,
109+
agg.getMode(),
110+
agg.intermediateAttributes(),
111+
agg.estimatedRowSize(),
112+
tsAggExec.timeBucket()
113+
);
114+
case AggregateExec aggExec -> new AggregateExec(
115+
agg.source(),
116+
agg.child(),
117+
agg.groupings(),
118+
orderedAggregates,
119+
agg.getMode(),
120+
agg.intermediateAttributes(),
121+
agg.estimatedRowSize()
122+
);
123+
case TopNAggregateExec topNAggExec -> new TopNAggregateExec(
124+
agg.source(),
125+
agg.child(),
126+
agg.groupings(),
127+
orderedAggregates,
128+
agg.getMode(),
129+
agg.intermediateAttributes(),
130+
agg.estimatedRowSize(),
131+
topNAggExec.order(),
132+
topNAggExec.limit()
133+
);
134+
default -> throw new IllegalStateException("Unexpected aggregate type: " + agg.getClass().getName());
135+
};
109136
}
110137
}
111138
if (exec instanceof EvalExec evalExec) {
@@ -179,7 +206,7 @@ private boolean foundField(Expression expression, Set<FieldAttribute> foundAttri
179206
private boolean allowedForDocValues(
180207
FieldAttribute fieldAttribute,
181208
SearchStats stats,
182-
AggregateExec agg,
209+
AbstractAggregateExec agg,
183210
Set<FieldAttribute> foundAttributes
184211
) {
185212
if (stats.hasDocValues(fieldAttribute.fieldName()) == false) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/AbstractAggregateExec.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ public PhysicalPlan estimateRowSize(State state) {
115115

116116
protected abstract AbstractAggregateExec withEstimatedSize(int estimatedRowSize);
117117

118+
public abstract AbstractAggregateExec withMode(AggregatorMode newMode);
119+
118120
public AggregatorMode getMode() {
119121
return mode;
120122
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/AggregateExec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public AggregateExec replaceChild(PhysicalPlan newChild) {
6565
return new AggregateExec(source(), newChild, groupings, aggregates, mode, intermediateAttributes, estimatedRowSize);
6666
}
6767

68+
@Override
6869
public AggregateExec withMode(AggregatorMode newMode) {
6970
return new AggregateExec(source(), child(), groupings, aggregates, newMode, intermediateAttributes, estimatedRowSize);
7071
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TopNAggregateExec.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ protected TopNAggregateExec withEstimatedSize(int estimatedRowSize) {
103103
);
104104
}
105105

106+
@Override
107+
public TopNAggregateExec withMode(AggregatorMode newMode) {
108+
return new TopNAggregateExec(source(), child(), groupings, aggregates, newMode, intermediateAttributes, estimatedRowSize, order, limit);
109+
}
110+
106111
@Override
107112
public int hashCode() {
108113
return Objects.hash(groupings, aggregates, mode, intermediateAttributes, estimatedRowSize, order, limit, child());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.xpack.esql.plan.QueryPlan;
3939
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
4040
import org.elasticsearch.xpack.esql.plan.logical.Filter;
41+
import org.elasticsearch.xpack.esql.plan.physical.AbstractAggregateExec;
4142
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
4243
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
4344
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
@@ -123,7 +124,7 @@ public static PhysicalPlan reductionPlan(PhysicalPlan plan) {
123124
final var pipelineBreaker = pipelineBreakers.getFirst();
124125
final LocalMapper mapper = new LocalMapper();
125126
PhysicalPlan reducePlan = mapper.map(pipelineBreaker);
126-
if (reducePlan instanceof AggregateExec agg) {
127+
if (reducePlan instanceof AbstractAggregateExec agg) {
127128
reducePlan = agg.withMode(AggregatorMode.INTERMEDIATE);
128129
}
129130
return EstimatesRowSize.estimateRowSize(fragment.estimatedRowSize(), reducePlan);

0 commit comments

Comments
 (0)