Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ public static boolean isSpatial(DataType t) {
}

public static boolean isSortable(DataType t) {
return false == (t == SOURCE || isCounter(t) || isSpatial(t));
return false == (t == SOURCE || isCounter(t) || isSpatial(t) || t == AGGREGATE_METRIC_DOUBLE);
}

public String nameUpper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ static ResultBuilder resultBuilderFor(
case DOUBLE -> new ResultBuilderForDouble(blockFactory, encoder, inKey, positions);
case NULL -> new ResultBuilderForNull(blockFactory);
case DOC -> new ResultBuilderForDoc(blockFactory, positions);
case COMPOSITE -> new ResultBuilderForComposite(blockFactory, positions);
default -> {
assert false : "Result builder for [" + elementType + "]";
throw new UnsupportedOperationException("Result builder for [" + elementType + "]");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.topn;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.index.mapper.BlockLoader;

import java.util.List;

/**
 * Rebuilds an aggregate_metric_double block from TopN row bytes previously written by
 * {@code ValueExtractorForComposite}. The wire format per position is: for each of
 * min, max and sum a boolean presence flag followed (when {@code true}) by a double,
 * then a boolean presence flag followed (when {@code true}) by the int count.
 */
public class ResultBuilderForComposite implements ResultBuilder {

    private final AggregateMetricDoubleBlockBuilder builder;

    ResultBuilderForComposite(BlockFactory blockFactory, int positions) {
        this.builder = blockFactory.newAggregateMetricDoubleBlockBuilder(positions);
    }

    @Override
    public void decodeKey(BytesRef keys) {
        // aggregate_metric_double is not sortable, so it can never be a TopN sort key.
        throw new AssertionError("Composite Block can't be a key");
    }

    @Override
    public void decodeValue(BytesRef values) {
        // Decode order must mirror the encode order in ValueExtractorForComposite:
        // min, max, sum (doubles), then count (int), each prefixed by a presence flag.
        for (BlockLoader.DoubleBuilder subBuilder : List.of(builder.min(), builder.max(), builder.sum())) {
            if (TopNEncoder.DEFAULT_UNSORTABLE.decodeBoolean(values)) {
                subBuilder.appendDouble(TopNEncoder.DEFAULT_UNSORTABLE.decodeDouble(values));
            } else {
                subBuilder.appendNull();
            }
        }
        if (TopNEncoder.DEFAULT_UNSORTABLE.decodeBoolean(values)) {
            builder.count().appendInt(TopNEncoder.DEFAULT_UNSORTABLE.decodeInt(values));
        } else {
            builder.count().appendNull();
        }
    }

    @Override
    public Block build() {
        return builder.build();
    }

    @Override
    public String toString() {
        // Bug fix: previously returned "ValueExtractorForComposite", a copy-paste
        // from the sibling extractor class, which made debug output misleading.
        return "ResultBuilderForComposite";
    }

    @Override
    public void close() {
        builder.close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.CompositeBlock;
import org.elasticsearch.compute.data.DocBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.ElementType;
Expand Down Expand Up @@ -37,6 +38,7 @@ static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder,
case DOUBLE -> ValueExtractorForDouble.extractorFor(encoder, inKey, (DoubleBlock) block);
case NULL -> new ValueExtractorForNull();
case DOC -> new ValueExtractorForDoc(encoder, ((DocBlock) block).asVector());
case COMPOSITE -> new ValueExtractorForComposite(encoder, (CompositeBlock) block);
default -> {
assert false : "No value extractor for [" + block.elementType() + "]";
throw new UnsupportedOperationException("No value extractor for [" + block.elementType() + "]");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.topn;

import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.compute.data.CompositeBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;

import java.util.List;

/**
 * Serializes aggregate_metric_double values into TopN row bytes. For each position it
 * writes min, max and sum (a boolean presence flag, then the double when present)
 * followed by count (a presence flag, then the int when present). The matching decode
 * logic lives in the composite result builder.
 */
public class ValueExtractorForComposite implements ValueExtractor {
    private final CompositeBlock block;

    ValueExtractorForComposite(TopNEncoder encoder, CompositeBlock block) {
        assert encoder == TopNEncoder.DEFAULT_UNSORTABLE;
        this.block = block;
    }

    @Override
    public void writeValue(BreakingBytesRefBuilder values, int position) {
        // Only aggregate_metric_double composites (exactly one sub-block per Metric)
        // are supported here.
        if (block.getBlockCount() != AggregateMetricDoubleBlockBuilder.Metric.values().length) {
            throw new UnsupportedOperationException("Composite Blocks for non-aggregate-metric-doubles do not have value extractors");
        }
        // The three double sub-metrics go first, each prefixed with a presence flag.
        for (AggregateMetricDoubleBlockBuilder.Metric metric : List.of(
            AggregateMetricDoubleBlockBuilder.Metric.MIN,
            AggregateMetricDoubleBlockBuilder.Metric.MAX,
            AggregateMetricDoubleBlockBuilder.Metric.SUM
        )) {
            DoubleBlock metricValues = block.getBlock(metric.getIndex());
            boolean present = metricValues.isNull(position) == false;
            TopNEncoder.DEFAULT_UNSORTABLE.encodeBoolean(present, values);
            if (present) {
                TopNEncoder.DEFAULT_UNSORTABLE.encodeDouble(metricValues.getDouble(position), values);
            }
        }
        // The count sub-metric is an int and is written last, with its own flag.
        IntBlock countValues = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex());
        boolean hasCount = countValues.isNull(position) == false;
        TopNEncoder.DEFAULT_UNSORTABLE.encodeBoolean(hasCount, values);
        if (hasCount) {
            TopNEncoder.DEFAULT_UNSORTABLE.encodeInt(countValues.getInt(position), values);
        }
    }

    @Override
    public String toString() {
        return "ValueExtractorForComposite";
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,12 @@ public enum Cap {
/**
* Index component selector syntax (my-data-stream-name::failures)
*/
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled());
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled()),

/**
* Support for sorting when aggregate_metric_doubles are present
*/
AGGREGATE_METRIC_DOUBLE_SORTING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG);

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,10 +417,10 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte
case VERSION -> TopNEncoder.VERSION;
case BOOLEAN, NULL, BYTE, SHORT, INTEGER, LONG, DOUBLE, FLOAT, HALF_FLOAT, DATETIME, DATE_NANOS, DATE_PERIOD, TIME_DURATION,
OBJECT, SCALED_FLOAT, UNSIGNED_LONG, DOC_DATA_TYPE, TSID_DATA_TYPE -> TopNEncoder.DEFAULT_SORTABLE;
case GEO_POINT, CARTESIAN_POINT, GEO_SHAPE, CARTESIAN_SHAPE, COUNTER_LONG, COUNTER_INTEGER, COUNTER_DOUBLE, SOURCE ->
TopNEncoder.DEFAULT_UNSORTABLE;
case GEO_POINT, CARTESIAN_POINT, GEO_SHAPE, CARTESIAN_SHAPE, COUNTER_LONG, COUNTER_INTEGER, COUNTER_DOUBLE, SOURCE,
AGGREGATE_METRIC_DOUBLE -> TopNEncoder.DEFAULT_UNSORTABLE;
// unsupported fields are encoded as BytesRef, we'll use the same encoder; all values should be null at this point
case PARTIAL_AGG, UNSUPPORTED, AGGREGATE_METRIC_DOUBLE -> TopNEncoder.UNSUPPORTED;
case PARTIAL_AGG, UNSUPPORTED -> TopNEncoder.UNSUPPORTED;
};
}
List<TopNOperator.SortOrder> orders = topNExec.order().stream().map(order -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,22 +627,22 @@ public void read(int docId, StoredFields storedFields, Builder builder) throws I
}

private void readSingleRow(int docId, AggregateMetricDoubleBuilder builder) throws IOException {
if (minValues.advanceExact(docId)) {
if (minValues != null && minValues.advanceExact(docId)) {
builder.min().appendDouble(NumericUtils.sortableLongToDouble(minValues.longValue()));
} else {
builder.min().appendNull();
}
if (maxValues.advanceExact(docId)) {
if (maxValues != null && maxValues.advanceExact(docId)) {
builder.max().appendDouble(NumericUtils.sortableLongToDouble(maxValues.longValue()));
} else {
builder.max().appendNull();
}
if (sumValues.advanceExact(docId)) {
if (sumValues != null && sumValues.advanceExact(docId)) {
builder.sum().appendDouble(NumericUtils.sortableLongToDouble(sumValues.longValue()));
} else {
builder.sum().appendNull();
}
if (valueCountValues.advanceExact(docId)) {
if (valueCountValues != null && valueCountValues.advanceExact(docId)) {
builder.count().appendInt(Math.toIntExact(valueCountValues.longValue()));
} else {
builder.count().appendNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,54 @@ grouping stats on aggregate_metric_double:
- match: {values.1.3: 16.0}
- match: {values.1.4: "B"}

---
sorting with aggregate_metric_double with partial submetrics:
  - requires:
      test_runner_features: [capabilities]
      capabilities:
        - method: POST
          path: /_query
          parameters: []
          capabilities: [aggregate_metric_double_sorting]
      reason: "Support for sorting when aggregate_metric_double present"
  - do:
      allowed_warnings_regex:
        - "No limit defined, adding default limit of \\[.*\\]"
      esql.query:
        body:
          query: 'FROM test3 | SORT @timestamp | KEEP @timestamp, agg_metric'

  - length: {values: 4}
  - length: {values.0: 2}
  - match: {columns.0.name: "@timestamp"}
  - match: {columns.0.type: "date"}
  - match: {columns.1.name: "agg_metric"}
  - match: {columns.1.type: "aggregate_metric_double"}
  - match: {values.0.0: "2021-04-28T19:50:04.467Z"}
  - match: {values.1.0: "2021-04-28T19:50:24.467Z"}
  - match: {values.2.0: "2021-04-28T19:50:44.467Z"}
  - match: {values.3.0: "2021-04-28T19:51:04.467Z"}
  - match: {values.0.1: '{"min":-3.0,"max":1.0}'}
  - match: {values.1.1: '{"min":3.0,"max":10.0}'}
  - match: {values.2.1: '{"min":2.0,"max":17.0}'}
  - match: {values.3.1: null}


---
aggregate_metric_double unsortable:
  - requires:
      test_runner_features: [capabilities]
      capabilities:
        - method: POST
          path: /_query
          parameters: []
          capabilities: [aggregate_metric_double_sorting]
      reason: "Support for sorting when aggregate_metric_double present"
  - do:
      catch: /cannot sort on aggregate_metric_double/
      esql.query:
        body:
          query: 'FROM test2 | sort agg_metric'

---
stats on aggregate_metric_double with partial submetrics:
- requires:
Expand Down
Loading