Skip to content

Commit 97e4ff3

Browse files
authored
Add window to aggregation function (elastic#137344)
This change adds a window parameter to AggregateFunction, similar to the existing filter parameter. The window parameter is optional but must never be null. This PR also generalizes AggregateFunction to be composed of [source, field, filter, window, extra parameters] , with all extra parameters placed after filter and window. The implementation of the window function will be added in a follow-up and will initially be available only for time-series aggregations such as rate, avg_over_time, etc.
1 parent 292f65b commit 97e4ff3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+467
-459
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9208000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
search_shards_resolved_index_expressions,9207000
1+
aggregation_window,9208000

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,13 +1080,19 @@ private LogicalPlan resolveFuse(Fuse fuse, List<Attribute> childrenOutput) {
10801080
Expression aggFilter = new Literal(source, true, DataType.BOOLEAN);
10811081

10821082
List<NamedExpression> aggregates = new ArrayList<>();
1083-
aggregates.add(new Alias(source, score.name(), new Sum(source, score, aggFilter, SummationMode.COMPENSATED_LITERAL)));
1083+
aggregates.add(
1084+
new Alias(
1085+
source,
1086+
score.name(),
1087+
new Sum(source, score, aggFilter, AggregateFunction.NO_WINDOW, SummationMode.COMPENSATED_LITERAL)
1088+
)
1089+
);
10841090

10851091
for (Attribute attr : childrenOutput) {
10861092
if (attr.name().equals(score.name())) {
10871093
continue;
10881094
}
1089-
var valuesAgg = new Values(source, attr, aggFilter);
1095+
var valuesAgg = new Values(source, attr, aggFilter, AggregateFunction.NO_WINDOW);
10901096
// Use VALUES only on supported fields.
10911097
// FuseScoreEval will check that the input contains only columns with supported data types
10921098
// and will fail with an appropriate error message if it doesn't.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -531,11 +531,11 @@ private static FunctionDefinition[][] functions() {
531531
def(Score.class, uni(Score::new), "score") },
532532
// time-series functions
533533
new FunctionDefinition[] {
534-
defTS(Rate.class, Rate::new, "rate"),
535-
defTS(Irate.class, Irate::new, "irate"),
536-
defTS(Idelta.class, Idelta::new, "idelta"),
537-
defTS(Delta.class, Delta::new, "delta"),
538-
defTS(Increase.class, Increase::new, "increase"),
534+
defTS(Rate.class, bi(Rate::new), "rate"),
535+
defTS(Irate.class, bi(Irate::new), "irate"),
536+
defTS(Idelta.class, bi(Idelta::new), "idelta"),
537+
defTS(Delta.class, bi(Delta::new), "delta"),
538+
defTS(Increase.class, bi(Increase::new), "increase"),
539539
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
540540
def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"),
541541
def(SumOverTime.class, uni(SumOverTime::new), "sum_over_time"),
@@ -546,8 +546,8 @@ private static FunctionDefinition[][] functions() {
546546
def(PresentOverTime.class, uni(PresentOverTime::new), "present_over_time"),
547547
def(AbsentOverTime.class, uni(AbsentOverTime::new), "absent_over_time"),
548548
def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"),
549-
defTS(LastOverTime.class, LastOverTime::new, "last_over_time"),
550-
defTS(FirstOverTime.class, FirstOverTime::new, "first_over_time"),
549+
defTS(LastOverTime.class, bi(LastOverTime::new), "last_over_time"),
550+
defTS(FirstOverTime.class, bi(FirstOverTime::new), "first_over_time"),
551551
def(PercentileOverTime.class, bi(PercentileOverTime::new), "percentile_over_time"),
552552
// dense vector function
553553
def(TextEmbedding.class, bi(TextEmbedding::new), "text_embedding") } };

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Absent.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,11 @@ public Absent(
8484
description = "Expression that outputs values to be checked for absence."
8585
) Expression field
8686
) {
87-
this(source, field, Literal.TRUE);
87+
this(source, field, Literal.TRUE, NO_WINDOW);
8888
}
8989

90-
public Absent(Source source, Expression field, Expression filter) {
91-
super(source, field, filter, emptyList());
90+
public Absent(Source source, Expression field, Expression filter, Expression window) {
91+
super(source, field, filter, window, emptyList());
9292
}
9393

9494
private Absent(StreamInput in) throws IOException {
@@ -102,17 +102,17 @@ public String getWriteableName() {
102102

103103
@Override
104104
protected NodeInfo<Absent> info() {
105-
return NodeInfo.create(this, Absent::new, field(), filter());
105+
return NodeInfo.create(this, Absent::new, field(), filter(), window());
106106
}
107107

108108
@Override
109109
public AggregateFunction withFilter(Expression filter) {
110-
return new Absent(source(), field(), filter);
110+
return new Absent(source(), field(), filter, window());
111111
}
112112

113113
@Override
114114
public Absent replaceChildren(List<Expression> newChildren) {
115-
return new Absent(source(), newChildren.get(0), newChildren.get(1));
115+
return new Absent(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2));
116116
}
117117

118118
@Override
@@ -138,6 +138,6 @@ protected TypeResolution resolveType() {
138138

139139
@Override
140140
public Expression surrogate() {
141-
return new Not(source(), new Present(source(), field(), filter()));
141+
return new Not(source(), new Present(source(), field(), filter(), window()));
142142
}
143143
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AbsentOverTime.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import java.io.IOException;
2525
import java.util.List;
2626

27-
import static java.util.Collections.emptyList;
28-
2927
/**
3028
* Similar to {@link Absent}, but it is used to check the absence of values over a time series in the given field.
3129
*/
@@ -70,11 +68,11 @@ public AbsentOverTime(
7068
"version" }
7169
) Expression field
7270
) {
73-
this(source, field, Literal.TRUE);
71+
this(source, field, Literal.TRUE, NO_WINDOW);
7472
}
7573

76-
public AbsentOverTime(Source source, Expression field, Expression filter) {
77-
super(source, field, filter, emptyList());
74+
public AbsentOverTime(Source source, Expression field, Expression filter, Expression window) {
75+
super(source, field, filter, window, List.of());
7876
}
7977

8078
private AbsentOverTime(StreamInput in) throws IOException {
@@ -88,17 +86,17 @@ public String getWriteableName() {
8886

8987
@Override
9088
public AbsentOverTime withFilter(Expression filter) {
91-
return new AbsentOverTime(source(), field(), filter);
89+
return new AbsentOverTime(source(), field(), filter, window());
9290
}
9391

9492
@Override
9593
protected NodeInfo<AbsentOverTime> info() {
96-
return NodeInfo.create(this, AbsentOverTime::new, field(), filter());
94+
return NodeInfo.create(this, AbsentOverTime::new, field(), filter(), window());
9795
}
9896

9997
@Override
10098
public AbsentOverTime replaceChildren(List<Expression> newChildren) {
101-
return new AbsentOverTime(source(), newChildren.get(0), newChildren.get(1));
99+
return new AbsentOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2));
102100
}
103101

104102
@Override
@@ -113,6 +111,6 @@ public DataType dataType() {
113111

114112
@Override
115113
public Absent perTimeSeriesAggregation() {
116-
return new Absent(source(), field(), filter());
114+
return new Absent(source(), field(), filter(), window());
117115
}
118116
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateFunction.java

Lines changed: 47 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77
package org.elasticsearch.xpack.esql.expression.function.aggregate;
88

9+
import org.elasticsearch.TransportVersion;
910
import org.elasticsearch.common.io.stream.StreamInput;
1011
import org.elasticsearch.common.io.stream.StreamOutput;
1112
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware;
@@ -17,15 +18,14 @@
1718
import org.elasticsearch.xpack.esql.core.expression.Literal;
1819
import org.elasticsearch.xpack.esql.core.expression.TypeResolutions;
1920
import org.elasticsearch.xpack.esql.core.expression.function.Function;
20-
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
2121
import org.elasticsearch.xpack.esql.core.tree.Source;
22-
import org.elasticsearch.xpack.esql.core.type.DataType;
2322
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
2423
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
2524
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
2625
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2726

2827
import java.io.IOException;
28+
import java.time.Duration;
2929
import java.util.List;
3030
import java.util.Objects;
3131
import java.util.function.BiConsumer;
@@ -38,25 +38,38 @@
3838

3939
/**
4040
* A type of {@code Function} that takes multiple values and extracts a single value out of them. For example, {@code AVG()}.
41+
* - Aggregate functions can have an optional filter and window, which default to {@code Literal.TRUE} and {@code NO_WINDOW}.
42+
* - The aggregation function should be composed as: source, field, filter, window, parameters.
43+
* Extra parameters should go to the parameters after the filter and window.
4144
*/
4245
public abstract class AggregateFunction extends Function implements PostAnalysisPlanVerificationAware {
46+
public static final Literal NO_WINDOW = Literal.timeDuration(Source.EMPTY, Duration.ZERO);
47+
public static final TransportVersion WINDOW_INTERVAL = TransportVersion.fromName("aggregation_window");
4348

4449
private final Expression field;
4550
private final List<? extends Expression> parameters;
4651
private final Expression filter;
52+
private final Expression window;
4753

4854
protected AggregateFunction(Source source, Expression field) {
49-
this(source, field, Literal.TRUE, emptyList());
55+
this(source, field, Literal.TRUE, NO_WINDOW, emptyList());
5056
}
5157

5258
protected AggregateFunction(Source source, Expression field, List<? extends Expression> parameters) {
53-
this(source, field, Literal.TRUE, parameters);
59+
this(source, field, Literal.TRUE, NO_WINDOW, parameters);
5460
}
5561

56-
protected AggregateFunction(Source source, Expression field, Expression filter, List<? extends Expression> parameters) {
57-
super(source, CollectionUtils.combine(asList(field, filter), parameters));
62+
protected AggregateFunction(
63+
Source source,
64+
Expression field,
65+
Expression filter,
66+
Expression window,
67+
List<? extends Expression> parameters
68+
) {
69+
super(source, CollectionUtils.combine(asList(field, filter, window), parameters));
5870
this.field = field;
5971
this.filter = filter;
72+
this.window = Objects.requireNonNull(window, "[window] must be specified; use NO_WINDOW instead");
6073
this.parameters = parameters;
6174
}
6275

@@ -65,48 +78,27 @@ protected AggregateFunction(StreamInput in) throws IOException {
6578
Source.readFrom((PlanStreamInput) in),
6679
in.readNamedWriteable(Expression.class),
6780
in.readNamedWriteable(Expression.class),
81+
readWindow(in),
6882
in.readNamedWriteableCollectionAsList(Expression.class)
6983
);
7084
}
7185

72-
/**
73-
* Read a generic AggregateFunction from the stream input. This is used for BWC when the subclass requires a generic instance;
74-
* then convert the parameters to the specific ones.
75-
*/
76-
protected static AggregateFunction readGenericAggregateFunction(StreamInput in) throws IOException {
77-
return new AggregateFunction(in) {
78-
@Override
79-
public AggregateFunction withFilter(Expression filter) {
80-
throw new UnsupportedOperationException();
81-
}
82-
83-
@Override
84-
public DataType dataType() {
85-
throw new UnsupportedOperationException();
86-
}
87-
88-
@Override
89-
public Expression replaceChildren(List<Expression> newChildren) {
90-
throw new UnsupportedOperationException();
91-
}
92-
93-
@Override
94-
protected NodeInfo<? extends Expression> info() {
95-
throw new UnsupportedOperationException();
96-
}
97-
98-
@Override
99-
public String getWriteableName() {
100-
throw new UnsupportedOperationException();
101-
}
102-
};
86+
protected static Expression readWindow(StreamInput in) throws IOException {
87+
if (in.getTransportVersion().supports(WINDOW_INTERVAL)) {
88+
return in.readNamedWriteable(Expression.class);
89+
} else {
90+
return NO_WINDOW;
91+
}
10392
}
10493

10594
@Override
10695
public final void writeTo(StreamOutput out) throws IOException {
10796
source().writeTo(out);
10897
out.writeNamedWriteable(field);
10998
out.writeNamedWriteable(filter);
99+
if (out.getTransportVersion().supports(WINDOW_INTERVAL)) {
100+
out.writeNamedWriteable(window);
101+
}
110102
out.writeNamedWriteableCollection(parameters);
111103
}
112104

@@ -144,6 +136,23 @@ public AggregateFunction withParameters(List<? extends Expression> parameters) {
144136
return (AggregateFunction) replaceChildren(CollectionUtils.combine(asList(field, filter), parameters));
145137
}
146138

139+
/**
140+
* Return the window associated with the aggregate function.
141+
*/
142+
public Expression window() {
143+
return window;
144+
}
145+
146+
/**
147+
* Whether the aggregate function has a window different than NO_WINDOW.
148+
*/
149+
public boolean hasWindow() {
150+
if (window instanceof Literal lit && lit.value() instanceof Duration duration) {
151+
return duration.isZero() == false;
152+
}
153+
return true;
154+
}
155+
147156
/**
148157
* Returns the set of input attributes required by this aggregate function, excluding those referenced by the filter.
149158
*/
@@ -168,6 +177,7 @@ public boolean equals(Object obj) {
168177
AggregateFunction other = (AggregateFunction) obj;
169178
return Objects.equals(other.field(), field())
170179
&& Objects.equals(other.filter(), filter())
180+
&& Objects.equals(other.window(), window())
171181
&& Objects.equals(other.parameters(), parameters());
172182
}
173183
return false;

0 commit comments

Comments
 (0)