Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/124595.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 124595
summary: '`ToAggregateMetricDouble` function'
area: "ES|QL"
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/125191.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 125191
summary: Fix sorting when `aggregate_metric_double` present
area: ES|QL
type: enhancement
issues: []

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,6 @@ public static boolean isRepresentable(DataType t) {
&& t != SOURCE
&& t != HALF_FLOAT
&& t != PARTIAL_AGG
&& t != AGGREGATE_METRIC_DOUBLE
&& t.isCounter() == false;
}

Expand All @@ -578,7 +577,7 @@ public static boolean isSpatial(DataType t) {
}

public static boolean isSortable(DataType t) {
return false == (t == SOURCE || isCounter(t) || isSpatial(t));
return false == (t == SOURCE || isCounter(t) || isSpatial(t) || t == AGGREGATE_METRIC_DOUBLE);
}

public String nameUpper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,17 @@

package org.elasticsearch.compute.data;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.GenericNamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.BlockLoader;

import java.io.IOException;

public class AggregateMetricDoubleBlockBuilder extends AbstractBlockBuilder implements BlockLoader.AggregateMetricDoubleBuilder {

private DoubleBlockBuilder minBuilder;
Expand Down Expand Up @@ -161,11 +169,40 @@ public String getLabel() {
}
}

public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) {
public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) implements GenericNamedWriteable {
public AggregateMetricDoubleLiteral {
min = min.isNaN() ? null : min;
max = max.isNaN() ? null : max;
sum = sum.isNaN() ? null : sum;
}

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
GenericNamedWriteable.class,
"AggregateMetricDoubleLiteral",
AggregateMetricDoubleLiteral::new
);

@Override
public String getWriteableName() {
return "AggregateMetricDoubleLiteral";
}

public AggregateMetricDoubleLiteral(StreamInput input) throws IOException {
this(input.readOptionalDouble(), input.readOptionalDouble(), input.readOptionalDouble(), input.readOptionalInt());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalDouble(min);
out.writeOptionalDouble(max);
out.writeOptionalDouble(sum);
out.writeOptionalInt(count);
}

