5 changes: 5 additions & 0 deletions docs/changelog/122660.yaml
@@ -0,0 +1,5 @@
pr: 122660
summary: Render `aggregate_metric_double`
area: "ES|QL"
type: enhancement
issues: []
@@ -139,20 +139,26 @@ public BlockLoader.IntBuilder count() {
}

public enum Metric {
MIN(0),
MAX(1),
SUM(2),
COUNT(3);
MIN(0, "min"),
MAX(1, "max"),
SUM(2, "sum"),
COUNT(3, "value_count");

private final int index;
private final String label;

Metric(int index) {
Metric(int index, String label) {
this.index = index;
this.label = label;
}

public int getIndex() {
return index;
}

public String getLabel() {
return label;
}
}

public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) {
@@ -96,7 +96,11 @@ public int getFirstValueIndex(int position) {

@Override
public int getValueCount(int position) {
return blocks[0].getValueCount(position);
int max = 0;
for (var block : blocks) {
max = Math.max(max, block.getValueCount(position));
}
return max;
Contributor Author: I implemented this to just return the max across all the blocks; I'm not sure if we wanted to go with some other logic (like ending early if a block ever returns a getValueCount of 1)?
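One possible reading of that early-exit idea, sketched under the assumption that each aggregate_metric_double sub-block holds at most one value per position; this is hypothetical and not part of this change:

// Hypothetical sketch: if every sub-block holds at most one value per
// position, the composite's value count is 1 as soon as any sub-block has a
// value there, so the loop can stop early instead of tracking a max.
@Override
public int getValueCount(int position) {
    for (var block : blocks) {
        if (block.getValueCount(position) > 0) {
            return 1;
        }
    }
    return 0;
}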

Member: I think we can avoid this - see my comment in the evaluator.

Contributor Author: I think we do still need some way of computing this for CompositeBlock, though; I ran into it here.

Member: I see. It's fine to use the max count here, but I think it's less error-prone not to implement this method. Alternatively, can we check the data type and convert the value at that position within the composite block in valueAtPosition? I'm also fine leaving it as is.
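A rough sketch of that alternative, keeping the type check on the caller side so CompositeBlock never needs a general getValueCount; the method and parameter names here are illustrative, not the actual follow-up:

// Hypothetical: branch on the data type before consulting per-position value
// counts; an aggregate_metric_double position is rendered directly as its
// JSON string form, and only the remaining types fall through.
private static Object valueAtPosition(DataType dataType, Block block, int position, BytesRef scratch) {
    if (dataType == DataType.AGGREGATE_METRIC_DOUBLE) {
        CompositeBlock composite = (CompositeBlock) block;
        return composite.isNull(position) ? null : aggregateMetricDoubleBlockToString(composite, position);
    }
    // ... existing per-type handling for the remaining data types ...
    return null;
}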

Contributor Author: I think it would make sense to adjust this later.

Member: Maybe we can add an assertion or check that this logic is only valid for aggregate_metric_double fields? We can add this in a follow-up, given that this functionality is only available in snapshot builds. Can you add this to the list of tasks?
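A minimal sketch of the kind of guard being suggested, assuming the composite's sub-block count can stand in for "this is an aggregate_metric_double"; the exact check would land in the follow-up:

// Hypothetical follow-up guard: an aggregate_metric_double composite carries
// one sub-block per Metric (min, max, sum, value_count), so assert that shape
// before relying on the max-based value count.
@Override
public int getValueCount(int position) {
    assert blocks.length == AggregateMetricDoubleBlockBuilder.Metric.values().length
        : "max-based getValueCount is only intended for aggregate_metric_double composites";
    int max = 0;
    for (var block : blocks) {
        max = Math.max(max, block.getValueCount(position));
    }
    return max;
}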

}

@Override
@@ -824,7 +824,12 @@ public enum Cap {
/**
* Support partial_results
*/
SUPPORT_PARTIAL_RESULTS;
SUPPORT_PARTIAL_RESULTS,

/**
* Support for rendering aggregate_metric_double type
*/
AGGREGATE_METRIC_DOUBLE_RENDERING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG);

private final boolean enabled;

@@ -13,6 +13,7 @@
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.CompositeBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
@@ -25,6 +26,7 @@
import java.io.IOException;

import static org.elasticsearch.xpack.esql.core.util.NumericUtils.unsignedLongAsNumber;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.aggregateMetricDoubleBlockToString;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.ipToString;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.nanoTimeToString;
@@ -148,8 +150,14 @@ protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Pa
return builder.value(versionToString(val));
}
};
// TODO: Add implementation for aggregate_metric_double
case NULL, AGGREGATE_METRIC_DOUBLE -> new PositionToXContent(block) {
case AGGREGATE_METRIC_DOUBLE -> new PositionToXContent(block) {
@Override
protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
throws IOException {
return builder.value(aggregateMetricDoubleBlockToString((CompositeBlock) block, valueIndex));
}
};
case NULL -> new PositionToXContent(block) {
@Override
protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
throws IOException {
@@ -14,6 +14,7 @@
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.CompositeBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
@@ -30,6 +31,7 @@
import java.util.List;

import static org.elasticsearch.xpack.esql.core.util.NumericUtils.unsignedLongAsNumber;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.aggregateMetricDoubleBlockToString;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.ipToString;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.nanoTimeToString;
@@ -132,7 +134,8 @@ private static Object valueAt(DataType dataType, Block block, int offset, BytesR
case GEO_POINT, GEO_SHAPE, CARTESIAN_POINT, CARTESIAN_SHAPE -> spatialToString(
((BytesRefBlock) block).getBytesRef(offset, scratch)
);
case UNSUPPORTED, AGGREGATE_METRIC_DOUBLE -> (String) null;
case AGGREGATE_METRIC_DOUBLE -> aggregateMetricDoubleBlockToString((CompositeBlock) block, offset);
case UNSUPPORTED -> (String) null;
case SOURCE -> {
BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch);
try {
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;

import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE;
import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN;
import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_POINT;
import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_SHAPE;
@@ -67,7 +68,8 @@ public class ToString extends AbstractConvertFunction implements EvaluatorMapper
Map.entry(GEO_POINT, ToStringFromGeoPointEvaluator.Factory::new),
Map.entry(CARTESIAN_POINT, ToStringFromCartesianPointEvaluator.Factory::new),
Map.entry(CARTESIAN_SHAPE, ToStringFromCartesianShapeEvaluator.Factory::new),
Map.entry(GEO_SHAPE, ToStringFromGeoShapeEvaluator.Factory::new)
Map.entry(GEO_SHAPE, ToStringFromGeoShapeEvaluator.Factory::new),
Map.entry(AGGREGATE_METRIC_DOUBLE, ToStringFromAggregateMetricDoubleEvaluator.Factory::new)
);

@FunctionInfo(
@@ -82,6 +84,7 @@ public ToString(
@Param(
name = "field",
type = {
"aggregate_metric_double",
"boolean",
"cartesian_point",
"cartesian_shape",
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.scalar.convert;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.CompositeBlock;
import org.elasticsearch.compute.data.Vector;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.xpack.esql.core.tree.Source;

import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.aggregateMetricDoubleBlockToString;

public class ToStringFromAggregateMetricDoubleEvaluator extends AbstractConvertFunction.AbstractEvaluator {
public ToStringFromAggregateMetricDoubleEvaluator(EvalOperator.ExpressionEvaluator field, Source source, DriverContext driverContext) {
super(driverContext, field, source);
}

@Override
protected String name() {
return "ToStringFromAggregateMetricDouble";
}

@Override
protected Block evalVector(Vector v) {
return evalBlock(v.asBlock());
}

private static BytesRef evalValue(CompositeBlock compositeBlock, int index) {
return new BytesRef(aggregateMetricDoubleBlockToString(compositeBlock, index));
}

@Override
public Block evalBlock(Block b) {
CompositeBlock block = (CompositeBlock) b;
int positionCount = block.getPositionCount();
try (BytesRefBlock.Builder builder = driverContext.blockFactory().newBytesRefBlockBuilder(positionCount)) {
for (int p = 0; p < positionCount; p++) {
if (block.isNull(p)) {
builder.appendNull();
} else {
builder.appendBytesRef(evalValue(block, p));
}
}
return builder.build();
}
}

public static class Factory implements EvalOperator.ExpressionEvaluator.Factory {
private final Source source;
private final EvalOperator.ExpressionEvaluator.Factory field;

public Factory(EvalOperator.ExpressionEvaluator.Factory field, Source source) {
this.field = field;
this.source = source;
}

@Override
public EvalOperator.ExpressionEvaluator get(DriverContext context) {
return new ToStringFromAggregateMetricDoubleEvaluator(field.get(context), source, context);
}
}
}
@@ -8,14 +8,21 @@
package org.elasticsearch.xpack.esql.type;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder.Metric;
import org.elasticsearch.compute.data.CompositeBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Expression;
@@ -664,6 +671,26 @@ public static long booleanToUnsignedLong(boolean number) {
return number ? ONE_AS_UNSIGNED_LONG : ZERO_AS_UNSIGNED_LONG;
}

public static String aggregateMetricDoubleBlockToString(CompositeBlock compositeBlock, int index) {
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
for (Metric metric : List.of(Metric.MIN, Metric.MAX, Metric.SUM)) {
var block = compositeBlock.getBlock(metric.getIndex());
if (block.isNull(index) == false) {
builder.field(metric.getLabel(), ((DoubleBlock) block).getDouble(index));
}
}
var countBlock = compositeBlock.getBlock(Metric.COUNT.getIndex());
if (countBlock.isNull(index) == false) {
builder.field(Metric.COUNT.getLabel(), ((IntBlock) countBlock).getInt(index));
}
builder.endObject();
return Strings.toString(builder);
} catch (IOException e) {
throw new IllegalStateException("error rendering aggregate metric double", e);
}
}

public enum EsqlConverter implements Converter {

STRING_TO_DATE_PERIOD(x -> EsqlDataTypeConverter.parseTemporalAmount(x, DataType.DATE_PERIOD)),
@@ -1580,7 +1580,7 @@ public void testRegexOnInt() {

public void testUnsupportedTypesWithToString() {
// DATE_PERIOD and TIME_DURATION types have been added, but not really patched through the engine; i.e. supported.
final String supportedTypes = "boolean or cartesian_point or cartesian_shape or date_nanos or datetime "
final String supportedTypes = "aggregate_metric_double or boolean or cartesian_point or cartesian_shape or date_nanos or datetime "
+ "or geo_point or geo_shape or ip or numeric or string or version";
verifyUnsupported(
"row period = 1 year | eval to_string(period)",
@@ -141,6 +141,8 @@ setup:
- '{"@timestamp": "2021-04-28T19:50:24.467Z", "agg_metric": {"max": 10, "min": 3}, "k8s": {"pod": {"uid":"947e4ced-1786-4e53-9e0c-5c447e959507"}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T19:50:44.467Z", "agg_metric": {"max": 17, "min": 2}, "k8s": {"pod": {"uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9"}}}'
- '{"index": {}}'
- '{"@timestamp": "2021-04-28T19:51:04.467Z", "k8s": {"pod": {"uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9"}}}'

- do:
indices.create:
@@ -445,6 +447,77 @@ stats on aggregate_metric_double missing min and max:
- match: {values.0.2: 1.0}
- match: {values.0.3: 10}

---
render aggregate_metric_double when missing min and max:
- requires:
test_runner_features: [ capabilities ]
capabilities:
- method: POST
path: /_query
parameters: [ ]
capabilities: [ aggregate_metric_double_rendering ]
reason: "Support for rendering aggregate_metric_doubles"
- do:
allowed_warnings_regex:
- "No limit defined, adding default limit of \\[.*\\]"
esql.query:
body:
query: 'FROM test4 | KEEP agg_metric'

- length: {values: 1}
- length: {values.0: 1}
- match: {columns.0.name: "agg_metric"}
- match: {columns.0.type: "aggregate_metric_double"}
- match: {values.0.0: '{"sum":1.0,"value_count":10}'}


---
render aggregate_metric_double when missing value:
- requires:
test_runner_features: [ capabilities ]
capabilities:
- method: POST
path: /_query
parameters: [ ]
capabilities: [ aggregate_metric_double_rendering ]
reason: "Support for rendering aggregate_metric_doubles"
- do:
allowed_warnings_regex:
- "No limit defined, adding default limit of \\[.*\\]"
esql.query:
body:
query: 'FROM test3 | WHERE @timestamp == "2021-04-28T19:51:04.467Z" | KEEP agg_metric'

- length: {values: 1}
- length: {values.0: 1}
- match: {columns.0.name: "agg_metric"}
- match: {columns.0.type: "aggregate_metric_double"}
- match: {values.0.0: null}


---
to_string aggregate_metric_double:
- requires:
test_runner_features: [ capabilities ]
capabilities:
- method: POST
path: /_query
parameters: [ ]
capabilities: [ aggregate_metric_double_rendering ]
reason: "Support for rendering aggregate_metric_doubles"
- do:
allowed_warnings_regex:
- "No limit defined, adding default limit of \\[.*\\]"
esql.query:
body:
query: 'FROM test4 | EVAL agg = to_string(agg_metric) | KEEP agg'

- length: {values: 1}
- length: {values.0: 1}
- match: {columns.0.name: "agg"}
- match: {columns.0.type: "keyword"}
- match: {values.0.0: '{"sum":1.0,"value_count":10}'}

---
from index pattern unsupported counter:
- requires:
@@ -480,7 +553,7 @@
- match: {columns.7.type: "keyword"}
- match: {columns.8.name: "metricset"}
- match: {columns.8.type: "keyword"}
- length: {values: 15}
- length: {values: 16}

---
from index pattern explicit counter use:
@@ -501,7 +574,7 @@
query: 'FROM test* | keep *.tx'
- match: {columns.0.name: "k8s.pod.network.tx"}
- match: {columns.0.type: "unsupported"}
- length: {values: 15}
- length: {values: 16}

---
_source: