diff --git a/docs/changelog/124595.yaml b/docs/changelog/124595.yaml
new file mode 100644
index 0000000000000..7afd0541b6231
--- /dev/null
+++ b/docs/changelog/124595.yaml
@@ -0,0 +1,5 @@
+pr: 124595
+summary: '`ToAggregateMetricDouble` function'
+area: "ES|QL"
+type: enhancement
+issues: []
diff --git a/docs/changelog/125191.yaml b/docs/changelog/125191.yaml
new file mode 100644
index 0000000000000..ced55c2d2ecc6
--- /dev/null
+++ b/docs/changelog/125191.yaml
@@ -0,0 +1,5 @@
+pr: 125191
+summary: Fix sorting when `aggregate_metric_double` present
+area: ES|QL
+type: enhancement
+issues: []
diff --git a/docs/reference/esql/functions/description/to_aggregate_metric_double.asciidoc b/docs/reference/esql/functions/description/to_aggregate_metric_double.asciidoc
new file mode 100644
index 0000000000000..5f7f3e6d30d27
--- /dev/null
+++ b/docs/reference/esql/functions/description/to_aggregate_metric_double.asciidoc
@@ -0,0 +1,5 @@
+// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
+
+*Description*
+
+Encode a numeric to an aggregate_metric_double.
diff --git a/docs/reference/esql/functions/examples/to_aggregate_metric_double.asciidoc b/docs/reference/esql/functions/examples/to_aggregate_metric_double.asciidoc
new file mode 100644
index 0000000000000..6b386d5266f9b
--- /dev/null
+++ b/docs/reference/esql/functions/examples/to_aggregate_metric_double.asciidoc
@@ -0,0 +1,22 @@
+// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
+
+*Examples*
+
+[source.merge.styled,esql]
+----
+include::{esql-specs}/convert.csv-spec[tag=toAggregateMetricDouble]
+----
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+include::{esql-specs}/convert.csv-spec[tag=toAggregateMetricDouble-result]
+|===
+The expression also accepts multi-values
+[source.merge.styled,esql]
+----
+include::{esql-specs}/convert.csv-spec[tag=toAggregateMetricDoubleMv]
+----
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+include::{esql-specs}/convert.csv-spec[tag=toAggregateMetricDoubleMv-result]
+|===
+
diff --git a/docs/reference/esql/functions/kibana/definition/to_aggregate_metric_double.json b/docs/reference/esql/functions/kibana/definition/to_aggregate_metric_double.json
new file mode 100644
index 0000000000000..a072e72e87147
--- /dev/null
+++ b/docs/reference/esql/functions/kibana/definition/to_aggregate_metric_double.json
@@ -0,0 +1,13 @@
+{
+ "comment" : "This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.",
+ "type" : "eval",
+ "name" : "to_aggregate_metric_double",
+ "description" : "Encode a numeric to an aggregate_metric_double.",
+ "signatures" : [ ],
+ "examples" : [
+ "ROW x = 3892095203\n| EVAL agg_metric = TO_AGGREGATE_METRIC_DOUBLE(x)",
+ "ROW x = [5032, 11111, 40814]\n| EVAL agg_metric = TO_AGGREGATE_METRIC_DOUBLE(x)"
+ ],
+ "preview" : false,
+ "snapshot_only" : false
+}
diff --git a/docs/reference/esql/functions/kibana/docs/to_aggregate_metric_double.md b/docs/reference/esql/functions/kibana/docs/to_aggregate_metric_double.md
new file mode 100644
index 0000000000000..d900e85414865
--- /dev/null
+++ b/docs/reference/esql/functions/kibana/docs/to_aggregate_metric_double.md
@@ -0,0 +1,11 @@
+
+
+### TO_AGGREGATE_METRIC_DOUBLE
+Encode a numeric to an aggregate_metric_double.
+
+```
+ROW x = 3892095203
+| EVAL agg_metric = TO_AGGREGATE_METRIC_DOUBLE(x)
+```
diff --git a/docs/reference/esql/functions/layout/to_aggregate_metric_double.asciidoc b/docs/reference/esql/functions/layout/to_aggregate_metric_double.asciidoc
new file mode 100644
index 0000000000000..748410c7693b8
--- /dev/null
+++ b/docs/reference/esql/functions/layout/to_aggregate_metric_double.asciidoc
@@ -0,0 +1,15 @@
+// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
+
+[discrete]
+[[esql-to_aggregate_metric_double]]
+=== `TO_AGGREGATE_METRIC_DOUBLE`
+
+*Syntax*
+
+[.text-center]
+image::esql/functions/signature/to_aggregate_metric_double.svg[Embedded,opts=inline]
+
+include::../parameters/to_aggregate_metric_double.asciidoc[]
+include::../description/to_aggregate_metric_double.asciidoc[]
+include::../types/to_aggregate_metric_double.asciidoc[]
+include::../examples/to_aggregate_metric_double.asciidoc[]
diff --git a/docs/reference/esql/functions/parameters/to_aggregate_metric_double.asciidoc b/docs/reference/esql/functions/parameters/to_aggregate_metric_double.asciidoc
new file mode 100644
index 0000000000000..68958de7e66df
--- /dev/null
+++ b/docs/reference/esql/functions/parameters/to_aggregate_metric_double.asciidoc
@@ -0,0 +1,6 @@
+// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
+
+*Parameters*
+
+`number`::
+Input value. The input can be a single- or multi-valued column or an expression.
diff --git a/docs/reference/esql/functions/signature/to_aggregate_metric_double.svg b/docs/reference/esql/functions/signature/to_aggregate_metric_double.svg
new file mode 100644
index 0000000000000..e2ad820cf6fb9
--- /dev/null
+++ b/docs/reference/esql/functions/signature/to_aggregate_metric_double.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/docs/reference/esql/functions/types/to_aggregate_metric_double.asciidoc b/docs/reference/esql/functions/types/to_aggregate_metric_double.asciidoc
new file mode 100644
index 0000000000000..1be687c9c4884
--- /dev/null
+++ b/docs/reference/esql/functions/types/to_aggregate_metric_double.asciidoc
@@ -0,0 +1,9 @@
+// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
+
+*Supported types*
+
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+number | result
+aggregate_metric_double
+|===
diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java
index 4fdc32ea41e12..277d3fbeb8ca3 100644
--- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java
+++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java
@@ -557,7 +557,6 @@ public static boolean isRepresentable(DataType t) {
&& t != SOURCE
&& t != HALF_FLOAT
&& t != PARTIAL_AGG
- && t != AGGREGATE_METRIC_DOUBLE
&& t.isCounter() == false;
}
@@ -578,7 +577,7 @@ public static boolean isSpatial(DataType t) {
}
public static boolean isSortable(DataType t) {
- return false == (t == SOURCE || isCounter(t) || isSpatial(t));
+ return false == (t == SOURCE || isCounter(t) || isSpatial(t) || t == AGGREGATE_METRIC_DOUBLE);
}
public String nameUpper() {
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java
index 4f1c6faf520be..ffb897854904d 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java
@@ -7,9 +7,17 @@
package org.elasticsearch.compute.data;
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
+import org.elasticsearch.common.io.stream.GenericNamedWriteable;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.BlockLoader;
+import java.io.IOException;
+
public class AggregateMetricDoubleBlockBuilder extends AbstractBlockBuilder implements BlockLoader.AggregateMetricDoubleBuilder {
private DoubleBlockBuilder minBuilder;
@@ -161,11 +169,40 @@ public String getLabel() {
}
}
- public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) {
+ public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) implements GenericNamedWriteable {
public AggregateMetricDoubleLiteral {
min = min.isNaN() ? null : min;
max = max.isNaN() ? null : max;
sum = sum.isNaN() ? null : sum;
}
+
+ public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+ GenericNamedWriteable.class,
+ "AggregateMetricDoubleLiteral",
+ AggregateMetricDoubleLiteral::new
+ );
+
+ @Override
+ public String getWriteableName() {
+ return "AggregateMetricDoubleLiteral";
+ }
+
+ public AggregateMetricDoubleLiteral(StreamInput input) throws IOException {
+ this(input.readOptionalDouble(), input.readOptionalDouble(), input.readOptionalDouble(), input.readOptionalInt());
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeOptionalDouble(min);
+ out.writeOptionalDouble(max);
+ out.writeOptionalDouble(sum);
+ out.writeOptionalInt(count);
+ }
+
+ @Override
+ public TransportVersion getMinimalSupportedVersion() {
+ return TransportVersions.ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL;
+ }
+
}
}
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java
index 8773a3b9785e0..1d6012a8a73de 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java
@@ -285,7 +285,19 @@ private static Object valueAtOffset(Block block, int offset) {
DocVector v = ((DocBlock) block).asVector();
yield new Doc(v.shards().getInt(offset), v.segments().getInt(offset), v.docs().getInt(offset));
}
- case COMPOSITE -> throw new IllegalArgumentException("can't read values from composite blocks");
+ case COMPOSITE -> {
+ CompositeBlock compositeBlock = (CompositeBlock) block;
+ var minBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex());
+ var maxBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex());
+ var sumBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex());
+ var countBlock = (IntBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex());
+ yield new AggregateMetricDoubleLiteral(
+ minBlock.getDouble(offset),
+ maxBlock.getDouble(offset),
+ sumBlock.getDouble(offset),
+ countBlock.getInt(offset)
+ );
+ }
case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]");
};
}
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java
index 61c49bac7505d..a1e6cd17fd625 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java
@@ -54,6 +54,7 @@ static ResultBuilder resultBuilderFor(
case DOUBLE -> new ResultBuilderForDouble(blockFactory, encoder, inKey, positions);
case NULL -> new ResultBuilderForNull(blockFactory);
case DOC -> new ResultBuilderForDoc(blockFactory, positions);
+ case COMPOSITE -> new ResultBuilderForComposite(blockFactory, positions);
default -> {
assert false : "Result builder for [" + elementType + "]";
throw new UnsupportedOperationException("Result builder for [" + elementType + "]");
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForComposite.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForComposite.java
new file mode 100644
index 0000000000000..c9844f94988c7
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForComposite.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator.topn;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.index.mapper.BlockLoader;
+
+import java.util.List;
+
+public class ResultBuilderForComposite implements ResultBuilder {
+
+ private final AggregateMetricDoubleBlockBuilder builder;
+
+ ResultBuilderForComposite(BlockFactory blockFactory, int positions) {
+ this.builder = blockFactory.newAggregateMetricDoubleBlockBuilder(positions);
+ }
+
+ @Override
+ public void decodeKey(BytesRef keys) {
+ throw new AssertionError("Composite Block can't be a key");
+ }
+
+ @Override
+ public void decodeValue(BytesRef values) {
+ for (BlockLoader.DoubleBuilder subBuilder : List.of(builder.min(), builder.max(), builder.sum())) {
+ if (TopNEncoder.DEFAULT_UNSORTABLE.decodeBoolean(values)) {
+ subBuilder.appendDouble(TopNEncoder.DEFAULT_UNSORTABLE.decodeDouble(values));
+ } else {
+ subBuilder.appendNull();
+ }
+ }
+ if (TopNEncoder.DEFAULT_UNSORTABLE.decodeBoolean(values)) {
+ builder.count().appendInt(TopNEncoder.DEFAULT_UNSORTABLE.decodeInt(values));
+ } else {
+ builder.count().appendNull();
+ }
+ }
+
+ @Override
+ public Block build() {
+ return builder.build();
+ }
+
+ @Override
+ public String toString() {
+ return "ValueExtractorForComposite";
+ }
+
+ @Override
+ public void close() {
+ builder.close();
+ }
+}
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java
index 40c94ff1327f0..7d1e30f432a5a 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java
@@ -10,6 +10,7 @@
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.CompositeBlock;
import org.elasticsearch.compute.data.DocBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.ElementType;
@@ -40,6 +41,7 @@ static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder,
case DOUBLE -> ValueExtractorForDouble.extractorFor(encoder, inKey, (DoubleBlock) block);
case NULL -> new ValueExtractorForNull();
case DOC -> new ValueExtractorForDoc(encoder, ((DocBlock) block).asVector());
+ case COMPOSITE -> new ValueExtractorForComposite(encoder, (CompositeBlock) block);
default -> {
assert false : "No value extractor for [" + block.elementType() + "]";
throw new UnsupportedOperationException("No value extractor for [" + block.elementType() + "]");
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForComposite.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForComposite.java
new file mode 100644
index 0000000000000..da58deb96d632
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForComposite.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator.topn;
+
+import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
+import org.elasticsearch.compute.data.CompositeBlock;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
+
+import java.util.List;
+
+public class ValueExtractorForComposite implements ValueExtractor {
+ private final CompositeBlock block;
+
+ ValueExtractorForComposite(TopNEncoder encoder, CompositeBlock block) {
+ assert encoder == TopNEncoder.DEFAULT_UNSORTABLE;
+ this.block = block;
+ }
+
+ @Override
+ public void writeValue(BreakingBytesRefBuilder values, int position) {
+ if (block.getBlockCount() != AggregateMetricDoubleBlockBuilder.Metric.values().length) {
+ throw new UnsupportedOperationException("Composite Blocks for non-aggregate-metric-doubles do not have value extractors");
+ }
+ for (AggregateMetricDoubleBlockBuilder.Metric metric : List.of(
+ AggregateMetricDoubleBlockBuilder.Metric.MIN,
+ AggregateMetricDoubleBlockBuilder.Metric.MAX,
+ AggregateMetricDoubleBlockBuilder.Metric.SUM
+ )) {
+ DoubleBlock doubleBlock = block.getBlock(metric.getIndex());
+ if (doubleBlock.isNull(position)) {
+ TopNEncoder.DEFAULT_UNSORTABLE.encodeBoolean(false, values);
+ } else {
+ TopNEncoder.DEFAULT_UNSORTABLE.encodeBoolean(true, values);
+ TopNEncoder.DEFAULT_UNSORTABLE.encodeDouble(doubleBlock.getDouble(position), values);
+ }
+ }
+ IntBlock intBlock = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex());
+ if (intBlock.isNull(position)) {
+ TopNEncoder.DEFAULT_UNSORTABLE.encodeBoolean(false, values);
+ } else {
+ TopNEncoder.DEFAULT_UNSORTABLE.encodeBoolean(true, values);
+ TopNEncoder.DEFAULT_UNSORTABLE.encodeInt(intBlock.getInt(position), values);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ValueExtractorForComposite";
+ }
+}
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvAssert.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvAssert.java
index 4e31916d5328e..bc485f1df9297 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvAssert.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvAssert.java
@@ -10,6 +10,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.DocValueFormat;
@@ -40,6 +41,7 @@
import static org.elasticsearch.xpack.esql.core.util.NumericUtils.unsignedLongAsNumber;
import static org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes.CARTESIAN;
import static org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes.GEO;
+import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.aggregateMetricDoubleLiteralToString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
@@ -405,6 +407,11 @@ private static Object convertExpectedValue(Type expectedType, Object expectedVal
case VERSION -> // convert BytesRef-packed Version to String
rebuildExpected(expectedValue, BytesRef.class, x -> new Version((BytesRef) x).toString());
case UNSIGNED_LONG -> rebuildExpected(expectedValue, Long.class, x -> unsignedLongAsNumber((long) x));
+ case AGGREGATE_METRIC_DOUBLE -> rebuildExpected(
+ expectedValue,
+ AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.class,
+ x -> aggregateMetricDoubleLiteralToString((AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral) x)
+ );
default -> expectedValue;
};
}
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java
index 47c927398f95e..24db9f6931672 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java
@@ -15,6 +15,7 @@
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
@@ -62,6 +63,7 @@
import static org.elasticsearch.xpack.esql.core.util.NumericUtils.asLongUnsigned;
import static org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes.CARTESIAN;
import static org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes.GEO;
+import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.stringToAggregateMetricDoubleLiteral;
public final class CsvTestUtils {
private static final int MAX_WIDTH = 80;
@@ -480,6 +482,10 @@ public enum Type {
CARTESIAN_POINT(x -> x == null ? null : CARTESIAN.wktToWkb(x), BytesRef.class),
GEO_SHAPE(x -> x == null ? null : GEO.wktToWkb(x), BytesRef.class),
CARTESIAN_SHAPE(x -> x == null ? null : CARTESIAN.wktToWkb(x), BytesRef.class),
+ AGGREGATE_METRIC_DOUBLE(
+ x -> x == null ? null : stringToAggregateMetricDoubleLiteral(x),
+ AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.class
+ ),
UNSUPPORTED(Type::convertUnsupported, Void.class);
private static Void convertUnsupported(String s) {
@@ -560,11 +566,18 @@ public static Type asType(ElementType elementType, Type actualType) {
case BYTES_REF -> bytesRefBlockType(actualType);
case BOOLEAN -> BOOLEAN;
case DOC -> throw new IllegalArgumentException("can't assert on doc blocks");
- case COMPOSITE -> throw new IllegalArgumentException("can't assert on composite blocks");
+ case COMPOSITE -> compositeBlockType(actualType);
case UNKNOWN -> throw new IllegalArgumentException("Unknown block types cannot be handled");
};
}
+ private static Type compositeBlockType(Type actualType) {
+ return switch (actualType) {
+ case AGGREGATE_METRIC_DOUBLE -> actualType;
+ default -> throw new IllegalArgumentException("can't assert on composite blocks that aren't aggregate metric doubles");
+ };
+ }
+
private static Type bytesRefBlockType(Type actualType) {
return switch (actualType) {
case NULL -> NULL;
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java
index 6d1661a9ed5d8..715fe7c0d0958 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java
@@ -21,6 +21,7 @@
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.BytesRefBlock;
@@ -786,6 +787,12 @@ public static Literal randomLiteral(DataType type) {
case CARTESIAN_POINT -> CARTESIAN.asWkb(ShapeTestUtils.randomPoint());
case GEO_SHAPE -> GEO.asWkb(GeometryTestUtils.randomGeometry(randomBoolean()));
case CARTESIAN_SHAPE -> CARTESIAN.asWkb(ShapeTestUtils.randomGeometry(randomBoolean()));
+ case AGGREGATE_METRIC_DOUBLE -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(
+ randomDouble(),
+ randomDouble(),
+ randomDouble(),
+ randomInt()
+ );
case NULL -> null;
case SOURCE -> {
try {
@@ -796,8 +803,9 @@ public static Literal randomLiteral(DataType type) {
throw new UncheckedIOException(e);
}
}
- case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG, AGGREGATE_METRIC_DOUBLE ->
- throw new IllegalArgumentException("can't make random values for [" + type.typeName() + "]");
+ case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG -> throw new IllegalArgumentException(
+ "can't make random values for [" + type.typeName() + "]"
+ );
}, type);
}
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/convert.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/convert.csv-spec
index 68a1f1691b911..6d126cd00a44b 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/convert.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/convert.csv-spec
@@ -451,3 +451,31 @@ emp_no:integer | birth_date:datetime
10097 | 1952-02-27T00:00:00.000Z
10100 | 1953-04-21T00:00:00.000Z
;
+
+convertToAggregateMetricDouble
+required_capability: aggregate_metric_double_convert_to
+//tag::toAggregateMetricDouble[]
+ROW x = 3892095203
+| EVAL agg_metric = TO_AGGREGATE_METRIC_DOUBLE(x)
+//end::toAggregateMetricDouble[]
+;
+
+//tag::toAggregateMetricDouble-result[]
+x:long | agg_metric:aggregate_metric_double
+3892095203 | {"min":3892095203.0,"max":3892095203.0,"sum":3892095203.0,"value_count":1}
+//end::toAggregateMetricDouble-result[]
+;
+
+convertToAggregateMetricDoubleMv
+required_capability: aggregate_metric_double_convert_to
+//tag::toAggregateMetricDoubleMv[]
+ROW x = [5032, 11111, 40814]
+| EVAL agg_metric = TO_AGGREGATE_METRIC_DOUBLE(x)
+//end::toAggregateMetricDoubleMv[]
+;
+
+//tag::toAggregateMetricDoubleMv-result[]
+x:integer | agg_metric:aggregate_metric_double
+[5032, 11111, 40814] | {"min":5032.0,"max":40814.0,"sum":56957.0,"value_count":3}
+//end::toAggregateMetricDoubleMv-result[]
+;
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
index 608532bc04717..405df63126061 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
@@ -772,7 +772,17 @@ public enum Cap {
/**
* Supercedes {@link Cap#MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT}.
*/
- FIX_REPLACE_MISSING_FIELD_WITH_NULL_DUPLICATE_NAME_ID_IN_LAYOUT;
+ FIX_REPLACE_MISSING_FIELD_WITH_NULL_DUPLICATE_NAME_ID_IN_LAYOUT,
+
+ /**
+ * Support for to_aggregate_metric_double function
+ */
+ AGGREGATE_METRIC_DOUBLE_CONVERT_TO(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),
+
+ /**
+ * Support for sorting when aggregate_metric_doubles are present
+ */
+ AGGREGATE_METRIC_DOUBLE_SORTING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG);
private final boolean enabled;
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/ExpressionWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/ExpressionWritables.java
index dba0ec799f312..b93028b3e5897 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/ExpressionWritables.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/ExpressionWritables.java
@@ -14,6 +14,7 @@
import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextWritables;
import org.elasticsearch.xpack.esql.expression.function.scalar.ScalarFunctionWritables;
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromBase64;
+import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToAggregateMetricDouble;
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBase64;
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBoolean;
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToCartesianPoint;
@@ -180,6 +181,7 @@ public static List unaryScalars() {
entries.add(StY.ENTRY);
entries.add(Tan.ENTRY);
entries.add(Tanh.ENTRY);
+ entries.add(ToAggregateMetricDouble.ENTRY);
entries.add(ToBase64.ENTRY);
entries.add(ToBoolean.ENTRY);
entries.add(ToCartesianPoint.ENTRY);
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
index 27a2d9932c312..13377cf13ad2f 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
@@ -42,6 +42,7 @@
import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Greatest;
import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Least;
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromBase64;
+import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToAggregateMetricDouble;
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBase64;
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBoolean;
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToCartesianPoint;
@@ -376,6 +377,7 @@ private static FunctionDefinition[][] functions() {
// conversion functions
new FunctionDefinition[] {
def(FromBase64.class, FromBase64::new, "from_base64"),
+ def(ToAggregateMetricDouble.class, ToAggregateMetricDouble::new, "to_aggregate_metric_double", "to_aggregatemetricdouble"),
def(ToBase64.class, ToBase64::new, "to_base64"),
def(ToBoolean.class, ToBoolean::new, "to_boolean", "to_bool"),
def(ToCartesianPoint.class, ToCartesianPoint::new, "to_cartesianpoint"),
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java
new file mode 100644
index 0000000000000..4f9e9b9d6c046
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java
@@ -0,0 +1,570 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.scalar.convert;
+
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.CompositeBlock;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.DoubleVector;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.data.Vector;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.EvalOperator;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.expression.function.Example;
+import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
+import org.elasticsearch.xpack.esql.expression.function.Param;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
+import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE;
+import static org.elasticsearch.xpack.esql.core.type.DataType.DOUBLE;
+import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER;
+import static org.elasticsearch.xpack.esql.core.type.DataType.LONG;
+import static org.elasticsearch.xpack.esql.core.type.DataType.UNSIGNED_LONG;
+
+public class ToAggregateMetricDouble extends AbstractConvertFunction {
+
+ private static final Map EVALUATORS = Map.ofEntries(
+ Map.entry(AGGREGATE_METRIC_DOUBLE, (source, fieldEval) -> fieldEval),
+ Map.entry(DOUBLE, DoubleFactory::new),
+ Map.entry(INTEGER, IntFactory::new),
+ Map.entry(LONG, LongFactory::new),
+ Map.entry(UNSIGNED_LONG, UnsignedLongFactory::new)
+ );
+
+ public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+ Expression.class,
+ "ToAggregateMetricDouble",
+ ToAggregateMetricDouble::new
+ );
+
+ @FunctionInfo(
+ returnType = "aggregate_metric_double",
+ description = "Encode a numeric to an aggregate_metric_double.",
+ examples = {
+ @Example(file = "convert", tag = "toAggregateMetricDouble"),
+ @Example(description = "The expression also accepts multi-values", file = "convert", tag = "toAggregateMetricDoubleMv") }
+ )
+ public ToAggregateMetricDouble(
+ Source source,
+ @Param(
+ name = "number",
+ type = { "double", "long", "unsigned_long", "integer", "aggregate_metric_double" },
+ description = "Input value. The input can be a single- or multi-valued column or an expression."
+ ) Expression field
+ ) {
+ super(source, field);
+ }
+
+ private ToAggregateMetricDouble(StreamInput in) throws IOException {
+ super(in);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return ENTRY.name;
+ }
+
+ @Override
+ protected TypeResolution resolveType() {
+ if (childrenResolved() == false) {
+ return new TypeResolution("Unresolved children");
+ }
+ return isType(
+ field,
+ dt -> dt == DataType.AGGREGATE_METRIC_DOUBLE || dt == DataType.DOUBLE || dt == LONG || dt == INTEGER || dt == UNSIGNED_LONG,
+ sourceText(),
+ DEFAULT,
+ "numeric or aggregate_metric_double"
+ );
+ }
+
+ @Override
+ public DataType dataType() {
+ return AGGREGATE_METRIC_DOUBLE;
+ }
+
+ @Override
+ public Expression replaceChildren(List newChildren) {
+ return new ToAggregateMetricDouble(source(), newChildren.get(0));
+ }
+
+ @Override
+ protected NodeInfo extends Expression> info() {
+ return NodeInfo.create(this, ToAggregateMetricDouble::new, field);
+ }
+
+ @Override
+ protected Map factories() {
+ return EVALUATORS;
+ }
+
+ private static class AggregateMetricDoubleVectorBuilder implements Releasable {
+ private final DoubleVector.FixedBuilder valuesBuilder;
+ private final BlockFactory blockFactory;
+
+ private AggregateMetricDoubleVectorBuilder(int estimatedSize, BlockFactory blockFactory) {
+ this.blockFactory = blockFactory;
+ this.valuesBuilder = blockFactory.newDoubleVectorFixedBuilder(estimatedSize);
+ }
+
+ private void appendValue(double value) {
+ valuesBuilder.appendDouble(value);
+ }
+
+ private Block build() {
+ Block[] blocks = new Block[4];
+ Block block;
+ boolean success = false;
+ try {
+ block = valuesBuilder.build().asBlock();
+ blocks[AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()] = block;
+ blocks[AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()] = block;
+ block.incRef();
+ blocks[AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()] = block;
+ block.incRef();
+ blocks[AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()] = blockFactory.newConstantIntBlockWith(
+ 1,
+ block.getPositionCount()
+ );
+ CompositeBlock compositeBlock = new CompositeBlock(blocks);
+ success = true;
+ return compositeBlock;
+ } finally {
+ if (success == false) {
+ Releasables.closeExpectNoException(blocks);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ Releasables.closeExpectNoException(valuesBuilder);
+ }
+ }
+
+ public static class DoubleFactory implements EvalOperator.ExpressionEvaluator.Factory {
+ private final Source source;
+
+ private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator;
+
+ public DoubleFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) {
+ this.fieldEvaluator = fieldEvaluator;
+ this.source = source;
+ }
+
+ @Override
+ public String toString() {
+ return "ToAggregateMetricDoubleFromDoubleEvaluator[" + "field=" + fieldEvaluator + "]";
+ }
+
+ @Override
+ public EvalOperator.ExpressionEvaluator get(DriverContext context) {
+ final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context);
+
+ return new EvalOperator.ExpressionEvaluator() {
+ private Block evalBlock(Block block) {
+ int positionCount = block.getPositionCount();
+ DoubleBlock doubleBlock = (DoubleBlock) block;
+ try (
+ AggregateMetricDoubleBlockBuilder builder = context.blockFactory()
+ .newAggregateMetricDoubleBlockBuilder(positionCount)
+ ) {
+ CompensatedSum compensatedSum = new CompensatedSum();
+ for (int p = 0; p < positionCount; p++) {
+ int valueCount = doubleBlock.getValueCount(p);
+ if (valueCount == 0) {
+ builder.appendNull();
+ continue;
+ }
+ int start = doubleBlock.getFirstValueIndex(p);
+ int end = start + valueCount;
+ if (valueCount == 1) {
+ double current = doubleBlock.getDouble(start);
+ builder.min().appendDouble(current);
+ builder.max().appendDouble(current);
+ builder.sum().appendDouble(current);
+ builder.count().appendInt(valueCount);
+ continue;
+ }
+ double min = Double.POSITIVE_INFINITY;
+ double max = Double.NEGATIVE_INFINITY;
+ for (int i = start; i < end; i++) {
+ double current = doubleBlock.getDouble(i);
+ min = Math.min(min, current);
+ max = Math.max(max, current);
+ compensatedSum.add(current);
+ }
+ builder.min().appendDouble(min);
+ builder.max().appendDouble(max);
+ builder.sum().appendDouble(compensatedSum.value());
+ builder.count().appendInt(valueCount);
+ compensatedSum.reset(0, 0);
+ }
+ return builder.build();
+ }
+ }
+
+ private Block evalVector(Vector vector) {
+ int positionCount = vector.getPositionCount();
+ DoubleVector doubleVector = (DoubleVector) vector;
+ try (
+ AggregateMetricDoubleVectorBuilder builder = new AggregateMetricDoubleVectorBuilder(
+ positionCount,
+ context.blockFactory()
+ )
+ ) {
+ for (int p = 0; p < positionCount; p++) {
+ double value = doubleVector.getDouble(p);
+ builder.appendValue(value);
+ }
+ return builder.build();
+ }
+ }
+
+ @Override
+ public Block eval(Page page) {
+ try (Block block = eval.eval(page)) {
+ Vector vector = block.asVector();
+ return vector == null ? evalBlock(block) : evalVector(vector);
+ }
+ }
+
+ @Override
+ public void close() {
+ Releasables.closeExpectNoException(eval);
+ }
+
+ @Override
+ public String toString() {
+ return "ToAggregateMetricDoubleFromDoubleEvaluator[field=" + eval + "]";
+ }
+ };
+ }
+ }
+
+ public static class IntFactory implements EvalOperator.ExpressionEvaluator.Factory {
+ private final Source source;
+
+ private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator;
+
+ public IntFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) {
+ this.fieldEvaluator = fieldEvaluator;
+ this.source = source;
+ }
+
+ @Override
+ public String toString() {
+ return "ToAggregateMetricDoubleFromIntEvaluator[" + "field=" + fieldEvaluator + "]";
+ }
+
+ @Override
+ public EvalOperator.ExpressionEvaluator get(DriverContext context) {
+ final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context);
+
+ return new EvalOperator.ExpressionEvaluator() {
+ @Override
+ public Block eval(Page page) {
+ try (Block block = eval.eval(page)) {
+ Vector vector = block.asVector();
+ return vector == null ? evalBlock(block) : evalVector(vector);
+ }
+ }
+
+ private Block evalBlock(Block block) {
+ int positionCount = block.getPositionCount();
+ IntBlock intBlock = (IntBlock) block;
+ try (
+ AggregateMetricDoubleBlockBuilder builder = context.blockFactory()
+ .newAggregateMetricDoubleBlockBuilder(positionCount)
+ ) {
+ CompensatedSum sum = new CompensatedSum();
+ for (int p = 0; p < positionCount; p++) {
+ int valueCount = intBlock.getValueCount(p);
+ int start = intBlock.getFirstValueIndex(p);
+ int end = start + valueCount;
+ if (valueCount == 0) {
+ builder.appendNull();
+ continue;
+ }
+ if (valueCount == 1) {
+ double current = intBlock.getInt(start);
+ builder.min().appendDouble(current);
+ builder.max().appendDouble(current);
+ builder.sum().appendDouble(current);
+ builder.count().appendInt(valueCount);
+ continue;
+ }
+ double min = Double.POSITIVE_INFINITY;
+ double max = Double.NEGATIVE_INFINITY;
+ for (int i = start; i < end; i++) {
+ double current = intBlock.getInt(i);
+ min = Math.min(min, current);
+ max = Math.max(max, current);
+ sum.add(current);
+ }
+ builder.min().appendDouble(min);
+ builder.max().appendDouble(max);
+ builder.sum().appendDouble(sum.value());
+ builder.count().appendInt(valueCount);
+ sum.reset(0, 0);
+ }
+ return builder.build();
+ }
+ }
+
+ private Block evalVector(Vector vector) {
+ int positionCount = vector.getPositionCount();
+ IntVector intVector = (IntVector) vector;
+ try (
+ AggregateMetricDoubleVectorBuilder builder = new AggregateMetricDoubleVectorBuilder(
+ positionCount,
+ context.blockFactory()
+ )
+ ) {
+ for (int p = 0; p < positionCount; p++) {
+ double value = intVector.getInt(p);
+ builder.appendValue(value);
+ }
+ return builder.build();
+ }
+ }
+
+ @Override
+ public void close() {
+ Releasables.closeExpectNoException(eval);
+ }
+
+ @Override
+ public String toString() {
+ return "ToAggregateMetricDoubleFromIntEvaluator[field=" + eval + "]";
+ }
+ };
+ }
+ }
+
+ public static class LongFactory implements EvalOperator.ExpressionEvaluator.Factory {
+ private final Source source;
+
+ private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator;
+
+ public LongFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) {
+ this.fieldEvaluator = fieldEvaluator;
+ this.source = source;
+ }
+
+ @Override
+ public String toString() {
+ return "ToAggregateMetricDoubleFromLongEvaluator[" + "field=" + fieldEvaluator + "]";
+ }
+
+ @Override
+ public EvalOperator.ExpressionEvaluator get(DriverContext context) {
+ final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context);
+
+ return new EvalOperator.ExpressionEvaluator() {
+ private Block evalBlock(Block block) {
+ int positionCount = block.getPositionCount();
+ LongBlock longBlock = (LongBlock) block;
+ try (
+ AggregateMetricDoubleBlockBuilder builder = context.blockFactory()
+ .newAggregateMetricDoubleBlockBuilder(positionCount)
+ ) {
+ CompensatedSum sum = new CompensatedSum();
+ for (int p = 0; p < positionCount; p++) {
+ int valueCount = longBlock.getValueCount(p);
+ int start = longBlock.getFirstValueIndex(p);
+ int end = start + valueCount;
+ if (valueCount == 0) {
+ builder.appendNull();
+ continue;
+ }
+ if (valueCount == 1) {
+ double current = longBlock.getLong(start);
+ builder.min().appendDouble(current);
+ builder.max().appendDouble(current);
+ builder.sum().appendDouble(current);
+ builder.count().appendInt(valueCount);
+ continue;
+ }
+ double min = Double.POSITIVE_INFINITY;
+ double max = Double.NEGATIVE_INFINITY;
+ for (int i = start; i < end; i++) {
+ double current = longBlock.getLong(i);
+ min = Math.min(min, current);
+ max = Math.max(max, current);
+ sum.add(current);
+ }
+ builder.min().appendDouble(min);
+ builder.max().appendDouble(max);
+ builder.sum().appendDouble(sum.value());
+ builder.count().appendInt(valueCount);
+ sum.reset(0, 0);
+ }
+ return builder.build();
+ }
+ }
+
+ private Block evalVector(Vector vector) {
+ int positionCount = vector.getPositionCount();
+ LongVector longVector = (LongVector) vector;
+ try (
+ AggregateMetricDoubleVectorBuilder builder = new AggregateMetricDoubleVectorBuilder(
+ positionCount,
+ context.blockFactory()
+ )
+ ) {
+ for (int p = 0; p < positionCount; p++) {
+ double value = longVector.getLong(p);
+ builder.appendValue(value);
+ }
+ return builder.build();
+ }
+ }
+
+ @Override
+ public Block eval(Page page) {
+ try (Block block = eval.eval(page)) {
+ Vector vector = block.asVector();
+ return vector == null ? evalBlock(block) : evalVector(vector);
+ }
+ }
+
+ @Override
+ public void close() {
+ Releasables.closeExpectNoException(eval);
+ }
+
+ @Override
+ public String toString() {
+ return "ToAggregateMetricDoubleFromLongEvaluator[field=" + eval + "]";
+ }
+ };
+ }
+ }
+
+ public static class UnsignedLongFactory implements EvalOperator.ExpressionEvaluator.Factory {
+ private final Source source;
+
+ private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator;
+
+ public UnsignedLongFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) {
+ this.fieldEvaluator = fieldEvaluator;
+ this.source = source;
+ }
+
+ @Override
+ public String toString() {
+ return "ToAggregateMetricDoubleFromUnsignedLongEvaluator[" + "field=" + fieldEvaluator + "]";
+ }
+
+ @Override
+ public EvalOperator.ExpressionEvaluator get(DriverContext context) {
+ final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context);
+
+ return new EvalOperator.ExpressionEvaluator() {
+ private Block evalBlock(Block block) {
+ int positionCount = block.getPositionCount();
+ LongBlock longBlock = (LongBlock) block;
+ try (
+ AggregateMetricDoubleBlockBuilder builder = context.blockFactory()
+ .newAggregateMetricDoubleBlockBuilder(positionCount)
+ ) {
+ CompensatedSum sum = new CompensatedSum();
+ for (int p = 0; p < positionCount; p++) {
+ int valueCount = longBlock.getValueCount(p);
+ int start = longBlock.getFirstValueIndex(p);
+ int end = start + valueCount;
+ if (valueCount == 0) {
+ builder.appendNull();
+ continue;
+ }
+ if (valueCount == 1) {
+ double current = EsqlDataTypeConverter.unsignedLongToDouble(longBlock.getLong(p));
+ builder.min().appendDouble(current);
+ builder.max().appendDouble(current);
+ builder.sum().appendDouble(current);
+ builder.count().appendInt(valueCount);
+ continue;
+ }
+ double min = Double.POSITIVE_INFINITY;
+ double max = Double.NEGATIVE_INFINITY;
+ for (int i = start; i < end; i++) {
+ double current = EsqlDataTypeConverter.unsignedLongToDouble(longBlock.getLong(p));
+ min = Math.min(min, current);
+ max = Math.max(max, current);
+ sum.add(current);
+ }
+ builder.min().appendDouble(min);
+ builder.max().appendDouble(max);
+ builder.sum().appendDouble(sum.value());
+ builder.count().appendInt(valueCount);
+ sum.reset(0, 0);
+ }
+ return builder.build();
+ }
+ }
+
+ private Block evalVector(Vector vector) {
+ int positionCount = vector.getPositionCount();
+ LongVector longVector = (LongVector) vector;
+ try (
+ AggregateMetricDoubleVectorBuilder builder = new AggregateMetricDoubleVectorBuilder(
+ positionCount,
+ context.blockFactory()
+ )
+ ) {
+ for (int p = 0; p < positionCount; p++) {
+ double value = EsqlDataTypeConverter.unsignedLongToDouble(longVector.getLong(p));
+ builder.appendValue(value);
+ }
+ return builder.build();
+ }
+ }
+
+ @Override
+ public Block eval(Page page) {
+ try (Block block = eval.eval(page)) {
+ Vector vector = block.asVector();
+ return vector == null ? evalBlock(block) : evalVector(vector);
+ }
+ }
+
+ @Override
+ public void close() {
+ Releasables.closeExpectNoException(eval);
+ }
+
+ @Override
+ public String toString() {
+ return "ToAggregateMetricDoubleFromUnsignedLongEvaluator[field=" + eval + "]";
+ }
+ };
+ }
+ }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
index b560ca0624ce2..7141c618a9529 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
@@ -372,10 +372,10 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte
case VERSION -> TopNEncoder.VERSION;
case BOOLEAN, NULL, BYTE, SHORT, INTEGER, LONG, DOUBLE, FLOAT, HALF_FLOAT, DATETIME, DATE_NANOS, DATE_PERIOD, TIME_DURATION,
OBJECT, SCALED_FLOAT, UNSIGNED_LONG, DOC_DATA_TYPE, TSID_DATA_TYPE -> TopNEncoder.DEFAULT_SORTABLE;
- case GEO_POINT, CARTESIAN_POINT, GEO_SHAPE, CARTESIAN_SHAPE, COUNTER_LONG, COUNTER_INTEGER, COUNTER_DOUBLE, SOURCE ->
- TopNEncoder.DEFAULT_UNSORTABLE;
+ case GEO_POINT, CARTESIAN_POINT, GEO_SHAPE, CARTESIAN_SHAPE, COUNTER_LONG, COUNTER_INTEGER, COUNTER_DOUBLE, SOURCE,
+ AGGREGATE_METRIC_DOUBLE -> TopNEncoder.DEFAULT_UNSORTABLE;
// unsupported fields are encoded as BytesRef, we'll use the same encoder; all values should be null at this point
- case PARTIAL_AGG, UNSUPPORTED, AGGREGATE_METRIC_DOUBLE -> TopNEncoder.UNSUPPORTED;
+ case PARTIAL_AGG, UNSUPPORTED -> TopNEncoder.UNSUPPORTED;
};
}
List orders = topNExec.order().stream().map(order -> {
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java
index 5540addd59bf5..a3f2f327c17ac 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java
@@ -16,6 +16,7 @@
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.common.time.DateUtils;
+import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder.Metric;
import org.elasticsearch.compute.data.CompositeBlock;
import org.elasticsearch.compute.data.DoubleBlock;
@@ -708,6 +709,60 @@ public static String aggregateMetricDoubleBlockToString(CompositeBlock composite
}
}
+ public static String aggregateMetricDoubleLiteralToString(AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral aggMetric) {
+ try (XContentBuilder builder = JsonXContent.contentBuilder()) {
+ builder.startObject();
+ if (aggMetric.min() != null) {
+ builder.field(Metric.MIN.getLabel(), aggMetric.min());
+ }
+ if (aggMetric.max() != null) {
+ builder.field(Metric.MAX.getLabel(), aggMetric.max());
+ }
+ if (aggMetric.sum() != null) {
+ builder.field(Metric.SUM.getLabel(), aggMetric.sum());
+ }
+ if (aggMetric.count() != null) {
+ builder.field(Metric.COUNT.getLabel(), aggMetric.count());
+ }
+ builder.endObject();
+ return Strings.toString(builder);
+ } catch (IOException e) {
+ throw new IllegalStateException("error rendering aggregate metric double", e);
+ }
+ }
+
+ public static AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral stringToAggregateMetricDoubleLiteral(String s) {
+ Double min = null;
+ Double max = null;
+ Double sum = null;
+ Integer count = null;
+ String[] values = s.substring(1, s.length() - 1).split(",");
+ for (String v : values) {
+ var pair = v.split(":");
+ String type = pair[0];
+ String number = pair[1];
+ switch (type) {
+ case "min":
+ min = Double.parseDouble(number);
+ break;
+ case "max":
+ max = Double.parseDouble(number);
+ break;
+ case "sum":
+ sum = Double.parseDouble(number);
+ break;
+ case "value_count":
+ count = Integer.parseInt(number);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Received a metric that wasn't min, max, sum, or value_count: " + type + " with value: " + number
+ );
+ }
+ }
+ return new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(min, max, sum, count);
+ }
+
public enum EsqlConverter implements Converter {
STRING_TO_DATE_PERIOD(x -> EsqlDataTypeConverter.parseTemporalAmount(x, DataType.DATE_PERIOD)),
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java
index 3e644f3e61b05..8e396e4753f09 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java
@@ -10,9 +10,11 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.GenericNamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.ExistsQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
@@ -112,6 +114,13 @@ public static NamedWriteableRegistry writableRegistry() {
entries.add(SingleValueQuery.ENTRY);
entries.addAll(ExpressionWritables.getNamedWriteables());
entries.addAll(PlanWritables.getNamedWriteables());
+ entries.add(
+ new NamedWriteableRegistry.Entry(
+ GenericNamedWriteable.class,
+ AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.ENTRY.name,
+ AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral::new
+ )
+ );
return new NamedWriteableRegistry(entries);
}
}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java
new file mode 100644
index 0000000000000..14910572d8c9d
--- /dev/null
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDoubleTests.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.scalar.convert;
+
+import com.carrotsearch.randomizedtesting.annotations.Name;
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
+import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.expression.function.AbstractScalarFunctionTestCase;
+import org.elasticsearch.xpack.esql.expression.function.FunctionName;
+import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static java.util.Collections.emptyList;
+
+@FunctionName("to_aggregate_metric_double")
+public class ToAggregateMetricDoubleTests extends AbstractScalarFunctionTestCase {
+ public ToAggregateMetricDoubleTests(@Name("TestCase") Supplier testCaseSupplier) {
+ this.testCase = testCaseSupplier.get();
+ }
+
+ @Override
+ protected Expression build(Source source, List args) {
+ if (args.get(0).dataType() == DataType.AGGREGATE_METRIC_DOUBLE) {
+ assumeTrue("Test sometimes wraps literals as fields", args.get(0).foldable());
+ }
+ return new ToAggregateMetricDouble(source, args.get(0));
+ }
+
+ @ParametersFactory
+ public static Iterable