Skip to content

Commit 506a01f

Browse files
committed
Add logical/physical plans time-series aggregate
1 parent b563145 commit 506a01f

File tree

20 files changed

+260
-178
lines changed

20 files changed

+260
-178
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ static TransportVersion def(int id) {
211211
public static final TransportVersion REPOSITORIES_METADATA_AS_PROJECT_CUSTOM = def(9_042_0_00);
212212
public static final TransportVersion BATCHED_QUERY_PHASE_VERSION = def(9_043_0_00);
213213
public static final TransportVersion REMOTE_EXCEPTION = def(9_044_0_00);
214+
public static final TransportVersion ESQL_REMOVE_AGGREGATE_TYPE = def(9_045_0_00);
214215

215216
/*
216217
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ protected LogicalPlan rule(UnaryPlan plan) {
5353
// project can be fully removed
5454
if (newAggs != null) {
5555
var newGroups = replacePrunedAliasesUsedInGroupBy(a.groupings(), aggs, newAggs);
56-
plan = new Aggregate(a.source(), a.child(), a.aggregateType(), newGroups, newAggs);
56+
plan = new Aggregate(a.source(), a.child(), newGroups, newAggs);
5757
}
5858
}
5959
return plan;
@@ -78,7 +78,6 @@ protected LogicalPlan rule(UnaryPlan plan) {
7878
plan = new Aggregate(
7979
a.source(),
8080
p.child(),
81-
a.aggregateType(),
8281
combineUpperGroupingsAndLowerProjections(groupingAttrs, p.projections()),
8382
combineProjections(a.aggregates(), p.projections())
8483
);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ExtractAggregateCommonFilter.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ protected LogicalPlan rule(Aggregate aggregate) {
7070
return new Aggregate(
7171
aggregate.source(),
7272
new Filter(aggregate.source(), aggregate.child(), common.v1()),
73-
aggregate.aggregateType(),
7473
aggregate.groupings(),
7574
newAggs
7675
);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,22 +73,10 @@ public LogicalPlan apply(LogicalPlan plan) {
7373
} else {
7474
// Aggs cannot produce pages with 0 columns, so retain one grouping.
7575
remaining = List.of(Expressions.attribute(aggregate.groupings().get(0)));
76-
p = new Aggregate(
77-
aggregate.source(),
78-
aggregate.child(),
79-
aggregate.aggregateType(),
80-
aggregate.groupings(),
81-
remaining
82-
);
76+
p = new Aggregate(aggregate.source(), aggregate.child(), aggregate.groupings(), remaining);
8377
}
8478
} else {
85-
p = new Aggregate(
86-
aggregate.source(),
87-
aggregate.child(),
88-
aggregate.aggregateType(),
89-
aggregate.groupings(),
90-
remaining
91-
);
79+
p = new Aggregate(aggregate.source(), aggregate.child(), aggregate.groupings(), remaining);
9280
}
9381
}
9482
} else if (p instanceof Eval eval) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SubstituteSurrogates.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ protected LogicalPlan rule(Aggregate aggregate) {
107107
if (changed) {
108108
var source = aggregate.source();
109109
if (newAggs.isEmpty() == false) {
110-
plan = new Aggregate(source, aggregate.child(), aggregate.aggregateType(), aggregate.groupings(), newAggs);
110+
plan = new Aggregate(source, aggregate.child(), aggregate.groupings(), newAggs);
111111
} else {
112112
// All aggs actually have been surrogates for (foldable) expressions, e.g.
113113
// \_Aggregate[[],[AVG([1, 2][INTEGER]) AS s]]

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -217,24 +217,13 @@ LogicalPlan translate(Aggregate aggregate) {
217217
return r;
218218
}
219219
});
220-
return newAggregate(
221-
newAggregate(relation, Aggregate.AggregateType.TIME_SERIES, firstPassAggs, firstPassGroupings),
222-
Aggregate.AggregateType.STANDARD,
223-
secondPassAggs,
224-
secondPassGroupings
225-
);
220+
return newAggregate(newAggregate(relation, firstPassAggs, firstPassGroupings), secondPassAggs, secondPassGroupings);
226221
}
227222

228-
private static Aggregate newAggregate(
229-
LogicalPlan child,
230-
Aggregate.AggregateType type,
231-
List<? extends NamedExpression> aggregates,
232-
List<Expression> groupings
233-
) {
223+
private static Aggregate newAggregate(LogicalPlan child, List<? extends NamedExpression> aggregates, List<Expression> groupings) {
234224
return new Aggregate(
235225
child.source(),
236226
child,
237-
type,
238227
groupings,
239228
Stream.concat(aggregates.stream(), groupings.stream().map(Expressions::attribute)).toList()
240229
);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ public PlanFactory visitInsistCommand(EsqlBaseParser.InsistCommandContext ctx) {
313313
@Override
314314
public PlanFactory visitStatsCommand(EsqlBaseParser.StatsCommandContext ctx) {
315315
final Stats stats = stats(source(ctx), ctx.grouping, ctx.stats);
316-
return input -> new Aggregate(source(ctx), input, Aggregate.AggregateType.STANDARD, stats.groupings, stats.aggregates);
316+
return input -> new Aggregate(source(ctx), input, stats.groupings, stats.aggregates);
317317
}
318318

319319
private record Stats(List<Expression> groupings, List<? extends NamedExpression> aggregates) {}
@@ -363,10 +363,7 @@ public PlanFactory visitInlinestatsCommand(EsqlBaseParser.InlinestatsCommandCont
363363
List<NamedExpression> groupings = visitGrouping(ctx.grouping);
364364
aggregates.addAll(groupings);
365365
// TODO: add support for filters
366-
return input -> new InlineStats(
367-
source(ctx),
368-
new Aggregate(source(ctx), input, Aggregate.AggregateType.STANDARD, new ArrayList<>(groupings), aggregates)
369-
);
366+
return input -> new InlineStats(source(ctx), new Aggregate(source(ctx), input, new ArrayList<>(groupings), aggregates));
370367
}
371368

372369
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
2222
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
2323
import org.elasticsearch.xpack.esql.plan.logical.Project;
24+
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
2425
import org.elasticsearch.xpack.esql.plan.logical.TopN;
2526
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
2627
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
@@ -46,6 +47,7 @@
4647
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
4748
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
4849
import org.elasticsearch.xpack.esql.plan.physical.SubqueryExec;
50+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
4951
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
5052

5153
import java.util.ArrayList;
@@ -79,6 +81,7 @@ public static List<NamedWriteableRegistry.Entry> logical() {
7981
MvExpand.ENTRY,
8082
OrderBy.ENTRY,
8183
Project.ENTRY,
84+
TimeSeriesAggregateExec.ENTRY,
8285
TopN.ENTRY
8386
);
8487
}
@@ -105,6 +108,7 @@ public static List<NamedWriteableRegistry.Entry> physical() {
105108
ProjectExec.ENTRY,
106109
ShowExec.ENTRY,
107110
SubqueryExec.ENTRY,
111+
TimeSeriesAggregate.ENTRY,
108112
TopNExec.ENTRY
109113
);
110114
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java

Lines changed: 21 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -48,61 +48,35 @@ public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAwar
4848
Aggregate::new
4949
);
5050

51-
public enum AggregateType {
52-
STANDARD,
53-
TIME_SERIES;
54-
55-
static void writeType(StreamOutput out, AggregateType type) throws IOException {
56-
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
57-
out.writeString(type.name());
58-
} else if (type != STANDARD) {
59-
throw new IllegalStateException("cluster is not ready to support aggregate type [" + type + "]");
60-
}
61-
}
62-
63-
static AggregateType readType(StreamInput in) throws IOException {
64-
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
65-
return AggregateType.valueOf(in.readString());
66-
} else {
67-
return STANDARD;
68-
}
69-
}
70-
}
71-
72-
private final AggregateType aggregateType;
73-
private final List<Expression> groupings;
74-
private final List<? extends NamedExpression> aggregates;
51+
protected final List<Expression> groupings;
52+
protected final List<? extends NamedExpression> aggregates;
7553

76-
private List<Attribute> lazyOutput;
54+
protected List<Attribute> lazyOutput;
7755

78-
public Aggregate(
79-
Source source,
80-
LogicalPlan child,
81-
AggregateType aggregateType,
82-
List<Expression> groupings,
83-
List<? extends NamedExpression> aggregates
84-
) {
56+
public Aggregate(Source source, LogicalPlan child, List<Expression> groupings, List<? extends NamedExpression> aggregates) {
8557
super(source, child);
86-
this.aggregateType = aggregateType;
8758
this.groupings = groupings;
8859
this.aggregates = aggregates;
8960
}
9061

9162
public Aggregate(StreamInput in) throws IOException {
92-
this(
93-
Source.readFrom((PlanStreamInput) in),
94-
in.readNamedWriteable(LogicalPlan.class),
95-
AggregateType.readType(in),
96-
in.readNamedWriteableCollectionAsList(Expression.class),
97-
in.readNamedWriteableCollectionAsList(NamedExpression.class)
98-
);
63+
super(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(LogicalPlan.class));
64+
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)
65+
&& in.getTransportVersion().before(TransportVersions.ESQL_REMOVE_AGGREGATE_TYPE)) {
66+
in.readString();
67+
}
68+
this.groupings = in.readNamedWriteableCollectionAsList(Expression.class);
69+
this.aggregates = in.readNamedWriteableCollectionAsList(NamedExpression.class);
9970
}
10071

10172
@Override
10273
public void writeTo(StreamOutput out) throws IOException {
10374
Source.EMPTY.writeTo(out);
10475
out.writeNamedWriteable(child());
105-
AggregateType.writeType(out, aggregateType());
76+
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)
77+
&& out.getTransportVersion().before(TransportVersions.ESQL_REMOVE_AGGREGATE_TYPE)) {
78+
out.writeString("STANDARD");
79+
}
10680
out.writeNamedWriteableCollection(groupings);
10781
out.writeNamedWriteableCollection(aggregates());
10882
}
@@ -114,24 +88,20 @@ public String getWriteableName() {
11488

11589
@Override
11690
protected NodeInfo<Aggregate> info() {
117-
return NodeInfo.create(this, Aggregate::new, child(), aggregateType, groupings, aggregates);
91+
return NodeInfo.create(this, Aggregate::new, child(), groupings, aggregates);
11892
}
11993

12094
@Override
12195
public Aggregate replaceChild(LogicalPlan newChild) {
122-
return new Aggregate(source(), newChild, aggregateType, groupings, aggregates);
96+
return new Aggregate(source(), newChild, groupings, aggregates);
12397
}
12498

12599
public Aggregate with(List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
126100
return with(child(), newGroupings, newAggregates);
127101
}
128102

129103
public Aggregate with(LogicalPlan child, List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
130-
return new Aggregate(source(), child, aggregateType(), newGroupings, newAggregates);
131-
}
132-
133-
public AggregateType aggregateType() {
134-
return aggregateType;
104+
return new Aggregate(source(), child, newGroupings, newAggregates);
135105
}
136106

137107
public List<Expression> groupings() {
@@ -144,10 +114,7 @@ public List<? extends NamedExpression> aggregates() {
144114

145115
@Override
146116
public String telemetryLabel() {
147-
return switch (aggregateType) {
148-
case STANDARD -> "STATS";
149-
case TIME_SERIES -> "TIME_SERIES";
150-
};
117+
return "STATS";
151118
}
152119

153120
@Override
@@ -184,7 +151,7 @@ public static AttributeSet computeReferences(List<? extends NamedExpression> agg
184151

185152
@Override
186153
public int hashCode() {
187-
return Objects.hash(aggregateType, groupings, aggregates, child());
154+
return Objects.hash(groupings, aggregates, child());
188155
}
189156

190157
@Override
@@ -198,8 +165,7 @@ public boolean equals(Object obj) {
198165
}
199166

200167
Aggregate other = (Aggregate) obj;
201-
return aggregateType == other.aggregateType
202-
&& Objects.equals(groupings, other.groupings)
168+
return Objects.equals(groupings, other.groupings)
203169
&& Objects.equals(aggregates, other.aggregates)
204170
&& Objects.equals(child(), other.child());
205171
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Dedup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public UnaryPlan replaceChild(LogicalPlan newChild) {
7171

7272
@Override
7373
public LogicalPlan surrogate() {
74-
return new Aggregate(source(), child(), Aggregate.AggregateType.STANDARD, new ArrayList<>(groupings), finalAggs());
74+
return new Aggregate(source(), child(), new ArrayList<>(groupings), finalAggs());
7575
}
7676

7777
public List<NamedExpression> aggregates() {

0 commit comments

Comments
 (0)