Skip to content

Commit 3916c28

Browse files
committed
Initial test with attributes within Aggregate
1 parent ebcab5f commit 3916c28

File tree

8 files changed

+203
-19
lines changed

8 files changed

+203
-19
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ static TransportVersion def(int id) {
273273
public static final TransportVersion INFERENCE_CUSTOM_SERVICE_ADDED = def(9_084_0_00);
274274
public static final TransportVersion ESQL_LIMIT_ROW_SIZE = def(9_085_0_00);
275275
public static final TransportVersion ESQL_REGEX_MATCH_WITH_CASE_INSENSITIVITY = def(9_086_0_00);
276+
public static final TransportVersion ESQL_TOP_N_AGGREGATES = def(9_087_0_00);
276277

277278
/*
278279
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEvalFoldables;
2929
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateInlineEvals;
3030
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateNullable;
31+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateTopNToAggregates;
3132
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropgateUnmappedFields;
3233
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumns;
3334
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneEmptyPlans;
@@ -208,6 +209,6 @@ protected static Batch<LogicalPlan> operators() {
208209
}
209210

210211
protected static Batch<LogicalPlan> cleanup() {
211-
return new Batch<>("Clean Up", new ReplaceLimitAndSortAsTopN(), new ReplaceRowAsLocalRelation(), new PropgateUnmappedFields());
212+
return new Batch<>("Clean Up", new ReplaceLimitAndSortAsTopN(), new PropagateTopNToAggregates(), new ReplaceRowAsLocalRelation(), new PropgateUnmappedFields());
212213
}
213214
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.xpack.esql.VerificationException;
1111
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
13+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateTopNToAggregates;
1314
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
1415
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1516
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
9+
10+
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
11+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
12+
import org.elasticsearch.xpack.esql.plan.logical.TopN;
13+
import org.elasticsearch.xpack.esql.rule.Rule;
14+
15+
/**
16+
* Looks for the structure:
17+
* <pre>
18+
* TopN
19+
* \_Aggregate
20+
* </pre>
21+
* And replaces the Aggregate with an Aggregate with the TopN data.
22+
* (TODO: Create a new TopNAggregate node instead)
23+
*/
24+
public class PropagateTopNToAggregates extends Rule<TopN, LogicalPlan> {
25+
26+
@Override
27+
public LogicalPlan apply(LogicalPlan plan) {
28+
return plan.transformUp(
29+
TopN.class,
30+
this::applyRule
31+
);
32+
}
33+
34+
private TopN applyRule(TopN topN) {
35+
// TODO: Handle TimeSeriesAggregate
36+
if (topN.child() instanceof Aggregate aggregate) {
37+
return topN.replaceChild(
38+
new Aggregate(
39+
aggregate.source(),
40+
aggregate.child(),
41+
aggregate.groupings(),
42+
aggregate.aggregates(),
43+
topN.order(),
44+
topN.limit()
45+
)
46+
);
47+
}
48+
return topN;
49+
}
50+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.core.Nullable;
1314
import org.elasticsearch.index.IndexMode;
1415
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
1516
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
@@ -27,6 +28,7 @@
2728
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
2829
import org.elasticsearch.xpack.esql.core.tree.Source;
2930
import org.elasticsearch.xpack.esql.core.util.Holder;
31+
import org.elasticsearch.xpack.esql.expression.Order;
3032
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
3133
import org.elasticsearch.xpack.esql.expression.function.aggregate.FilteredExpression;
3234
import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
@@ -55,12 +57,32 @@ public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAwar
5557
protected final List<Expression> groupings;
5658
protected final List<? extends NamedExpression> aggregates;
5759

60+
private final List<Order> order;
61+
@Nullable
62+
private final Expression limit;
63+
5864
protected List<Attribute> lazyOutput;
5965

6066
public Aggregate(Source source, LogicalPlan child, List<Expression> groupings, List<? extends NamedExpression> aggregates) {
6167
super(source, child);
6268
this.groupings = groupings;
6369
this.aggregates = aggregates;
70+
this.order = List.of();
71+
this.limit = null;
72+
}
73+
74+
public Aggregate(
75+
Source source,
76+
LogicalPlan child,
77+
List<Expression> groupings, List<? extends NamedExpression> aggregates,
78+
List<Order> order,
79+
@Nullable Expression limit
80+
) {
81+
super(source, child);
82+
this.groupings = groupings;
83+
this.aggregates = aggregates;
84+
this.order = order;
85+
this.limit = limit;
6486
}
6587

6688
public Aggregate(StreamInput in) throws IOException {
@@ -71,6 +93,13 @@ public Aggregate(StreamInput in) throws IOException {
7193
}
7294
this.groupings = in.readNamedWriteableCollectionAsList(Expression.class);
7395
this.aggregates = in.readNamedWriteableCollectionAsList(NamedExpression.class);
96+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOP_N_AGGREGATES)) {
97+
this.order = in.readCollectionAsList(Order::new);
98+
this.limit = in.readOptionalNamedWriteable(Expression.class);
99+
} else {
100+
this.order = emptyList();
101+
this.limit = null;
102+
}
74103
}
75104

76105
@Override
@@ -83,6 +112,10 @@ public void writeTo(StreamOutput out) throws IOException {
83112
}
84113
out.writeNamedWriteableCollection(groupings);
85114
out.writeNamedWriteableCollection(aggregates());
115+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOP_N_AGGREGATES)) {
116+
out.writeCollection(order);
117+
out.writeOptionalNamedWriteable(limit);
118+
}
86119
}
87120

88121
@Override
@@ -97,15 +130,15 @@ protected NodeInfo<? extends Aggregate> info() {
97130

98131
@Override
99132
public Aggregate replaceChild(LogicalPlan newChild) {
100-
return new Aggregate(source(), newChild, groupings, aggregates);
133+
return new Aggregate(source(), newChild, groupings, aggregates, order, limit);
101134
}
102135

103136
public Aggregate with(List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
104137
return with(child(), newGroupings, newAggregates);
105138
}
106139

107140
public Aggregate with(LogicalPlan child, List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
108-
return new Aggregate(source(), child, newGroupings, newAggregates);
141+
return new Aggregate(source(), child, newGroupings, newAggregates, order, limit);
109142
}
110143

111144
public List<Expression> groupings() {
@@ -116,6 +149,14 @@ public List<? extends NamedExpression> aggregates() {
116149
return aggregates;
117150
}
118151

152+
public List<Order> order() {
153+
return order;
154+
}
155+
156+
public Expression limit() {
157+
return limit;
158+
}
159+
119160
@Override
120161
public String telemetryLabel() {
121162
return "STATS";

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/AggregateExec.java

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,27 @@
1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
1414
import org.elasticsearch.compute.aggregation.AggregatorMode;
15+
import org.elasticsearch.core.Nullable;
1516
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1617
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1718
import org.elasticsearch.xpack.esql.core.expression.Expression;
1819
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1920
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
2021
import org.elasticsearch.xpack.esql.core.tree.Source;
22+
import org.elasticsearch.xpack.esql.expression.Order;
2123
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
2224
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
2325
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
26+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2427

2528
import java.io.IOException;
2629
import java.util.ArrayList;
2730
import java.util.HashSet;
2831
import java.util.List;
2932
import java.util.Objects;
3033

34+
import static java.util.Collections.emptyList;
35+
3136
public class AggregateExec extends UnaryExec implements EstimatesRowSize {
3237
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
3338
PhysicalPlan.class,
@@ -51,6 +56,10 @@ public class AggregateExec extends UnaryExec implements EstimatesRowSize {
5156
*/
5257
private final Integer estimatedRowSize;
5358

59+
private final List<Order> order;
60+
@Nullable
61+
private final Expression limit;
62+
5463
public AggregateExec(
5564
Source source,
5665
PhysicalPlan child,
@@ -66,20 +75,47 @@ public AggregateExec(
6675
this.mode = mode;
6776
this.intermediateAttributes = intermediateAttributes;
6877
this.estimatedRowSize = estimatedRowSize;
78+
this.order = List.of();
79+
this.limit = null;
80+
}
81+
82+
public AggregateExec(
83+
Source source,
84+
PhysicalPlan child,
85+
List<? extends Expression> groupings,
86+
List<? extends NamedExpression> aggregates,
87+
AggregatorMode mode,
88+
List<Attribute> intermediateAttributes,
89+
Integer estimatedRowSize,
90+
List<Order> order,
91+
@Nullable Expression limit
92+
) {
93+
super(source, child);
94+
this.groupings = groupings;
95+
this.aggregates = aggregates;
96+
this.mode = mode;
97+
this.intermediateAttributes = intermediateAttributes;
98+
this.estimatedRowSize = estimatedRowSize;
99+
this.order = order;
100+
this.limit = limit;
69101
}
70102

71103
protected AggregateExec(StreamInput in) throws IOException {
72104
// This is only deserialized as part of node level reduction, which is turned off until at least 8.16.
73105
// So, we do not have to consider previous transport versions here, because old nodes will not send AggregateExecs to new nodes.
74-
this(
75-
Source.readFrom((PlanStreamInput) in),
76-
in.readNamedWriteable(PhysicalPlan.class),
77-
in.readNamedWriteableCollectionAsList(Expression.class),
78-
in.readNamedWriteableCollectionAsList(NamedExpression.class),
79-
in.readEnum(AggregatorMode.class),
80-
in.readNamedWriteableCollectionAsList(Attribute.class),
81-
in.readOptionalVInt()
82-
);
106+
super(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(PhysicalPlan.class));
107+
this.groupings = in.readNamedWriteableCollectionAsList(Expression.class);
108+
this.aggregates = in.readNamedWriteableCollectionAsList(NamedExpression.class);
109+
this.mode = in.readEnum(AggregatorMode.class);
110+
this.intermediateAttributes = in.readNamedWriteableCollectionAsList(Attribute.class);
111+
this.estimatedRowSize = in.readOptionalVInt();
112+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOP_N_AGGREGATES)) {
113+
this.order = in.readCollectionAsList(Order::new);
114+
this.limit = in.readOptionalNamedWriteable(Expression.class);
115+
} else {
116+
this.order = emptyList();
117+
this.limit = null;
118+
}
83119
}
84120

85121
@Override
@@ -95,6 +131,10 @@ public void writeTo(StreamOutput out) throws IOException {
95131
out.writeEnum(AggregateExec.Mode.fromAggregatorMode(getMode()));
96132
}
97133
out.writeOptionalVInt(estimatedRowSize());
134+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOP_N_AGGREGATES)) {
135+
out.writeCollection(order);
136+
out.writeOptionalNamedWriteable(limit);
137+
}
98138
}
99139

100140
@Override
@@ -109,7 +149,7 @@ protected NodeInfo<AggregateExec> info() {
109149

110150
@Override
111151
public AggregateExec replaceChild(PhysicalPlan newChild) {
112-
return new AggregateExec(source(), newChild, groupings, aggregates, mode, intermediateAttributes, estimatedRowSize);
152+
return new AggregateExec(source(), newChild, groupings, aggregates, mode, intermediateAttributes, estimatedRowSize, order, limit);
113153
}
114154

115155
public List<? extends Expression> groupings() {
@@ -120,8 +160,16 @@ public List<? extends NamedExpression> aggregates() {
120160
return aggregates;
121161
}
122162

163+
public List<Order> order() {
164+
return order;
165+
}
166+
167+
public Expression limit() {
168+
return limit;
169+
}
170+
123171
public AggregateExec withMode(AggregatorMode newMode) {
124-
return new AggregateExec(source(), child(), groupings, aggregates, newMode, intermediateAttributes, estimatedRowSize);
172+
return new AggregateExec(source(), child(), groupings, aggregates, newMode, intermediateAttributes, estimatedRowSize, order, limit);
125173
}
126174

127175
/**
@@ -141,7 +189,7 @@ public PhysicalPlan estimateRowSize(State state) {
141189
}
142190

143191
protected AggregateExec withEstimatedSize(int estimatedRowSize) {
144-
return new AggregateExec(source(), child(), groupings, aggregates, mode, intermediateAttributes, estimatedRowSize);
192+
return new AggregateExec(source(), child(), groupings, aggregates, mode, intermediateAttributes, estimatedRowSize, order, limit);
145193
}
146194

147195
public AggregatorMode getMode() {

0 commit comments

Comments
 (0)