Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ae16694
[ES|QL] Support some stats on aggregate_double_metric
limotova Jan 16, 2025
a5be73b
addressed most comments minus tests
limotova Jan 22, 2025
80b317a
Merge branch 'main' into add-aggregate-double-metric-aggregates
limotova Jan 23, 2025
a2a6092
FromAggregateDoubleMetricTests
limotova Jan 25, 2025
9094f62
Merge branch 'main' into add-aggregate-double-metric-aggregates
limotova Jan 25, 2025
b3b27aa
bring back unsupported exceptions
limotova Jan 27, 2025
5acb5a8
Merge branch 'main' into add-aggregate-double-metric-aggregates
limotova Jan 28, 2025
71a05b2
change valueCount to Integer and revert other getValueCount()
limotova Jan 28, 2025
648e0af
and revert getFirstValueIndex()
limotova Jan 28, 2025
83612ec
test all metrics in aggregate double metric test
limotova Jan 28, 2025
38659b9
refactor things touched in this PR to all be aggregate metric double …
limotova Jan 28, 2025
bfe84c1
[CI] Auto commit changes from spotless
Jan 28, 2025
022eebe
renaming stragglers
limotova Jan 28, 2025
769e924
Update docs/changelog/120343.yaml
limotova Jan 28, 2025
6dd6e36
fix changelog
limotova Jan 28, 2025
5dd5ad9
Merge branch 'main' into add-aggregate-double-metric-aggregates
limotova Jan 28, 2025
af0e242
Add downsampled yaml test
limotova Jan 28, 2025
1838a82
address some comments
limotova Jan 28, 2025
335fe3b
change Double etc to double
limotova Jan 29, 2025
5608bce
Merge branch 'main' into add-aggregate-double-metric-aggregates
limotova Jan 29, 2025
c417b3c
add instanceof check
limotova Jan 29, 2025
b5837f3
Merge branch 'main' into add-aggregate-double-metric-aggregates
limotova Jan 29, 2025
fba979f
Merge branch 'main' into add-aggregate-double-metric-aggregates
limotova Jan 29, 2025
827eaf3
change AggMetDoubLit to record
limotova Jan 29, 2025
fbdb24d
move function to make checkstyle work
limotova Jan 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ interface BlockFactory {
SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count);

// TODO support non-singleton ords

AggregateDoubleMetricBuilder aggregateDoubleMetricBuilder(int count);
}

/**
Expand Down Expand Up @@ -501,4 +503,10 @@ interface SingletonOrdinalsBuilder extends Builder {
*/
SingletonOrdinalsBuilder appendOrd(int value);
}

interface AggregateDoubleMetricBuilder extends Builder {

AggregateDoubleMetricBuilder append(double min, double max, double sum, int valueCount);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ public SingletonOrdsBuilder appendOrd(int value) {
}
return new SingletonOrdsBuilder();
}

