Skip to content

Commit acd9d21

Browse files
committed
ToAggregateMetricDouble function
1 parent 5112dbb commit acd9d21

File tree

8 files changed

+241
-6
lines changed

8 files changed

+241
-6
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ static TransportVersion def(int id) {
181181
public static final TransportVersion RETRY_ILM_ASYNC_ACTION_REQUIRE_ERROR = def(9_025_0_00);
182182
public static final TransportVersion ESQL_SERIALIZE_BLOCK_TYPE_CODE = def(9_026_0_00);
183183
public static final TransportVersion ESQL_THREAD_NAME_IN_DRIVER_PROFILE = def(9_027_0_00);
184+
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_028_0_00);
184185

185186
/*
186187
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,6 @@ public static boolean isRepresentable(DataType t) {
556556
&& t != SOURCE
557557
&& t != HALF_FLOAT
558558
&& t != PARTIAL_AGG
559-
&& t != AGGREGATE_METRIC_DOUBLE
560559
&& t.isCounter() == false;
561560
}
562561

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,17 @@
77

88
package org.elasticsearch.compute.data;
99

10+
import org.elasticsearch.TransportVersion;
11+
import org.elasticsearch.TransportVersions;
12+
import org.elasticsearch.common.io.stream.GenericNamedWriteable;
13+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
14+
import org.elasticsearch.common.io.stream.StreamInput;
15+
import org.elasticsearch.common.io.stream.StreamOutput;
1016
import org.elasticsearch.core.Releasables;
1117
import org.elasticsearch.index.mapper.BlockLoader;
1218

19+
import java.io.IOException;
20+
1321
public class AggregateMetricDoubleBlockBuilder extends AbstractBlockBuilder implements BlockLoader.AggregateMetricDoubleBuilder {
1422

1523
private DoubleBlockBuilder minBuilder;
@@ -161,11 +169,40 @@ public String getLabel() {
161169
}
162170
}
163171

164-
public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) {
172+
public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) implements GenericNamedWriteable {
165173
public AggregateMetricDoubleLiteral {
166174
min = min.isNaN() ? null : min;
167175
max = max.isNaN() ? null : max;
168176
sum = sum.isNaN() ? null : sum;
169177
}
178+
179+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
180+
GenericNamedWriteable.class,
181+
"AggregateMetricDoubleLiteral",
182+
AggregateMetricDoubleLiteral::new
183+
);
184+
185+
@Override
186+
public String getWriteableName() {
187+
return "AggregateMetricDoubleLiteral";
188+
}
189+
190+
public AggregateMetricDoubleLiteral(StreamInput input) throws IOException {
191+
this(input.readOptionalDouble(), input.readOptionalDouble(), input.readOptionalDouble(), input.readOptionalInt());
192+
}
193+
194+
@Override
195+
public void writeTo(StreamOutput out) throws IOException {
196+
out.writeOptionalDouble(min);
197+
out.writeOptionalDouble(max);
198+
out.writeOptionalDouble(sum);
199+
out.writeOptionalInt(count);
200+
}
201+
202+
@Override
203+
public TransportVersion getMinimalSupportedVersion() {
204+
return TransportVersions.ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL;
205+
}
206+
170207
}
171208
}

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.common.regex.Regex;
2121
import org.elasticsearch.common.settings.Settings;
2222
import org.elasticsearch.common.util.BigArrays;
23+
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
2324
import org.elasticsearch.compute.data.BlockFactory;
2425
import org.elasticsearch.compute.data.BlockUtils;
2526
import org.elasticsearch.compute.data.BytesRefBlock;
@@ -788,6 +789,12 @@ public static Literal randomLiteral(DataType type) {
788789
case CARTESIAN_POINT -> CARTESIAN.asWkb(ShapeTestUtils.randomPoint());
789790
case GEO_SHAPE -> GEO.asWkb(GeometryTestUtils.randomGeometry(randomBoolean()));
790791
case CARTESIAN_SHAPE -> CARTESIAN.asWkb(ShapeTestUtils.randomGeometry(randomBoolean()));
792+
case AGGREGATE_METRIC_DOUBLE -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(
793+
randomDouble(),
794+
randomDouble(),
795+
randomDouble(),
796+
randomInt()
797+
);
791798
case NULL -> null;
792799
case SOURCE -> {
793800
try {
@@ -798,8 +805,9 @@ public static Literal randomLiteral(DataType type) {
798805
throw new UncheckedIOException(e);
799806
}
800807
}
801-
case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG, AGGREGATE_METRIC_DOUBLE ->
802-
throw new IllegalArgumentException("can't make random values for [" + type.typeName() + "]");
808+
case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG -> throw new IllegalArgumentException(
809+
"can't make random values for [" + type.typeName() + "]"
810+
);
803811
}, type);
804812
}
805813

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Greatest;
4343
import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Least;
4444
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromBase64;
45+
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToAggregateMetricDouble;
4546
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBase64;
4647
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBoolean;
4748
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToCartesianPoint;
@@ -376,6 +377,7 @@ private static FunctionDefinition[][] functions() {
376377
// conversion functions
377378
new FunctionDefinition[] {
378379
def(FromBase64.class, FromBase64::new, "from_base64"),
380+
def(ToAggregateMetricDouble.class, ToAggregateMetricDouble::new, "to_aggregate_metric_double", "to_aggregatemetricdouble"),
379381
def(ToBase64.class, ToBase64::new, "to_base64"),
380382
def(ToBoolean.class, ToBoolean::new, "to_boolean", "to_bool"),
381383
def(ToCartesianPoint.class, ToCartesianPoint::new, "to_cartesianpoint"),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package org.elasticsearch.xpack.esql.expression.function.scalar.convert;
2+
3+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
4+
import org.elasticsearch.common.io.stream.StreamInput;
5+
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
6+
import org.elasticsearch.compute.data.Block;
7+
import org.elasticsearch.compute.data.CompositeBlock;
8+
import org.elasticsearch.compute.data.DoubleBlock;
9+
import org.elasticsearch.compute.data.Page;
10+
import org.elasticsearch.compute.operator.DriverContext;
11+
import org.elasticsearch.compute.operator.EvalOperator;
12+
import org.elasticsearch.core.Releasables;
13+
import org.elasticsearch.xpack.esql.core.expression.Expression;
14+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
15+
import org.elasticsearch.xpack.esql.core.tree.Source;
16+
import org.elasticsearch.xpack.esql.core.type.DataType;
17+
import org.elasticsearch.xpack.esql.expression.SurrogateExpression;
18+
import org.elasticsearch.xpack.esql.expression.function.Example;
19+
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
20+
import org.elasticsearch.xpack.esql.expression.function.Param;
21+
22+
import java.io.IOException;
23+
import java.util.List;
24+
import java.util.Map;
25+
26+
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
27+
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
28+
import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE;
29+
import static org.elasticsearch.xpack.esql.core.type.DataType.DOUBLE;
30+
31+
public class ToAggregateMetricDouble extends AbstractConvertFunction implements SurrogateExpression {
32+
33+
private static final Map<DataType, AbstractConvertFunction.BuildFactory> EVALUATORS = Map.ofEntries(
34+
Map.entry(AGGREGATE_METRIC_DOUBLE, (field, source) -> field),
35+
Map.entry(DOUBLE, Factory::new)
36+
);
37+
38+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
39+
Expression.class,
40+
"ToAggregateMetricDouble",
41+
ToAggregateMetricDouble::new
42+
);
43+
44+
@FunctionInfo(
45+
returnType = "aggregate_metric_double",
46+
description = "Encode a numeric to an aggregate_metric_double.",
47+
examples = @Example(file = "??????", tag = "to_aggregate_metric_double")
48+
)
49+
public ToAggregateMetricDouble(
50+
Source source,
51+
@Param(
52+
name = "number",
53+
type = { "double", "long", "unsigned_long", "integer", "aggregate_metric_double" },
54+
// TODO: vvvv ???????? multi-valued?????
55+
description = "Input value. The input can be a single- or multi-valued column or an expression."
56+
) Expression field
57+
) {
58+
super(source, field);
59+
}
60+
61+
private ToAggregateMetricDouble(StreamInput in) throws IOException {
62+
super(in);
63+
}
64+
65+
@Override
66+
public String getWriteableName() {
67+
return ENTRY.name;
68+
}
69+
70+
@Override
71+
protected TypeResolution resolveType() {
72+
if (childrenResolved() == false) {
73+
return new TypeResolution("Unresolved children");
74+
}
75+
return isType(
76+
field,
77+
dt -> dt == DataType.AGGREGATE_METRIC_DOUBLE || dt == DataType.DOUBLE || dt == DataType.LONG || dt == DataType.INTEGER,
78+
sourceText(),
79+
DEFAULT,
80+
"aggregate_metric_double only"
81+
);
82+
}
83+
84+
@Override
85+
public DataType dataType() {
86+
return AGGREGATE_METRIC_DOUBLE;
87+
}
88+
89+
@Override
90+
public Expression replaceChildren(List<Expression> newChildren) {
91+
return new ToAggregateMetricDouble(source(), newChildren.get(0));
92+
}
93+
94+
@Override
95+
protected NodeInfo<? extends Expression> info() {
96+
return NodeInfo.create(this, ToAggregateMetricDouble::new, field);
97+
}
98+
99+
@Override
100+
protected Map<DataType, AbstractConvertFunction.BuildFactory> factories() {
101+
return EVALUATORS;
102+
}
103+
104+
public static class Factory implements EvalOperator.ExpressionEvaluator.Factory {
105+
private final Source source;
106+
107+
private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator;
108+
109+
public Factory(EvalOperator.ExpressionEvaluator.Factory fieldEvaluator, Source source) {
110+
this.fieldEvaluator = fieldEvaluator;
111+
this.source = source;
112+
}
113+
114+
@Override
115+
public String toString() {
116+
return "ToAggregateMetricDoubleEvaluator[" + "field=" + fieldEvaluator + "]";
117+
}
118+
119+
@Override
120+
public EvalOperator.ExpressionEvaluator get(DriverContext context) {
121+
final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context);
122+
123+
return new EvalOperator.ExpressionEvaluator() {
124+
@Override
125+
public Block eval(Page page) {
126+
Block block = eval.eval(page);
127+
if (block instanceof CompositeBlock) {
128+
return block;
129+
}
130+
int positionCount = page.getPositionCount();
131+
try {
132+
DoubleBlock doubleBlock = (DoubleBlock) block;
133+
try (
134+
AggregateMetricDoubleBlockBuilder result = context.blockFactory()
135+
.newAggregateMetricDoubleBlockBuilder(positionCount)
136+
) {
137+
for (int p = 0; p < positionCount; p++) {
138+
try {
139+
var val = doubleBlock.getDouble(p);
140+
// todo: what if this fails midway....
141+
result.min().appendDouble(val);
142+
result.max().appendDouble(val);
143+
result.sum().appendDouble(val);
144+
result.count().appendInt(1);
145+
} catch (Exception e) {
146+
result.appendNull();
147+
}
148+
}
149+
return result.build();
150+
}
151+
} finally {
152+
block.close();
153+
}
154+
}
155+
156+
@Override
157+
public void close() {
158+
Releasables.closeExpectNoException(eval);
159+
}
160+
161+
@Override
162+
public String toString() {
163+
return "FromAggregateMetricDoubleEvaluator[field=" + eval + "]";
164+
}
165+
};
166+
}
167+
}
168+
169+
@Override
170+
public EvalOperator.ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) {
171+
return new Factory(toEvaluator.apply(field), source());
172+
}
173+
174+
@Override
175+
public Expression surrogate() {
176+
var s = source();
177+
var field = field();
178+
if (field.dataType() == AGGREGATE_METRIC_DOUBLE) {
179+
return field;
180+
}
181+
if (field.dataType() == DOUBLE) {
182+
return null;
183+
}
184+
return new ToAggregateMetricDouble(s, new ToDouble(s, field));
185+
}
186+
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1111
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
1213
import org.elasticsearch.search.SearchModule;
1314
import org.elasticsearch.xpack.esql.core.tree.Node;
1415
import org.elasticsearch.xpack.esql.expression.ExpressionWritables;
@@ -51,6 +52,7 @@ protected final NamedWriteableRegistry getNamedWriteableRegistry() {
5152
entries.addAll(ExpressionWritables.allExpressions());
5253
entries.addAll(new SearchModule(Settings.EMPTY, List.of()).getNamedWriteables()); // Query builders
5354
entries.add(Add.ENTRY); // Used by the eval tests
55+
entries.add(AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.ENTRY);
5456
return new NamedWriteableRegistry(entries);
5557
}
5658

x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ setup:
9393
- gt: {esql.functions.to_long: $functions_to_long}
9494
- match: {esql.functions.coalesce: $functions_coalesce}
9595
# Testing for the entire function set isn't feasbile, so we just check that we return the correct count as an approximation.
96-
- length: {esql.functions: 133} # check the "sister" test below for a likely update to the same esql.functions length check
96+
- length: {esql.functions: 134} # check the "sister" test below for a likely update to the same esql.functions length check
9797

9898
---
9999
"Basic ESQL usage output (telemetry) non-snapshot version":
@@ -164,4 +164,4 @@ setup:
164164
- match: {esql.functions.cos: $functions_cos}
165165
- gt: {esql.functions.to_long: $functions_to_long}
166166
- match: {esql.functions.coalesce: $functions_coalesce}
167-
- length: {esql.functions: 130} # check the "sister" test above for a likely update to the same esql.functions length check
167+
- length: {esql.functions: 131} # check the "sister" test above for a likely update to the same esql.functions length check

0 commit comments

Comments
 (0)