Skip to content

Commit a7edabf

Browse files
committed
Made TopNAggregate a child of Aggregate
1 parent cc9fa02 commit a7edabf

File tree

8 files changed

+58
-58
lines changed

8 files changed

+58
-58
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_top_n.csv-spec

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,14 @@ height:double
4444
2.1
4545
2.1
4646
;
47+
48+
multipleTopN
49+
from employees
50+
| stats c = count(height) by languages.long
51+
| sort languages.long | limit 4
52+
| sort languages.long desc | limit 2;
53+
54+
c:long | languages.long:long
55+
17 | 3
56+
18 | 4
57+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceTopNAndAggregateWithTopNAggregate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public LogicalPlan apply(LogicalPlan plan) {
3030
}
3131

3232
private LogicalPlan applyRule(TopN topN) {
33-
if (topN.child() instanceof Aggregate aggregate) {
33+
if (topN.child() instanceof Aggregate aggregate && aggregate instanceof TopNAggregate == false) {
3434
// TimeSeriesAggregate shouldn't appear after a TopN when this rule is executed
3535
assert aggregate instanceof TimeSeriesAggregate == false : "TimeSeriesAggregate should not be replaced with TopNAggregate";
3636

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public PhysicalPlan apply(PhysicalPlan plan) {
7676
var logicalFragment = fragmentExec.fragment();
7777

7878
// no need for projection when dealing with aggs
79-
if (logicalFragment instanceof Aggregate == false && logicalFragment instanceof TopNAggregate == false) {
79+
if (logicalFragment instanceof Aggregate == false) {
8080
List<Attribute> output = new ArrayList<>(requiredAttrBuilder.build());
8181
// if all the fields are filtered out, it's only the count that matters
8282
// however until a proper fix (see https://github.com/elastic/elasticsearch/issues/98703)

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.common.io.stream.StreamInput;
1111
import org.elasticsearch.common.io.stream.StreamOutput;
1212
import org.elasticsearch.core.Nullable;
13+
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
1314
import org.elasticsearch.xpack.esql.core.expression.Expression;
1415
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1516
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
@@ -18,6 +19,7 @@
1819

1920
import java.io.IOException;
2021
import java.util.List;
22+
import java.util.Objects;
2123

2224
/**
2325
* An extension of {@link Aggregate} to perform time-series aggregation per time-series, such as rate or _over_time.
@@ -74,8 +76,35 @@ public TimeSeriesAggregate with(LogicalPlan child, List<Expression> newGroupings
7476
return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates, timeBucket);
7577
}
7678

79+
@Override
80+
public boolean expressionsResolved() {
81+
return super.expressionsResolved() && timeBucket.resolved();
82+
}
83+
7784
@Nullable
7885
public Bucket timeBucket() {
7986
return timeBucket;
8087
}
88+
89+
@Override
90+
public int hashCode() {
91+
return Objects.hash(groupings, aggregates, child(), timeBucket);
92+
}
93+
94+
@Override
95+
public boolean equals(Object obj) {
96+
if (this == obj) {
97+
return true;
98+
}
99+
100+
if (obj == null || getClass() != obj.getClass()) {
101+
return false;
102+
}
103+
104+
TimeSeriesAggregate other = (TimeSeriesAggregate) obj;
105+
return Objects.equals(groupings, other.groupings)
106+
&& Objects.equals(aggregates, other.aggregates)
107+
&& Objects.equals(child(), other.child())
108+
&& Objects.equals(timeBucket, other.timeBucket);
109+
}
81110
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TopNAggregate.java

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,13 @@
2323
import java.util.List;
2424
import java.util.Objects;
2525

26-
// TODO: Should this be TelemetryAware?
27-
public class TopNAggregate extends UnaryPlan {
26+
public class TopNAggregate extends Aggregate {
2827
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
2928
LogicalPlan.class,
3029
"TopNAggregate",
3130
TopNAggregate::new
3231
);
3332

34-
protected final List<Expression> groupings;
35-
protected final List<? extends NamedExpression> aggregates;
36-
3733
private final List<Order> order;
3834
private final Expression limit;
3935

@@ -47,27 +43,20 @@ public TopNAggregate(
4743
List<Order> order,
4844
Expression limit
4945
) {
50-
super(source, child);
51-
this.groupings = groupings;
52-
this.aggregates = aggregates;
46+
super(source, child, groupings, aggregates);
5347
this.order = order;
5448
this.limit = limit;
5549
}
5650

5751
public TopNAggregate(StreamInput in) throws IOException {
58-
super(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(LogicalPlan.class));
59-
this.groupings = in.readNamedWriteableCollectionAsList(Expression.class);
60-
this.aggregates = in.readNamedWriteableCollectionAsList(NamedExpression.class);
52+
super(in);
6153
this.order = in.readCollectionAsList(Order::new);
6254
this.limit = in.readNamedWriteable(Expression.class);
6355
}
6456

6557
@Override
6658
public void writeTo(StreamOutput out) throws IOException {
67-
Source.EMPTY.writeTo(out);
68-
out.writeNamedWriteable(child());
69-
out.writeNamedWriteableCollection(groupings);
70-
out.writeNamedWriteableCollection(aggregates());
59+
super.writeTo(out);
7160
out.writeCollection(order);
7261
out.writeNamedWriteable(limit);
7362
}
@@ -87,14 +76,6 @@ public TopNAggregate replaceChild(LogicalPlan newChild) {
8776
return new TopNAggregate(source(), newChild, groupings, aggregates, order, limit);
8877
}
8978

90-
public List<Expression> groupings() {
91-
return groupings;
92-
}
93-
94-
public List<? extends NamedExpression> aggregates() {
95-
return aggregates;
96-
}
97-
9879
public List<Order> order() {
9980
return order;
10081
}
@@ -105,20 +86,7 @@ public Expression limit() {
10586

10687
@Override
10788
public boolean expressionsResolved() {
108-
return Resolvables.resolved(groupings) && Resolvables.resolved(aggregates);
109-
}
110-
111-
@Override
112-
public List<Attribute> output() {
113-
if (lazyOutput == null) {
114-
lazyOutput = Aggregate.output(aggregates);
115-
}
116-
return lazyOutput;
117-
}
118-
119-
@Override
120-
protected AttributeSet computeReferences() {
121-
return Aggregate.computeReferences(aggregates, groupings);
89+
return super.expressionsResolved() && Resolvables.resolved(order) && limit.resolved();
12290
}
12391

12492
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,16 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
7373
// Pipeline breakers
7474
//
7575

76-
if (unary instanceof Aggregate aggregate) {
77-
List<Attribute> intermediate = MapperUtils.intermediateAttributes(aggregate);
78-
return MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate);
79-
}
80-
8176
if (unary instanceof TopNAggregate topNAggregate) {
8277
List<Attribute> intermediate = MapperUtils.intermediateAttributes(topNAggregate);
8378
return MapperUtils.topNAggExec(topNAggregate, mappedChild, AggregatorMode.INITIAL, intermediate);
8479
}
8580

81+
if (unary instanceof Aggregate aggregate) {
82+
List<Attribute> intermediate = MapperUtils.intermediateAttributes(aggregate);
83+
return MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate);
84+
}
85+
8686
if (unary instanceof Limit limit) {
8787
return new LimitExec(limit.source(), mappedChild, limit.limit(), null);
8888
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
146146
//
147147
// Pipeline breakers
148148
//
149-
if (unary instanceof Aggregate aggregate) {
149+
if (unary instanceof TopNAggregate aggregate) {
150150
List<Attribute> intermediate = MapperUtils.intermediateAttributes(aggregate);
151151

152152
// create both sides of the aggregate (for parallelism purposes), if no fragment is present
@@ -160,14 +160,14 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
160160
}
161161
// if no exchange was added (aggregation happening on the coordinator), create the initial agg
162162
else {
163-
mappedChild = MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate);
163+
mappedChild = MapperUtils.topNAggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate);
164164
}
165165

166166
// always add the final/reduction agg
167-
return MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.FINAL, intermediate);
167+
return MapperUtils.topNAggExec(aggregate, mappedChild, AggregatorMode.FINAL, intermediate);
168168
}
169169

170-
if (unary instanceof TopNAggregate aggregate) {
170+
if (unary instanceof Aggregate aggregate) {
171171
List<Attribute> intermediate = MapperUtils.intermediateAttributes(aggregate);
172172

173173
// create both sides of the aggregate (for parallelism purposes), if no fragment is present
@@ -181,11 +181,11 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
181181
}
182182
// if no exchange was added (aggregation happening on the coordinator), create the initial agg
183183
else {
184-
mappedChild = MapperUtils.topNAggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate);
184+
mappedChild = MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate);
185185
}
186186

187187
// always add the final/reduction agg
188-
return MapperUtils.topNAggExec(aggregate, mappedChild, AggregatorMode.FINAL, intermediate);
188+
return MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.FINAL, intermediate);
189189
}
190190

191191
if (unary instanceof Limit limit) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -152,14 +152,6 @@ static List<Attribute> intermediateAttributes(Aggregate aggregate) {
152152
return intermediateAttributes;
153153
}
154154

155-
static List<Attribute> intermediateAttributes(TopNAggregate aggregate) {
156-
List<Attribute> intermediateAttributes = AbstractPhysicalOperationProviders.intermediateAttributes(
157-
aggregate.aggregates(),
158-
aggregate.groupings()
159-
);
160-
return intermediateAttributes;
161-
}
162-
163155
static AggregateExec aggExec(Aggregate aggregate, PhysicalPlan child, AggregatorMode aggMode, List<Attribute> intermediateAttributes) {
164156
if (aggregate instanceof TimeSeriesAggregate ts) {
165157
return new TimeSeriesAggregateExec(

0 commit comments

Comments
 (0)