Skip to content

Commit 2af99c4

Browse files
committed
ToAggregateMetricDouble function
1 parent 5112dbb commit 2af99c4

File tree

8 files changed

+248
-6
lines changed

8 files changed

+248
-6
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ static TransportVersion def(int id) {
181181
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR = def(9_025_0_00);
182182
public static final TransportVersion ESQL_SERIALIZE_BLOCK_TYPE_CODE = def(9_026_0_00);
183183
public static final TransportVersion ESQL_THREAD_NAME_IN_DRIVER_PROFILE = def(9_027_0_00);
184+
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_028_0_00);
184185

185186
/*
186187
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,6 @@ public static boolean isRepresentable(DataType t) {
556556
&& t != SOURCE
557557
&& t != HALF_FLOAT
558558
&& t != PARTIAL_AGG
559-
&& t != AGGREGATE_METRIC_DOUBLE
560559
&& t.isCounter() == false;
561560
}
562561

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,17 @@
77

88
package org.elasticsearch.compute.data;
99

10+
import org.elasticsearch.TransportVersion;
11+
import org.elasticsearch.TransportVersions;
12+
import org.elasticsearch.common.io.stream.GenericNamedWriteable;
13+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
14+
import org.elasticsearch.common.io.stream.StreamInput;
15+
import org.elasticsearch.common.io.stream.StreamOutput;
1016
import org.elasticsearch.core.Releasables;
1117
import org.elasticsearch.index.mapper.BlockLoader;
1218

19+
import java.io.IOException;
20+
1321
public class AggregateMetricDoubleBlockBuilder extends AbstractBlockBuilder implements BlockLoader.AggregateMetricDoubleBuilder {
1422

1523
private DoubleBlockBuilder minBuilder;
@@ -161,11 +169,40 @@ public String getLabel() {
161169
}
162170
}
163171

164-
public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) {
172+
public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) implements GenericNamedWriteable {
165173
public AggregateMetricDoubleLiteral {
166174
min = min.isNaN() ? null : min;
167175
max = max.isNaN() ? null : max;
168176
sum = sum.isNaN() ? null : sum;
169177
}
178+
179+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
180+
GenericNamedWriteable.class,
181+
"AggregateMetricDoubleLiteral",
182+
AggregateMetricDoubleLiteral::new
183+
);
184+
185+
@Override
186+
public String getWriteableName() {
187+
return "AggregateMetricDoubleLiteral";
188+
}
189+
190+
public AggregateMetricDoubleLiteral(StreamInput input) throws IOException {
191+
this(input.readOptionalDouble(), input.readOptionalDouble(), input.readOptionalDouble(), input.readOptionalInt());
192+
}
193+
194+
@Override
195+
public void writeTo(StreamOutput out) throws IOException {
196+
out.writeOptionalDouble(min);
197+
out.writeOptionalDouble(max);
198+
out.writeOptionalDouble(sum);
199+
out.writeOptionalInt(count);
200+
}
201+
202+
@Override
203+
public TransportVersion getMinimalSupportedVersion() {
204+
return TransportVersions.ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL;
205+
}
206+
170207
}
171208
}

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.common.regex.Regex;
2121
import org.elasticsearch.common.settings.Settings;
2222
import org.elasticsearch.common.util.BigArrays;
23+
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
2324
import org.elasticsearch.compute.data.BlockFactory;
2425
import org.elasticsearch.compute.data.BlockUtils;
2526
import org.elasticsearch.compute.data.BytesRefBlock;
@@ -788,6 +789,12 @@ public static Literal randomLiteral(DataType type) {
788789
case CARTESIAN_POINT -> CARTESIAN.asWkb(ShapeTestUtils.randomPoint());
789790
case GEO_SHAPE -> GEO.asWkb(GeometryTestUtils.randomGeometry(randomBoolean()));
790791
case CARTESIAN_SHAPE -> CARTESIAN.asWkb(ShapeTestUtils.randomGeometry(randomBoolean()));
792+
case AGGREGATE_METRIC_DOUBLE -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(
793+
randomDouble(),
794+
randomDouble(),
795+
randomDouble(),
796+
randomInt()
797+
);
791798
case NULL -> null;
792799
case SOURCE -> {
793800
try {
@@ -798,8 +805,9 @@ public static Literal randomLiteral(DataType type) {
798805
throw new UncheckedIOException(e);
799806
}
800807
}
801-
case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG, AGGREGATE_METRIC_DOUBLE ->
802-
throw new IllegalArgumentException("can't make random values for [" + type.typeName() + "]");
808+
case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG -> throw new IllegalArgumentException(
809+
"can't make random values for [" + type.typeName() + "]"
810+
);
803811
}, type);
804812
}
805813

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Greatest;
4343
import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Least;
4444
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromBase64;
45+
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToAggregateMetricDouble;
4546
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBase64;
4647
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBoolean;
4748
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToCartesianPoint;
@@ -376,6 +377,7 @@ private static FunctionDefinition[][] functions() {
376377
// conversion functions
377378
new FunctionDefinition[] {
378379
def(FromBase64.class, FromBase64::new, "from_base64"),
380+
def(ToAggregateMetricDouble.class, ToAggregateMetricDouble::new, "to_aggregate_metric_double", "to_aggregatemetricdouble"),
379381
def(ToBase64.class, ToBase64::new, "to_base64"),
380382
def(ToBoolean.class, ToBoolean::new, "to_boolean", "to_bool"),
381383
def(ToCartesianPoint.class, ToCartesianPoint::new, "to_cartesianpoint"),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.scalar.convert;
9+
10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
13+
import org.elasticsearch.compute.data.Block;
14+
import org.elasticsearch.compute.data.CompositeBlock;
15+
import org.elasticsearch.compute.data.DoubleBlock;
16+
import org.elasticsearch.compute.data.Page;
17+
import org.elasticsearch.compute.operator.DriverContext;
18+
import org.elasticsearch.compute.operator.EvalOperator;
19+
import org.elasticsearch.core.Releasables;
20+
import org.elasticsearch.xpack.esql.core.expression.Expression;
21+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
22+
import org.elasticsearch.xpack.esql.core.tree.Source;
23+
import org.elasticsearch.xpack.esql.core.type.DataType;
24+
import org.elasticsearch.xpack.esql.expression.SurrogateExpression;
25+
import org.elasticsearch.xpack.esql.expression.function.Example;
26+
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
27+
import org.elasticsearch.xpack.esql.expression.function.Param;
28+
29+
import java.io.IOException;
30+
import java.util.List;
31+
import java.util.Map;
32+
33+
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
34+
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
35+
import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE;
36+
import static org.elasticsearch.xpack.esql.core.type.DataType.DOUBLE;
37+
38+
public class ToAggregateMetricDouble extends AbstractConvertFunction implements SurrogateExpression {
39+
40+
private static final Map<DataType, AbstractConvertFunction.BuildFactory> EVALUATORS = Map.ofEntries(
41+
Map.entry(AGGREGATE_METRIC_DOUBLE, (field, source) -> field),
42+
Map.entry(DOUBLE, Factory::new)
43+
);
44+
45+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
46+
Expression.class,
47+
"ToAggregateMetricDouble",
48+
ToAggregateMetricDouble::new
49+
);
50+
51+
@FunctionInfo(
52+
returnType = "aggregate_metric_double",
53+
description = "Encode a numeric to an aggregate_metric_double.",
54+
examples = @Example(file = "??????", tag = "to_aggregate_metric_double")
55+
)
56+
public ToAggregateMetricDouble(
57+
Source source,
58+
@Param(
59+
name = "number",
60+
type = { "double", "long", "unsigned_long", "integer", "aggregate_metric_double" },
61+
// TODO: vvvv ???????? multi-valued?????
62+
description = "Input value. The input can be a single- or multi-valued column or an expression."
63+
) Expression field
64+
) {
65+
super(source, field);
66+
}
67+
68+
private ToAggregateMetricDouble(StreamInput in) throws IOException {
69+
super(in);
70+
}
71+
72+
@Override
73+
public String getWriteableName() {
74+
return ENTRY.name;
75+
}
76+
77+
@Override
78+
protected TypeResolution resolveType() {
79+
if (childrenResolved() == false) {
80+
return new TypeResolution("Unresolved children");
81+
}
82+
return isType(
83+
field,
84+
dt -> dt == DataType.AGGREGATE_METRIC_DOUBLE || dt == DataType.DOUBLE || dt == DataType.LONG || dt == DataType.INTEGER,
85+
sourceText(),
86+
DEFAULT,
87+
"aggregate_metric_double only"
88+
);
89+
}
90+
91+
@Override
92+
public DataType dataType() {
93+
return AGGREGATE_METRIC_DOUBLE;
94+
}
95+
96+
@Override
97+
public Expression replaceChildren(List<Expression> newChildren) {
98+
return new ToAggregateMetricDouble(source(), newChildren.get(0));
99+
}
100+
101+
@Override
102+
protected NodeInfo<? extends Expression> info() {
103+
return NodeInfo.create(this, ToAggregateMetricDouble::new, field);
104+
}
105+
106+
@Override
107+
protected Map<DataType, AbstractConvertFunction.BuildFactory> factories() {
108+
return EVALUATORS;
109+
}
110+
111+
public static class Factory implements EvalOperator.ExpressionEvaluator.Factory {
112+
private final Source source;
113+
114+
private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator;
115+
116+
public Factory(EvalOperator.ExpressionEvaluator.Factory fieldEvaluator, Source source) {
117+
this.fieldEvaluator = fieldEvaluator;
118+
this.source = source;
119+
}
120+
121+
@Override
122+
public String toString() {
123+
return "ToAggregateMetricDoubleEvaluator[" + "field=" + fieldEvaluator + "]";
124+
}
125+
126+
@Override
127+
public EvalOperator.ExpressionEvaluator get(DriverContext context) {
128+
final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context);
129+
130+
return new EvalOperator.ExpressionEvaluator() {
131+
@Override
132+
public Block eval(Page page) {
133+
Block block = eval.eval(page);
134+
if (block instanceof CompositeBlock) {
135+
return block;
136+
}
137+
int positionCount = page.getPositionCount();
138+
try {
139+
DoubleBlock doubleBlock = (DoubleBlock) block;
140+
try (
141+
AggregateMetricDoubleBlockBuilder result = context.blockFactory()
142+
.newAggregateMetricDoubleBlockBuilder(positionCount)
143+
) {
144+
for (int p = 0; p < positionCount; p++) {
145+
try {
146+
var val = doubleBlock.getDouble(p);
147+
// todo: what if this fails midway....
148+
result.min().appendDouble(val);
149+
result.max().appendDouble(val);
150+
result.sum().appendDouble(val);
151+
result.count().appendInt(1);
152+
} catch (Exception e) {
153+
result.appendNull();
154+
}
155+
}
156+
return result.build();
157+
}
158+
} finally {
159+
block.close();
160+
}
161+
}
162+
163+
@Override
164+
public void close() {
165+
Releasables.closeExpectNoException(eval);
166+
}
167+
168+
@Override
169+
public String toString() {
170+
return "FromAggregateMetricDoubleEvaluator[field=" + eval + "]";
171+
}
172+
};
173+
}
174+
}
175+
176+
@Override
177+
public EvalOperator.ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) {
178+
return new Factory(toEvaluator.apply(field), source());
179+
}
180+
181+
@Override
182+
public Expression surrogate() {
183+
var s = source();
184+
var field = field();
185+
if (field.dataType() == AGGREGATE_METRIC_DOUBLE) {
186+
return field;
187+
}
188+
if (field.dataType() == DOUBLE) {
189+
return null;
190+
}
191+
return new ToAggregateMetricDouble(s, new ToDouble(s, field));
192+
}
193+
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1111
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
1213
import org.elasticsearch.search.SearchModule;
1314
import org.elasticsearch.xpack.esql.core.tree.Node;
1415
import org.elasticsearch.xpack.esql.expression.ExpressionWritables;
@@ -51,6 +52,7 @@ protected final NamedWriteableRegistry getNamedWriteableRegistry() {
5152
entries.addAll(ExpressionWritables.allExpressions());
5253
entries.addAll(new SearchModule(Settings.EMPTY, List.of()).getNamedWriteables()); // Query builders
5354
entries.add(Add.ENTRY); // Used by the eval tests
55+
entries.add(AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.ENTRY);
5456
return new NamedWriteableRegistry(entries);
5557
}
5658

x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ setup:
9393
- gt: {esql.functions.to_long: $functions_to_long}
9494
- match: {esql.functions.coalesce: $functions_coalesce}
9595
# Testing for the entire function set isn't feasbile, so we just check that we return the correct count as an approximation.
96-
- length: {esql.functions: 133} # check the "sister" test below for a likely update to the same esql.functions length check
96+
- length: {esql.functions: 134} # check the "sister" test below for a likely update to the same esql.functions length check
9797

9898
---
9999
"Basic ESQL usage output (telemetry) non-snapshot version":
@@ -164,4 +164,4 @@ setup:
164164
- match: {esql.functions.cos: $functions_cos}
165165
- gt: {esql.functions.to_long: $functions_to_long}
166166
- match: {esql.functions.coalesce: $functions_coalesce}
167-
- length: {esql.functions: 130} # check the "sister" test above for a likely update to the same esql.functions length check
167+
- length: {esql.functions: 131} # check the "sister" test above for a likely update to the same esql.functions length check

0 commit comments

Comments
 (0)