-
Notifications
You must be signed in to change notification settings - Fork 25.7k
Add window to aggregation function #137344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
3aed50d
a182a72
f47041d
3ec11b9
c06417d
76b0767
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| 9205000 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1 @@ | ||
| esql_resolve_fields_response_used,9204000 | ||
| aggregation_window,9205000 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |
| */ | ||
| package org.elasticsearch.xpack.esql.expression.function.aggregate; | ||
|
|
||
| import org.elasticsearch.TransportVersion; | ||
| import org.elasticsearch.common.io.stream.StreamInput; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
| import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware; | ||
|
|
@@ -17,15 +18,14 @@ | |
| import org.elasticsearch.xpack.esql.core.expression.Literal; | ||
| import org.elasticsearch.xpack.esql.core.expression.TypeResolutions; | ||
| import org.elasticsearch.xpack.esql.core.expression.function.Function; | ||
| import org.elasticsearch.xpack.esql.core.tree.NodeInfo; | ||
| import org.elasticsearch.xpack.esql.core.tree.Source; | ||
| import org.elasticsearch.xpack.esql.core.type.DataType; | ||
| import org.elasticsearch.xpack.esql.core.util.CollectionUtils; | ||
| import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; | ||
| import org.elasticsearch.xpack.esql.plan.logical.Aggregate; | ||
| import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; | ||
|
|
||
| import java.io.IOException; | ||
| import java.time.Duration; | ||
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.function.BiConsumer; | ||
|
|
@@ -38,25 +38,38 @@ | |
|
|
||
| /** | ||
| * A type of {@code Function} that takes multiple values and extracts a single value out of them. For example, {@code AVG()}. | ||
| * - Aggregate functions can have an optional filter and window, which default to {@code Literal.TRUE} and {@code NO_WINDOW}. | ||
| * - The aggregation function should be composed as: source, field, filter, window, parameters. | ||
| * Extra parameters should go to the parameters after the filter and window. | ||
| */ | ||
| public abstract class AggregateFunction extends Function implements PostAnalysisPlanVerificationAware { | ||
| public static final Literal NO_WINDOW = Literal.timeDuration(Source.EMPTY, Duration.ZERO); | ||
| public static final TransportVersion WINDOW_INTERVAL = TransportVersion.fromName("aggregation_window"); | ||
|
|
||
| private final Expression field; | ||
| private final List<? extends Expression> parameters; | ||
| private final Expression filter; | ||
| private final Expression window; | ||
|
|
||
| protected AggregateFunction(Source source, Expression field) { | ||
| this(source, field, Literal.TRUE, emptyList()); | ||
| this(source, field, Literal.TRUE, NO_WINDOW, emptyList()); | ||
| } | ||
|
|
||
| protected AggregateFunction(Source source, Expression field, List<? extends Expression> parameters) { | ||
| this(source, field, Literal.TRUE, parameters); | ||
| this(source, field, Literal.TRUE, NO_WINDOW, parameters); | ||
| } | ||
|
|
||
| protected AggregateFunction(Source source, Expression field, Expression filter, List<? extends Expression> parameters) { | ||
| super(source, CollectionUtils.combine(asList(field, filter), parameters)); | ||
| protected AggregateFunction( | ||
| Source source, | ||
| Expression field, | ||
| Expression filter, | ||
| Expression window, | ||
| List<? extends Expression> parameters | ||
| ) { | ||
| super(source, CollectionUtils.combine(asList(field, filter, window), parameters)); | ||
| this.field = field; | ||
| this.filter = filter; | ||
| this.window = Objects.requireNonNull(window, "[window] must be specified; use NO_WINDOW instead"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess we expect functions to check against negative durations?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we should, but our random tests may pass any expression for this parameter. |
||
| this.parameters = parameters; | ||
| } | ||
|
|
||
|
|
@@ -65,48 +78,27 @@ protected AggregateFunction(StreamInput in) throws IOException { | |
| Source.readFrom((PlanStreamInput) in), | ||
| in.readNamedWriteable(Expression.class), | ||
| in.readNamedWriteable(Expression.class), | ||
| readWindow(in), | ||
| in.readNamedWriteableCollectionAsList(Expression.class) | ||
| ); | ||
| } | ||
|
|
||
| /** | ||
| * Read a generic AggregateFunction from the stream input. This is used for BWC when the subclass requires a generic instance; | ||
| * then convert the parameters to the specific ones. | ||
| */ | ||
| protected static AggregateFunction readGenericAggregateFunction(StreamInput in) throws IOException { | ||
| return new AggregateFunction(in) { | ||
| @Override | ||
| public AggregateFunction withFilter(Expression filter) { | ||
| throw new UnsupportedOperationException(); | ||
| } | ||
|
|
||
| @Override | ||
| public DataType dataType() { | ||
| throw new UnsupportedOperationException(); | ||
| } | ||
|
|
||
| @Override | ||
| public Expression replaceChildren(List<Expression> newChildren) { | ||
| throw new UnsupportedOperationException(); | ||
| } | ||
|
|
||
| @Override | ||
| protected NodeInfo<? extends Expression> info() { | ||
| throw new UnsupportedOperationException(); | ||
| } | ||
|
|
||
| @Override | ||
| public String getWriteableName() { | ||
| throw new UnsupportedOperationException(); | ||
| } | ||
| }; | ||
| protected static Expression readWindow(StreamInput in) throws IOException { | ||
| if (in.getTransportVersion().supports(WINDOW_INTERVAL)) { | ||
| return in.readNamedWriteable(Expression.class); | ||
| } else { | ||
| return NO_WINDOW; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public final void writeTo(StreamOutput out) throws IOException { | ||
| source().writeTo(out); | ||
| out.writeNamedWriteable(field); | ||
| out.writeNamedWriteable(filter); | ||
| if (out.getTransportVersion().supports(WINDOW_INTERVAL)) { | ||
| out.writeNamedWriteable(window); | ||
| } | ||
| out.writeNamedWriteableCollection(parameters); | ||
| } | ||
|
|
||
|
|
@@ -144,6 +136,21 @@ public AggregateFunction withParameters(List<? extends Expression> parameters) { | |
| return (AggregateFunction) replaceChildren(CollectionUtils.combine(asList(field, filter), parameters)); | ||
| } | ||
|
|
||
| /** | ||
| * Return the window associated with the aggregate function. | ||
| */ | ||
| public Expression window() { | ||
| return window; | ||
| } | ||
|
|
||
| /** | ||
| * Whether the aggregate function has a window different than NO_WINDOW. | ||
| */ | ||
| public boolean hasWindow() { | ||
| boolean zero = window instanceof Literal lit && lit.value() instanceof Duration duration && duration.isZero(); | ||
|
||
| return zero == false; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the set of input attributes required by this aggregate function, excluding those referenced by the filter. | ||
| */ | ||
|
|
@@ -168,6 +175,7 @@ public boolean equals(Object obj) { | |
| AggregateFunction other = (AggregateFunction) obj; | ||
| return Objects.equals(other.field(), field()) | ||
| && Objects.equals(other.filter(), filter()) | ||
| && Objects.equals(other.window(), window()) | ||
| && Objects.equals(other.parameters(), parameters()); | ||
| } | ||
| return false; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.