@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,19 @@ private static Object valueAtOffset(Block block, int offset) {
DocVector v = ((DocBlock) block).asVector();
yield new Doc(v.shards().getInt(offset), v.segments().getInt(offset), v.docs().getInt(offset));
}
case COMPOSITE -> throw new IllegalArgumentException("can't read values from composite blocks");
case COMPOSITE -> {
CompositeBlock compositeBlock = (CompositeBlock) block;
var minBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex());
var maxBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex());
var sumBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex());
var countBlock = (IntBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex());
yield new AggregateMetricDoubleLiteral(
minBlock.getDouble(offset),
maxBlock.getDouble(offset),
sumBlock.getDouble(offset),
countBlock.getInt(offset)
);
}
case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]");
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ static ResultBuilder resultBuilderFor(
case DOUBLE -> new ResultBuilderForDouble(blockFactory, encoder, inKey, positions);
case NULL -> new ResultBuilderForNull(blockFactory);
case DOC -> new ResultBuilderForDoc(blockFactory, positions);
case COMPOSITE -> new ResultBuilderForComposite(blockFactory, positions);
default -> {
assert false : "Result builder for [" + elementType + "]";
throw new UnsupportedOperationException("Result builder for [" + elementType + "]");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.topn;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.index.mapper.BlockLoader;

import java.util.List;

public class ResultBuilderForComposite implements ResultBuilder {

private final AggregateMetricDoubleBlockBuilder builder;

ResultBuilderForComposite(BlockFactory blockFactory, int positions) {
this.builder = blockFactory.newAggregateMetricDoubleBlockBuilder(positions);
}

@Override
public void decodeKey(BytesRef keys) {
throw new AssertionError("Composite Block can't be a key");
}

@Override
public void decodeValue(BytesRef values) {
for (BlockLoader.DoubleBuilder subBuilder : List.of(builder.min(), builder.max(), builder.sum())) {
if (TopNEncoder.DEFAULT_UNSORTABLE.decodeBoolean(values)) {
subBuilder.appendDouble(TopNEncoder.DEFAULT_UNSORTABLE.decodeDouble(values));
} else {
subBuilder.appendNull();
}
}
if (TopNEncoder.DEFAULT_UNSORTABLE.decodeBoolean(values)) {
builder.count().appendInt(TopNEncoder.DEFAULT_UNSORTABLE.decodeInt(values));
} else {
builder.count().appendNull();
}
}

@Override
public Block build() {
return builder.build();
}

@Override
public String toString() {
return "ValueExtractorForComposite";
}

@Override
public void close() {
builder.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.CompositeBlock;
import org.elasticsearch.compute.data.DocBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.ElementType;
Expand Down Expand Up @@ -40,6 +41,7 @@ static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder,
case DOUBLE -> ValueExtractorForDouble.extractorFor(encoder, inKey, (DoubleBlock) block);
case NULL -> new ValueExtractorForNull();
case DOC -> new ValueExtractorForDoc(encoder, ((DocBlock) block).asVector());
case COMPOSITE -> new ValueExtractorForComposite(encoder, (CompositeBlock) block);
default -> {
assert false : "No value extractor for [" + block.elementType() + "]";
throw new UnsupportedOperationException("No value extractor for [" + block.elementType() + "]");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.topn;

import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.compute.data.CompositeBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;

import java.util.List;

public class ValueExtractorForComposite implements ValueExtractor {
private final CompositeBlock block;

ValueExtractorForComposite(TopNEncoder encoder, CompositeBlock block) {
assert encoder == TopNEncoder.DEFAULT_UNSORTABLE;
this.block = block;
}

@Override
public void writeValue(BreakingBytesRefBuilder values, int position) {
if (block.getBlockCount() != AggregateMetricDoubleBlockBuilder.Metric.values().length) {
throw new UnsupportedOperationException("Composite Blocks for non-aggregate-metric-doubles do not have value extractors");
}
for (AggregateMetricDoubleBlockBuilder.Metric metric : List.of(
AggregateMetricDoubleBlockBuilder.Metric.MIN,
AggregateMetricDoubleBlockBuilder.Metric.MAX,
AggregateMetricDoubleBlockBuilder.Metric.SUM
)) {
DoubleBlock doubleBlock = block.getBlock(metric.getIndex());
if (doubleBlock.isNull(position)) {
TopNEncoder.DEFAULT_UNSORTABLE.encodeBoolean(false, values);
} else {
TopNEncoder.DEFAULT_UNSORTABLE.encodeBoolean(true, values);
TopNEncoder.DEFAULT_UNSORTABLE.encodeDouble(doubleBlock.getDouble(position), values);
}
}
IntBlock intBlock = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex());
if (intBlock.isNull(position)) {
TopNEncoder.DEFAULT_UNSORTABLE.encodeBoolean(false, values);
} else {
TopNEncoder.DEFAULT_UNSORTABLE.encodeBoolean(true, values);
TopNEncoder.DEFAULT_UNSORTABLE.encodeInt(intBlock.getInt(position), values);
}
}

@Override
public String toString() {
return "ValueExtractorForComposite";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.DocValueFormat;
Expand Down Expand Up @@ -40,6 +41,7 @@
import static org.elasticsearch.xpack.esql.core.util.NumericUtils.unsignedLongAsNumber;
import static org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes.CARTESIAN;
import static org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes.GEO;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.aggregateMetricDoubleLiteralToString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -405,6 +407,11 @@ private static Object convertExpectedValue(Type expectedType, Object expectedVal
case VERSION -> // convert BytesRef-packed Version to String
rebuildExpected(expectedValue, BytesRef.class, x -> new Version((BytesRef) x).toString());
case UNSIGNED_LONG -> rebuildExpected(expectedValue, Long.class, x -> unsignedLongAsNumber((long) x));
case AGGREGATE_METRIC_DOUBLE -> rebuildExpected(
expectedValue,
AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.class,
x -> aggregateMetricDoubleLiteralToString((AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral) x)
);
default -> expectedValue;
};
}
Expand Down
Loading