Skip to content

Commit 208711b

Browse files
authored
ESQL: Move serialization for more Expressions (#109991)
This moves the serialization for aggregations and `BUCKET` to `NamedWriteable` to better align with the rest of Elasticsearch.
1 parent d9f18e8 commit 208711b

33 files changed

+818
-188
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateFunction.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,18 @@
66
*/
77
package org.elasticsearch.xpack.esql.expression.function.aggregate;
88

9+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
10+
import org.elasticsearch.common.io.stream.StreamInput;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
912
import org.elasticsearch.xpack.esql.core.expression.Expression;
1013
import org.elasticsearch.xpack.esql.core.expression.TypeResolutions;
1114
import org.elasticsearch.xpack.esql.core.expression.function.Function;
1215
import org.elasticsearch.xpack.esql.core.tree.Source;
1316
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
17+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
18+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
1419

20+
import java.io.IOException;
1521
import java.util.List;
1622
import java.util.Objects;
1723

@@ -23,6 +29,22 @@
2329
* A type of {@code Function} that takes multiple values and extracts a single value out of them. For example, {@code AVG()}.
2430
*/
2531
public abstract class AggregateFunction extends Function {
32+
public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
33+
return List.of(
34+
Avg.ENTRY,
35+
Count.ENTRY,
36+
CountDistinct.ENTRY,
37+
Max.ENTRY,
38+
Median.ENTRY,
39+
MedianAbsoluteDeviation.ENTRY,
40+
Min.ENTRY,
41+
Percentile.ENTRY,
42+
SpatialCentroid.ENTRY,
43+
Sum.ENTRY,
44+
TopList.ENTRY,
45+
Values.ENTRY
46+
);
47+
}
2648

2749
private final Expression field;
2850
private final List<? extends Expression> parameters;
@@ -37,6 +59,16 @@ protected AggregateFunction(Source source, Expression field, List<? extends Expr
3759
this.parameters = parameters;
3860
}
3961

62+
protected AggregateFunction(StreamInput in) throws IOException {
63+
this(Source.readFrom((PlanStreamInput) in), ((PlanStreamInput) in).readExpression());
64+
}
65+
66+
@Override
67+
public void writeTo(StreamOutput out) throws IOException {
68+
Source.EMPTY.writeTo(out);
69+
((PlanStreamOutput) out).writeExpression(field());
70+
}
71+
4072
public Expression field() {
4173
return field;
4274
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Avg.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.esql.expression.function.aggregate;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
1012
import org.elasticsearch.xpack.esql.core.expression.Expression;
1113
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1214
import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -17,12 +19,14 @@
1719
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvAvg;
1820
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Div;
1921

22+
import java.io.IOException;
2023
import java.util.List;
2124

2225
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
2326
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
2427

2528
public class Avg extends AggregateFunction implements SurrogateExpression {
29+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Avg", Avg::new);
2630

2731
@FunctionInfo(returnType = "double", description = "The average of a numeric field.", isAggregation = true)
2832
public Avg(Source source, @Param(name = "number", type = { "double", "integer", "long" }) Expression field) {
@@ -40,6 +44,15 @@ protected Expression.TypeResolution resolveType() {
4044
);
4145
}
4246

47+
private Avg(StreamInput in) throws IOException {
48+
super(in);
49+
}
50+
51+
@Override
52+
public String getWriteableName() {
53+
return ENTRY.name;
54+
}
55+
4356
@Override
4457
public DataType dataType() {
4558
return DataType.DOUBLE;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Count.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.esql.expression.function.aggregate;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
1012
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
1113
import org.elasticsearch.compute.aggregation.CountAggregatorFunction;
1214
import org.elasticsearch.xpack.esql.core.expression.Expression;
@@ -24,12 +26,14 @@
2426
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Mul;
2527
import org.elasticsearch.xpack.esql.planner.ToAggregator;
2628

29+
import java.io.IOException;
2730
import java.util.List;
2831

2932
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
3033
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
3134

3235
public class Count extends AggregateFunction implements EnclosedAgg, ToAggregator, SurrogateExpression {
36+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Count", Count::new);
3337

3438
@FunctionInfo(returnType = "long", description = "Returns the total number (count) of input values.", isAggregation = true)
3539
public Count(
@@ -56,6 +60,15 @@ public Count(
5660
super(source, field);
5761
}
5862

63+
private Count(StreamInput in) throws IOException {
64+
super(in);
65+
}
66+
67+
@Override
68+
public String getWriteableName() {
69+
return ENTRY.name;
70+
}
71+
5972
@Override
6073
protected NodeInfo<Count> info() {
6174
return NodeInfo.create(this, Count::new, field());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountDistinct.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77

88
package org.elasticsearch.xpack.esql.expression.function.aggregate;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
1013
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
1114
import org.elasticsearch.compute.aggregation.CountDistinctBooleanAggregatorFunctionSupplier;
1215
import org.elasticsearch.compute.aggregation.CountDistinctBytesRefAggregatorFunctionSupplier;
@@ -28,8 +31,11 @@
2831
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvCount;
2932
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvDedupe;
3033
import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
34+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
35+
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
3136
import org.elasticsearch.xpack.esql.planner.ToAggregator;
3237

38+
import java.io.IOException;
3339
import java.util.List;
3440

3541
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
@@ -39,6 +45,12 @@
3945
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
4046

4147
public class CountDistinct extends AggregateFunction implements OptionalArgument, ToAggregator, SurrogateExpression {
48+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
49+
Expression.class,
50+
"CountDistinct",
51+
CountDistinct::new
52+
);
53+
4254
private static final int DEFAULT_PRECISION = 3000;
4355
private final Expression precision;
4456

@@ -56,6 +68,26 @@ public CountDistinct(
5668
this.precision = precision;
5769
}
5870

71+
private CountDistinct(StreamInput in) throws IOException {
72+
this(
73+
Source.readFrom((PlanStreamInput) in),
74+
((PlanStreamInput) in).readExpression(),
75+
in.readOptionalWriteable(i -> ((PlanStreamInput) i).readExpression())
76+
);
77+
}
78+
79+
@Override
80+
public void writeTo(StreamOutput out) throws IOException {
81+
Source.EMPTY.writeTo(out);
82+
((PlanStreamOutput) out).writeExpression(field());
83+
((PlanStreamOutput) out).writeOptionalExpression(precision);
84+
}
85+
86+
@Override
87+
public String getWriteableName() {
88+
return ENTRY.name;
89+
}
90+
5991
@Override
6092
protected NodeInfo<CountDistinct> info() {
6193
return NodeInfo.create(this, CountDistinct::new, field(), precision);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Max.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.esql.expression.function.aggregate;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
1012
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
1113
import org.elasticsearch.compute.aggregation.MaxDoubleAggregatorFunctionSupplier;
1214
import org.elasticsearch.compute.aggregation.MaxIntAggregatorFunctionSupplier;
@@ -20,9 +22,11 @@
2022
import org.elasticsearch.xpack.esql.expression.function.Param;
2123
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMax;
2224

25+
import java.io.IOException;
2326
import java.util.List;
2427

2528
public class Max extends NumericAggregate implements SurrogateExpression {
29+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Max", Max::new);
2630

2731
@FunctionInfo(
2832
returnType = { "double", "integer", "long", "date" },
@@ -33,6 +37,15 @@ public Max(Source source, @Param(name = "number", type = { "double", "integer",
3337
super(source, field);
3438
}
3539

40+
private Max(StreamInput in) throws IOException {
41+
super(in);
42+
}
43+
44+
@Override
45+
public String getWriteableName() {
46+
return ENTRY.name;
47+
}
48+
3649
@Override
3750
protected NodeInfo<Max> info() {
3851
return NodeInfo.create(this, Max::new, field());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Median.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.esql.expression.function.aggregate;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
1012
import org.elasticsearch.compute.aggregation.QuantileStates;
1113
import org.elasticsearch.xpack.esql.core.expression.Expression;
1214
import org.elasticsearch.xpack.esql.core.expression.Literal;
@@ -19,12 +21,15 @@
1921
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToDouble;
2022
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMedian;
2123

24+
import java.io.IOException;
2225
import java.util.List;
2326

2427
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
2528
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
2629

2730
public class Median extends AggregateFunction implements SurrogateExpression {
31+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Median", Median::new);
32+
2833
// TODO: Add the compression parameter
2934
@FunctionInfo(
3035
returnType = { "double", "integer", "long" },
@@ -46,6 +51,15 @@ protected Expression.TypeResolution resolveType() {
4651
);
4752
}
4853

54+
private Median(StreamInput in) throws IOException {
55+
super(in);
56+
}
57+
58+
@Override
59+
public String getWriteableName() {
60+
return ENTRY.name;
61+
}
62+
4963
@Override
5064
public DataType dataType() {
5165
return DataType.DOUBLE;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MedianAbsoluteDeviation.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.esql.expression.function.aggregate;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
1012
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
1113
import org.elasticsearch.compute.aggregation.MedianAbsoluteDeviationDoubleAggregatorFunctionSupplier;
1214
import org.elasticsearch.compute.aggregation.MedianAbsoluteDeviationIntAggregatorFunctionSupplier;
@@ -17,9 +19,15 @@
1719
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
1820
import org.elasticsearch.xpack.esql.expression.function.Param;
1921

22+
import java.io.IOException;
2023
import java.util.List;
2124

2225
public class MedianAbsoluteDeviation extends NumericAggregate {
26+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
27+
Expression.class,
28+
"MedianAbsoluteDeviation",
29+
MedianAbsoluteDeviation::new
30+
);
2331

2432
// TODO: Add parameter
2533
@FunctionInfo(
@@ -31,6 +39,15 @@ public MedianAbsoluteDeviation(Source source, @Param(name = "number", type = { "
3139
super(source, field);
3240
}
3341

42+
private MedianAbsoluteDeviation(StreamInput in) throws IOException {
43+
super(in);
44+
}
45+
46+
@Override
47+
public String getWriteableName() {
48+
return ENTRY.name;
49+
}
50+
3451
@Override
3552
protected NodeInfo<MedianAbsoluteDeviation> info() {
3653
return NodeInfo.create(this, MedianAbsoluteDeviation::new, field());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Min.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.esql.expression.function.aggregate;
99

10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
1012
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
1113
import org.elasticsearch.compute.aggregation.MinDoubleAggregatorFunctionSupplier;
1214
import org.elasticsearch.compute.aggregation.MinIntAggregatorFunctionSupplier;
@@ -20,9 +22,11 @@
2022
import org.elasticsearch.xpack.esql.expression.function.Param;
2123
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMin;
2224

25+
import java.io.IOException;
2326
import java.util.List;
2427

2528
public class Min extends NumericAggregate implements SurrogateExpression {
29+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Min", Min::new);
2630

2731
@FunctionInfo(
2832
returnType = { "double", "integer", "long", "date" },
@@ -33,6 +37,15 @@ public Min(Source source, @Param(name = "number", type = { "double", "integer",
3337
super(source, field);
3438
}
3539

40+
private Min(StreamInput in) throws IOException {
41+
super(in);
42+
}
43+
44+
@Override
45+
public String getWriteableName() {
46+
return ENTRY.name;
47+
}
48+
3649
@Override
3750
protected NodeInfo<Min> info() {
3851
return NodeInfo.create(this, Min::new, field());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/NumericAggregate.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77
package org.elasticsearch.xpack.esql.expression.function.aggregate;
88

9+
import org.elasticsearch.common.io.stream.StreamInput;
910
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
1011
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
1112
import org.elasticsearch.xpack.esql.core.expression.Expression;
@@ -14,6 +15,7 @@
1415
import org.elasticsearch.xpack.esql.core.type.DataType;
1516
import org.elasticsearch.xpack.esql.planner.ToAggregator;
1617

18+
import java.io.IOException;
1719
import java.util.List;
1820

1921
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
@@ -51,6 +53,10 @@ public abstract class NumericAggregate extends AggregateFunction implements ToAg
5153
super(source, field);
5254
}
5355

56+
NumericAggregate(StreamInput in) throws IOException {
57+
super(in);
58+
}
59+
5460
@Override
5561
protected TypeResolution resolveType() {
5662
if (supportsDates()) {

0 commit comments

Comments
 (0)