@Override
public BlockLoader.AggregateDoubleMetricBuilder aggregateDoubleMetricBuilder(int count) {
return null;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
public class EsqlCorePlugin extends Plugin implements ExtensiblePlugin {

public static final FeatureFlag SEMANTIC_TEXT_FEATURE_FLAG = new FeatureFlag("esql_semantic_text");
public static final FeatureFlag AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG = new FeatureFlag("esql_aggregate_metric_double");
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,9 @@ public enum DataType {
* loaded from the index and ESQL will load these fields as strings without their attached
* chunks or embeddings.
*/
SEMANTIC_TEXT(builder().esType("semantic_text").unknownSize());
SEMANTIC_TEXT(builder().esType("semantic_text").unknownSize()),

AGGREGATE_METRIC_DOUBLE(builder().esType("aggregate_metric_double").unknownSize());

/**
* Types that are actively being built. These types are not returned
Expand All @@ -316,7 +318,8 @@ public enum DataType {
* check that sending them to a function produces a sane error message.
*/
public static final Map<DataType, FeatureFlag> UNDER_CONSTRUCTION = Map.ofEntries(
Map.entry(SEMANTIC_TEXT, EsqlCorePlugin.SEMANTIC_TEXT_FEATURE_FLAG)
Map.entry(SEMANTIC_TEXT, EsqlCorePlugin.SEMANTIC_TEXT_FEATURE_FLAG),
Map.entry(AGGREGATE_METRIC_DOUBLE, EsqlCorePlugin.AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG)
);

private final String typeName;
Expand Down Expand Up @@ -553,6 +556,7 @@ public static boolean isRepresentable(DataType t) {
&& t != SOURCE
&& t != HALF_FLOAT
&& t != PARTIAL_AGG
&& t != AGGREGATE_METRIC_DOUBLE
&& t.isCounter() == false;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.data;

import org.elasticsearch.index.mapper.BlockLoader;

public class AggregateDoubleMetricBlockBuilder extends AbstractBlockBuilder implements BlockLoader.AggregateDoubleMetricBuilder {

private final DoubleBlockBuilder minBuilder;
private final DoubleBlockBuilder maxBuilder;
private final DoubleBlockBuilder sumBuilder;
private final IntBlockBuilder countBuilder;

public AggregateDoubleMetricBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
super(blockFactory);
minBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
maxBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
sumBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
countBuilder = new IntBlockBuilder(estimatedSize, blockFactory);
}

@Override
protected int valuesLength() {
return minBuilder.valuesLength();
}

@Override
protected void growValuesArray(int newSize) {
minBuilder.growValuesArray(newSize);
maxBuilder.growValuesArray(newSize);
sumBuilder.growValuesArray(newSize);
countBuilder.growValuesArray(newSize);
}

@Override
protected int elementSize() {
return minBuilder.elementSize() + maxBuilder.elementSize() + sumBuilder.elementSize() + countBuilder.elementSize();
}

@Override
public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) {
CompositeBlock composite = (CompositeBlock) block;
minBuilder.copyFrom(composite.getBlock(Metric.MIN.ordinal()), beginInclusive, endExclusive);
maxBuilder.copyFrom(composite.getBlock(Metric.MAX.ordinal()), beginInclusive, endExclusive);
sumBuilder.copyFrom(composite.getBlock(Metric.SUM.ordinal()), beginInclusive, endExclusive);
countBuilder.copyFrom(composite.getBlock(Metric.COUNT.ordinal()), beginInclusive, endExclusive);
return this;
}

@Override
public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) {
minBuilder.mvOrdering(mvOrdering);
maxBuilder.mvOrdering(mvOrdering);
sumBuilder.mvOrdering(mvOrdering);
countBuilder.mvOrdering(mvOrdering);
return this;
}

@Override
public Block build() {
Block[] blocks = new Block[4];
blocks[Metric.MIN.ordinal()] = minBuilder.build();
blocks[Metric.MAX.ordinal()] = maxBuilder.build();
blocks[Metric.SUM.ordinal()] = sumBuilder.build();
blocks[Metric.COUNT.ordinal()] = countBuilder.build();
return new CompositeBlock(blocks);
}

@Override
public BlockLoader.AggregateDoubleMetricBuilder append(double min, double max, double sum, int valueCount) {
minBuilder.appendDouble(min);
maxBuilder.appendDouble(max);
sumBuilder.appendDouble(sum);
countBuilder.appendInt(valueCount);
return this;
}

public enum Metric {
MIN,
MAX,
SUM,
COUNT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,10 @@ public Block newConstantNullBlock(int positions) {
return b;
}

public AggregateDoubleMetricBlockBuilder newAggregatedDoubleMetricBlockBuilder(int estimatedSize) {
return new AggregateDoubleMetricBlockBuilder(estimatedSize, this);
}

/**
* Returns the maximum number of bytes that a Block should be backed by a primitive array before switching to using BigArrays.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,30 @@ public int getPositionCount() {

@Override
public int getTotalValueCount() {
throw new UnsupportedOperationException("Composite block");
// TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions.
// So check just for one sub block is sufficient.
return blocks[0].getTotalValueCount();
}

@Override
public int getFirstValueIndex(int position) {
throw new UnsupportedOperationException("Composite block");
// TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions.
// So check just for one sub block is sufficient.
return blocks[0].getFirstValueIndex(position);
}

@Override
public int getValueCount(int position) {
throw new UnsupportedOperationException("Composite block");
// TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions.
// So check just for one sub block is sufficient.
return blocks[0].getValueCount(position);
}

@Override
public boolean isNull(int position) {
throw new UnsupportedOperationException("Composite block");
// TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions.
// So check just for one sub block is sufficient.
return blocks[0].isNull(position);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,11 @@ public BytesRefBlock constantBytes(BytesRef value) {
public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
return new SingletonOrdinalsBuilder(factory, ordinals, count);
}

@Override
public BlockLoader.AggregateDoubleMetricBuilder aggregateDoubleMetricBuilder(int count) {
return factory.newAggregatedDoubleMetricBlockBuilder(count);
}
}

// TODO tests that mix source loaded fields and doc values in the same block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,9 +765,8 @@ public static Literal randomLiteral(DataType type) {
throw new UncheckedIOException(e);
}
}
case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG -> throw new IllegalArgumentException(
"can't make random values for [" + type.typeName() + "]"
);
case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG, AGGREGATE_METRIC_DOUBLE ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point we'll have to be able to make random AggregateMetricDoubles. But later is fine.

throw new IllegalArgumentException("can't make random values for [" + type.typeName() + "]");
}, type);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,12 @@ public enum Cap {
/**
* Support named argument for function in map format.
*/
OPTIONAL_NAMED_ARGUMENT_MAP_FOR_FUNCTION(Build.current().isSnapshot());
OPTIONAL_NAMED_ARGUMENT_MAP_FOR_FUNCTION(Build.current().isSnapshot()),

/**
* Support for aggregate_metric_double type
*/
AGGREGATE_METRIC_DOUBLE;

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Pa
return builder.value(versionToString(val));
}
};
case NULL -> new PositionToXContent(block) {
case NULL, AGGREGATE_METRIC_DOUBLE -> new PositionToXContent(block) {
@Override
protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private static Object valueAt(DataType dataType, Block block, int offset, BytesR
case GEO_POINT, GEO_SHAPE, CARTESIAN_POINT, CARTESIAN_SHAPE -> spatialToString(
((BytesRefBlock) block).getBytesRef(offset, scratch)
);
case UNSUPPORTED -> (String) null;
case UNSUPPORTED, AGGREGATE_METRIC_DOUBLE -> (String) null;
case SOURCE -> {
BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.CountAggregatorFunction;
import org.elasticsearch.compute.data.AggregateDoubleMetricBlockBuilder;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.Nullability;
Expand All @@ -22,6 +23,7 @@
import org.elasticsearch.xpack.esql.expression.function.Example;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.Param;
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromAggregateDoubleMetric;
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvCount;
import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Mul;
Expand All @@ -33,6 +35,7 @@
import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER;

public class Count extends AggregateFunction implements ToAggregator, SurrogateExpression {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Count", Count::new);
Expand Down Expand Up @@ -71,6 +74,7 @@ public Count(
optional = true,
name = "field",
type = {
"aggregate_metric_double",
"boolean",
"cartesian_point",
"date",
Expand Down Expand Up @@ -141,6 +145,16 @@ protected TypeResolution resolveType() {
public Expression surrogate() {
var s = source();
var field = field();
if (field.dataType() == DataType.AGGREGATE_METRIC_DOUBLE) {
return new Sum(
s,
new FromAggregateDoubleMetric(
source(),
field,
new Literal(s, AggregateDoubleMetricBlockBuilder.Metric.COUNT.ordinal(), INTEGER)
)
);
}

if (field.foldable()) {
if (field instanceof Literal l) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.compute.aggregation.MaxIntAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.MaxIpAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.MaxLongAggregatorFunctionSupplier;
import org.elasticsearch.compute.data.AggregateDoubleMetricBlockBuilder;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
Expand All @@ -27,6 +28,7 @@
import org.elasticsearch.xpack.esql.expression.function.Example;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.Param;
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromAggregateDoubleMetric;
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMax;
import org.elasticsearch.xpack.esql.planner.ToAggregator;

Expand Down Expand Up @@ -73,7 +75,19 @@ public Max(
Source source,
@Param(
name = "field",
type = { "boolean", "double", "integer", "long", "date", "date_nanos", "ip", "keyword", "text", "long", "version" }
type = {
"aggregate_metric_double",
"boolean",
"double",
"integer",
"long",
"date",
"date_nanos",
"ip",
"keyword",
"text",
"long",
"version" }
) Expression field
) {
this(source, field, Literal.TRUE);
Expand Down Expand Up @@ -111,7 +125,7 @@ public Max replaceChildren(List<Expression> newChildren) {
protected TypeResolution resolveType() {
return TypeResolutions.isType(
field(),
SUPPLIERS::containsKey,
dt -> SUPPLIERS.containsKey(dt) || dt == DataType.AGGREGATE_METRIC_DOUBLE,
sourceText(),
DEFAULT,
"representable except unsigned_long and spatial types"
Expand All @@ -120,6 +134,9 @@ protected TypeResolution resolveType() {

@Override
public DataType dataType() {
if (field().dataType() == DataType.AGGREGATE_METRIC_DOUBLE) {
return DataType.DOUBLE;
}
return field().dataType().noText();
}

Expand All @@ -135,6 +152,16 @@ public final AggregatorFunctionSupplier supplier(List<Integer> inputChannels) {

@Override
public Expression surrogate() {
if (field().dataType() == DataType.AGGREGATE_METRIC_DOUBLE) {
return new Max(
source(),
new FromAggregateDoubleMetric(
source(),
field(),
new Literal(source(), AggregateDoubleMetricBlockBuilder.Metric.MAX.ordinal(), DataType.INTEGER)
)
);
}
return field().foldable() ? new MvMax(source(), field()) : null;
}
}
Loading