diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java index 7b4ceb67f04d7..ed40aab44cb5f 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java @@ -417,6 +417,8 @@ interface BlockFactory { SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count); // TODO support non-singleton ords + + AggregateDoubleMetricBuilder aggregateDoubleMetricBuilder(int count); } /** @@ -501,4 +503,10 @@ interface SingletonOrdinalsBuilder extends Builder { */ SingletonOrdinalsBuilder appendOrd(int value); } + + interface AggregateDoubleMetricBuilder extends Builder { + + AggregateDoubleMetricBuilder append(double min, double max, double sum, int valueCount); + + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java index 2c53fa782db85..8fca80d459de4 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java @@ -147,6 +147,11 @@ public SingletonOrdsBuilder appendOrd(int value) { } return new SingletonOrdsBuilder(); } + + @Override + public BlockLoader.AggregateDoubleMetricBuilder aggregateDoubleMetricBuilder(int count) { + return null; + } }; } diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/plugin/EsqlCorePlugin.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/plugin/EsqlCorePlugin.java index 61b480968e974..729188e2981d9 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/plugin/EsqlCorePlugin.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/plugin/EsqlCorePlugin.java @@ -14,4 +14,5 @@ public class EsqlCorePlugin extends Plugin implements ExtensiblePlugin { public static final FeatureFlag SEMANTIC_TEXT_FEATURE_FLAG = new FeatureFlag("esql_semantic_text"); + public static final FeatureFlag AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG = new FeatureFlag("esql_aggregate_metric_double"); } diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java index d86cdb0de038c..cc41763cbbba3 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java @@ -307,7 +307,9 @@ public enum DataType { * loaded from the index and ESQL will load these fields as strings without their attached * chunks or embeddings. */ - SEMANTIC_TEXT(builder().esType("semantic_text").unknownSize()); + SEMANTIC_TEXT(builder().esType("semantic_text").unknownSize()), + + AGGREGATE_METRIC_DOUBLE(builder().esType("aggregate_metric_double").unknownSize()); /** * Types that are actively being built. These types are not returned @@ -316,7 +318,8 @@ public enum DataType { * check that sending them to a function produces a sane error message. */ public static final Map UNDER_CONSTRUCTION = Map.ofEntries( - Map.entry(SEMANTIC_TEXT, EsqlCorePlugin.SEMANTIC_TEXT_FEATURE_FLAG) + Map.entry(SEMANTIC_TEXT, EsqlCorePlugin.SEMANTIC_TEXT_FEATURE_FLAG), + Map.entry(AGGREGATE_METRIC_DOUBLE, EsqlCorePlugin.AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG) ); private final String typeName; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MaxAggregatedMetricDoubleAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MaxAggregatedMetricDoubleAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..75927a5f5033e --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MaxAggregatedMetricDoubleAggregatorFunctionSupplier.java @@ -0,0 +1,199 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanBlock; +import org.elasticsearch.compute.data.BooleanVector; +import org.elasticsearch.compute.data.CompositeBlock; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.DoubleVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; + +import java.util.List; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link MaxDoubleAggregator} with aggregated double metric field. + */ +public final class MaxAggregatedMetricDoubleAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + + private static final int MAX_COMPOSITE_SUBBLOCK = 1; + + private final List channels; + + public MaxAggregatedMetricDoubleAggregatorFunctionSupplier(List channels) { + this.channels = channels; + } + + @Override + public AggregatorFunction aggregator(DriverContext driverContext) { + // Copied from MaxDoubleAggregatorFunction, see change comments for actual differences: + final DoubleState state = new DoubleState(MaxDoubleAggregator.init()); + return new AggregatorFunction() { + + static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("max", ElementType.DOUBLE), + new IntermediateStateDesc("seen", ElementType.BOOLEAN) + ); + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public void addRawInput(Page page, BooleanVector mask) { + if (mask.allFalse()) { + // Entire page masked away + return; + } + if (mask.allTrue()) { + // No masking + // CHANGE: + CompositeBlock compositeBlock = page.getBlock(channels.get(0)); + DoubleBlock block = compositeBlock.getBlock(MAX_COMPOSITE_SUBBLOCK); + // END CHANGE: + DoubleVector vector = block.asVector(); + if (vector != null) { + addRawVector(vector); + } else { + addRawBlock(block); + } + return; + } + // Some positions masked away, others kept + // CHANGE: + CompositeBlock compositeBlock = page.getBlock(channels.get(0)); + DoubleBlock block = compositeBlock.getBlock(MAX_COMPOSITE_SUBBLOCK); + // END CHANGE: + DoubleVector vector = block.asVector(); + if (vector != null) { + addRawVector(vector, mask); + } else { + addRawBlock(block, mask); + } + } + + private void addRawVector(DoubleVector vector) { + state.seen(true); + for (int i = 0; i < vector.getPositionCount(); i++) { + state.doubleValue(MaxDoubleAggregator.combine(state.doubleValue(), vector.getDouble(i))); + } + } + + private void addRawVector(DoubleVector vector, BooleanVector mask) { + state.seen(true); + for (int i = 0; i < vector.getPositionCount(); i++) { + if (mask.getBoolean(i) == false) { + continue; + } + state.doubleValue(MaxDoubleAggregator.combine(state.doubleValue(), vector.getDouble(i))); + } + } + + private void addRawBlock(DoubleBlock block) { + for (int p = 0; p < block.getPositionCount(); p++) { + if (block.isNull(p)) { + continue; + } + state.seen(true); + int start = block.getFirstValueIndex(p); + int end = start + block.getValueCount(p); + for (int i = start; i < end; i++) { + state.doubleValue(MaxDoubleAggregator.combine(state.doubleValue(), block.getDouble(i))); + } + } + } + + private void addRawBlock(DoubleBlock block, BooleanVector mask) { + for (int p = 0; p < block.getPositionCount(); p++) { + if (mask.getBoolean(p) == false) { + continue; + } + if (block.isNull(p)) { + continue; + } + state.seen(true); + int start = block.getFirstValueIndex(p); + int end = start + block.getValueCount(p); + for (int i = start; i < end; i++) { + state.doubleValue(MaxDoubleAggregator.combine(state.doubleValue(), block.getDouble(i))); + } + } + } + + @Override + public void addIntermediateInput(Page page) { + assert channels.size() == intermediateBlockCount(); + assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size(); + Block maxUncast = page.getBlock(channels.get(0)); + if (maxUncast.areAllValuesNull()) { + return; + } + DoubleVector max = ((DoubleBlock) maxUncast).asVector(); + assert max.getPositionCount() == 1; + Block seenUncast = page.getBlock(channels.get(1)); + if (seenUncast.areAllValuesNull()) { + return; + } + BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); + assert seen.getPositionCount() == 1; + if (seen.getBoolean(0)) { + state.doubleValue(MaxDoubleAggregator.combine(state.doubleValue(), max.getDouble(0))); + state.seen(true); + } + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) { + state.toIntermediate(blocks, offset, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, DriverContext driverContext) { + if (state.seen() == false) { + blocks[offset] = driverContext.blockFactory().newConstantNullBlock(1); + return; + } + blocks[offset] = driverContext.blockFactory().newConstantDoubleBlockWith(state.doubleValue(), 1); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } + + }; + } + + @Override + public MaxDoubleGroupingAggregatorFunction groupingAggregator(DriverContext driverContext) { + // TODO: + throw new UnsupportedOperationException("grouping aggregator is not supported yet"); + // return MaxDoubleGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return "max of aggregated doubles"; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateDoubleMetricBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateDoubleMetricBlockBuilder.java new file mode 100644 index 0000000000000..f4a0d1e24f353 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateDoubleMetricBlockBuilder.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.elasticsearch.index.mapper.BlockLoader; + +public class AggregateDoubleMetricBlockBuilder extends AbstractBlockBuilder implements BlockLoader.AggregateDoubleMetricBuilder { + + private final DoubleBlockBuilder minBuilder; + private final DoubleBlockBuilder maxBuilder; + private final DoubleBlockBuilder sumBuilder; + private final IntBlockBuilder countBuilder; + + public AggregateDoubleMetricBlockBuilder(int estimatedSize, BlockFactory blockFactory) { + super(blockFactory); + minBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory); + maxBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory); + sumBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory); + countBuilder = new IntBlockBuilder(estimatedSize, blockFactory); + } + + @Override + protected int valuesLength() { + return minBuilder.valuesLength(); + } + + @Override + protected void growValuesArray(int newSize) { + minBuilder.growValuesArray(newSize); + maxBuilder.growValuesArray(newSize); + sumBuilder.growValuesArray(newSize); + countBuilder.growValuesArray(newSize); + } + + @Override + protected int elementSize() { + return minBuilder.elementSize() + maxBuilder.elementSize() + sumBuilder.elementSize() + countBuilder.elementSize(); + } + + @Override + public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) { + CompositeBlock composite = (CompositeBlock) block; + minBuilder.copyFrom(composite.getBlock(0), beginInclusive, endExclusive); + maxBuilder.copyFrom(composite.getBlock(1), beginInclusive, endExclusive); + sumBuilder.copyFrom(composite.getBlock(2), beginInclusive, endExclusive); + countBuilder.copyFrom(composite.getBlock(3), beginInclusive, endExclusive); + return this; + } + + @Override + public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) { + minBuilder.mvOrdering(mvOrdering); + maxBuilder.mvOrdering(mvOrdering); + sumBuilder.mvOrdering(mvOrdering); + countBuilder.mvOrdering(mvOrdering); + return this; + } + + @Override + public Block build() { + Block[] blocks = new Block[4]; + blocks[0] = minBuilder.build(); + blocks[1] = maxBuilder.build(); + blocks[2] = sumBuilder.build(); + blocks[3] = countBuilder.build(); + return new CompositeBlock(blocks); + } + + @Override + public BlockLoader.AggregateDoubleMetricBuilder append(double min, double max, double sum, int valueCount) { + minBuilder.appendDouble(min); + maxBuilder.appendDouble(max); + sumBuilder.appendDouble(sum); + countBuilder.appendInt(valueCount); + return this; + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java index 155898ebdc6c8..623305dfe75c8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java @@ -432,6 +432,10 @@ public Block newConstantNullBlock(int positions) { return b; } + public AggregateDoubleMetricBlockBuilder newAggregatedDoubleMetricBlockBuilder(int estimatedSize) { + return new AggregateDoubleMetricBlockBuilder(estimatedSize, this); + } + /** * Returns the maximum number of bytes that a Block should be backed by a primitive array before switching to using BigArrays. */ diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java index 3df389135e9d3..8ec7581834777 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java @@ -268,6 +268,8 @@ private static Object valueAtOffset(Block block, int offset) { case BOOLEAN -> ((BooleanBlock) block).getBoolean(offset); case BYTES_REF -> BytesRef.deepCopyOf(((BytesRefBlock) block).getBytesRef(offset, new BytesRef())); case DOUBLE -> ((DoubleBlock) block).getDouble(offset); + // TODO: include all aggregate double metric sub fields. Currently hardcoded to max sub field, which is always the second block: + case AGGREGATED_DOUBLE_METRIC -> ((DoubleBlock) ((CompositeBlock) block).getBlock(1)).getDouble(offset); case FLOAT -> ((FloatBlock) block).getFloat(offset); case INT -> ((IntBlock) block).getInt(offset); case LONG -> ((LongBlock) block).getLong(offset); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java index b83e2d1efc259..3836680e95751 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java @@ -86,22 +86,30 @@ public int getPositionCount() { @Override public int getTotalValueCount() { - throw new UnsupportedOperationException("Composite block"); + // TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions. + // So check just for one sub block is sufficient. + return blocks[0].getTotalValueCount(); } @Override public int getFirstValueIndex(int position) { - throw new UnsupportedOperationException("Composite block"); + // TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions. + // So check just for one sub block is sufficient. + return blocks[0].getFirstValueIndex(position); } @Override public int getValueCount(int position) { - throw new UnsupportedOperationException("Composite block"); + // TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions. + // So check just for one sub block is sufficient. + return blocks[0].getValueCount(position); } @Override public boolean isNull(int position) { - throw new UnsupportedOperationException("Composite block"); + // TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions. + // So check just for one sub block is sufficient. + return blocks[0].isNull(position); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java index f38c6d70991f9..0ef485eadff77 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java @@ -18,6 +18,7 @@ public enum ElementType { LONG("Long", BlockFactory::newLongBlockBuilder), FLOAT("Float", BlockFactory::newFloatBlockBuilder), DOUBLE("Double", BlockFactory::newDoubleBlockBuilder), + AGGREGATED_DOUBLE_METRIC("AggregatedDoubleMetric", BlockFactory::newAggregatedDoubleMetricBlockBuilder), /** * Blocks containing only null values. */ @@ -33,7 +34,7 @@ public enum ElementType { /** * Composite blocks which contain array of sub-blocks. */ - COMPOSITE("Composite", (blockFactory, estimatedSize) -> { throw new UnsupportedOperationException("can't build composite blocks"); }), + COMPOSITE("Composite", BlockFactory::newAggregatedDoubleMetricBlockBuilder), /** * Intermediate blocks which don't support retrieving elements. diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java index 74affb10eaf20..1fa2e75753995 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.compute.data.AggregateDoubleMetricBlockBuilder; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BytesRefBlock; @@ -160,7 +161,9 @@ public int get(int i) { } if (Assertions.ENABLED) { for (int f = 0; f < fields.length; f++) { - assert blocks[f].elementType() == ElementType.NULL || blocks[f].elementType() == fields[f].info.type + assert blocks[f].elementType() == ElementType.NULL + || blocks[f].elementType() == fields[f].info.type + || blocks[f].elementType() == ElementType.COMPOSITE && fields[f].info.type == ElementType.AGGREGATED_DOUBLE_METRIC : blocks[f].elementType() + " NOT IN (NULL, " + fields[f].info.type + ")"; } } @@ -698,6 +701,11 @@ public BytesRefBlock constantBytes(BytesRef value) { public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) { return new SingletonOrdinalsBuilder(factory, ordinals, count); } + + @Override + public BlockLoader.AggregateDoubleMetricBuilder aggregateDoubleMetricBuilder(int count) { + return new AggregateDoubleMetricBlockBuilder(count, this.factory); + } } // TODO tests that mix source loaded fields and doc values in the same block diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java index 1e9ea88b2f1d7..7e5732162b3fe 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java @@ -147,7 +147,9 @@ static List valuesAggregatorForGroupings(List new org.elasticsearch.compute.aggregation.ValuesIntAggregatorFunctionSupplier(channels); case LONG -> new org.elasticsearch.compute.aggregation.ValuesLongAggregatorFunctionSupplier(channels); case BOOLEAN -> new org.elasticsearch.compute.aggregation.ValuesBooleanAggregatorFunctionSupplier(channels); - case FLOAT, NULL, DOC, COMPOSITE, UNKNOWN -> throw new IllegalArgumentException("unsupported grouping type"); + case AGGREGATED_DOUBLE_METRIC, FLOAT, NULL, DOC, COMPOSITE, UNKNOWN -> throw new IllegalArgumentException( + "unsupported grouping type" + ); }); aggregators.add(aggregatorSupplier.groupingAggregatorFactory(AggregatorMode.SINGLE)); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java index 5428863436535..266bfa7d00f4e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java @@ -90,7 +90,7 @@ public static QueryList rawTermQueryList(MappedFieldType field, SearchExecutionC } case NULL -> offset -> null; case DOC -> throw new IllegalArgumentException("can't read values from [doc] block"); - case COMPOSITE -> throw new IllegalArgumentException("can't read values from [composite] block"); + case AGGREGATED_DOUBLE_METRIC, COMPOSITE -> throw new IllegalArgumentException("can't read values from [composite] block"); case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]"); }; return new TermQueryList(field, searchExecutionContext, block, blockToJavaObject); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java index 61c49bac7505d..0e0b687ddb8da 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java @@ -52,6 +52,7 @@ static ResultBuilder resultBuilderFor( case LONG -> new ResultBuilderForLong(blockFactory, encoder, inKey, positions); case FLOAT -> new ResultBuilderForFloat(blockFactory, encoder, inKey, positions); case DOUBLE -> new ResultBuilderForDouble(blockFactory, encoder, inKey, positions); + case AGGREGATED_DOUBLE_METRIC -> new ResultBuilderForDouble(blockFactory, encoder, inKey, positions); case NULL -> new ResultBuilderForNull(blockFactory); case DOC -> new ResultBuilderForDoc(blockFactory, positions); default -> { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java index b9336024eb404..39a131ec56d1f 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java @@ -10,6 +10,7 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BooleanBlock; import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.CompositeBlock; import org.elasticsearch.compute.data.DocBlock; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.ElementType; @@ -25,7 +26,9 @@ interface ValueExtractor { void writeValue(BreakingBytesRefBuilder values, int position); static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder, boolean inKey, Block block) { - if (false == (elementType == block.elementType() || ElementType.NULL == block.elementType())) { + if (false == (elementType == block.elementType() + || ElementType.NULL == block.elementType() + || elementType == ElementType.AGGREGATED_DOUBLE_METRIC && block.elementType() == ElementType.COMPOSITE)) { throw new IllegalArgumentException("Expected [" + elementType + "] but was [" + block.elementType() + "]"); } return switch (block.elementType()) { @@ -37,6 +40,7 @@ static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder, case DOUBLE -> ValueExtractorForDouble.extractorFor(encoder, inKey, (DoubleBlock) block); case NULL -> new ValueExtractorForNull(); case DOC -> new ValueExtractorForDoc(encoder, ((DocBlock) block).asVector()); + case COMPOSITE -> ValueExtractorForComposite.extractorFor(encoder, inKey, (CompositeBlock) block); default -> { assert false : "No value extractor for [" + block.elementType() + "]"; throw new UnsupportedOperationException("No value extractor for [" + block.elementType() + "]"); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForComposite.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForComposite.java new file mode 100644 index 0000000000000..3f597e50c19f3 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForComposite.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.topn; + +import org.elasticsearch.compute.data.CompositeBlock; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; + +abstract class ValueExtractorForComposite implements ValueExtractor { + static ValueExtractorForComposite extractorFor(TopNEncoder encoder, boolean inKey, CompositeBlock block) { + // todo: implement ForVector? + return new ValueExtractorForComposite.ForBlock(encoder, inKey, block); + } + + protected final boolean inKey; + + ValueExtractorForComposite(TopNEncoder encoder, boolean inKey) { + assert encoder == TopNEncoder.DEFAULT_UNSORTABLE : encoder.toString(); + this.inKey = inKey; + } + + protected final void writeCount(BreakingBytesRefBuilder values, int count) { + TopNEncoder.DEFAULT_UNSORTABLE.encodeVInt(count, values); + } + + protected final void actualWriteValue(BreakingBytesRefBuilder values, double value) { + TopNEncoder.DEFAULT_UNSORTABLE.encodeDouble(value, values); + } + + static class ForBlock extends ValueExtractorForComposite { + private final CompositeBlock block; + + ForBlock(TopNEncoder encoder, boolean inKey, CompositeBlock block) { + super(encoder, inKey); + this.block = block; + } + + @Override + public void writeValue(BreakingBytesRefBuilder values, int position) { + int size = block.getBlock(1).getValueCount(position); + writeCount(values, size); + if (size == 1 && inKey) { + // Will read results from the key + return; + } + // TODO: include all aggregate double metric sub fields. Currently hardcoded to max sub field, which is always the second block: + int start = block.getBlock(1).getFirstValueIndex(position); + int end = start + size; + for (int i = start; i < end; i++) { + actualWriteValue(values, ((DoubleBlock) block.getBlock(1)).getDouble(i)); + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockTestUtils.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockTestUtils.java index 55e80a9124de0..62eb214f9537d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockTestUtils.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockTestUtils.java @@ -39,6 +39,7 @@ public static Object randomValue(ElementType e) { case DOC -> new BlockUtils.Doc(randomInt(), randomInt(), between(0, Integer.MAX_VALUE)); case NULL -> null; case COMPOSITE -> throw new IllegalArgumentException("can't make random values for composite"); + case AGGREGATED_DOUBLE_METRIC -> throw new IllegalArgumentException("can't make random values for aggregate double metric"); case UNKNOWN -> throw new IllegalArgumentException("can't make random values for [" + e + "]"); }; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java index 3ab02ac5488bc..30dc3279559ef 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java @@ -113,7 +113,7 @@ public void testCranky() { private Vector.Builder vectorBuilder(int estimatedSize, BlockFactory blockFactory) { return switch (elementType) { - case NULL, DOC, COMPOSITE, UNKNOWN -> throw new UnsupportedOperationException(); + case NULL, DOC, COMPOSITE, AGGREGATED_DOUBLE_METRIC, UNKNOWN -> throw new UnsupportedOperationException(); case BOOLEAN -> blockFactory.newBooleanVectorBuilder(estimatedSize); case BYTES_REF -> blockFactory.newBytesRefVectorBuilder(estimatedSize); case FLOAT -> blockFactory.newFloatVectorBuilder(estimatedSize); @@ -125,7 +125,7 @@ private Vector.Builder vectorBuilder(int estimatedSize, BlockFactory blockFactor private void fill(Vector.Builder builder, Vector from) { switch (elementType) { - case NULL, DOC, COMPOSITE, UNKNOWN -> throw new UnsupportedOperationException(); + case NULL, DOC, COMPOSITE, AGGREGATED_DOUBLE_METRIC, UNKNOWN -> throw new UnsupportedOperationException(); case BOOLEAN -> { for (int p = 0; p < from.getPositionCount(); p++) { ((BooleanVector.Builder) builder).appendBoolean(((BooleanVector) from).getBoolean(p)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java index 1086280af9df0..46b2329299650 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java @@ -116,7 +116,7 @@ public void testCranky() { private Vector.Builder vectorBuilder(int size, BlockFactory blockFactory) { return switch (elementType) { - case NULL, BYTES_REF, DOC, COMPOSITE, UNKNOWN -> throw new UnsupportedOperationException(); + case NULL, BYTES_REF, DOC, COMPOSITE, AGGREGATED_DOUBLE_METRIC, UNKNOWN -> throw new UnsupportedOperationException(); case BOOLEAN -> blockFactory.newBooleanVectorFixedBuilder(size); case DOUBLE -> blockFactory.newDoubleVectorFixedBuilder(size); case FLOAT -> blockFactory.newFloatVectorFixedBuilder(size); @@ -127,7 +127,7 @@ private Vector.Builder vectorBuilder(int size, BlockFactory blockFactory) { private void fill(Vector.Builder builder, Vector from) { switch (elementType) { - case NULL, DOC, COMPOSITE, UNKNOWN -> throw new UnsupportedOperationException(); + case NULL, DOC, COMPOSITE, AGGREGATED_DOUBLE_METRIC, UNKNOWN -> throw new UnsupportedOperationException(); case BOOLEAN -> { for (int p = 0; p < from.getPositionCount(); p++) { ((BooleanVector.FixedBuilder) builder).appendBoolean(((BooleanVector) from).getBoolean(p)); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvAssert.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvAssert.java index 692c385cef216..3dc9e8847a7a3 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvAssert.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvAssert.java @@ -135,6 +135,9 @@ private static void assertMetadata( || expectedType == UNSIGNED_LONG)) { continue; } + if (blockType == Type.DOUBLE && expectedType == Type.AGGREGATE_METRIC_DOUBLE) { + continue; + } if (blockType == Type.KEYWORD && (expectedType == Type.IP || expectedType == Type.VERSION diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java index f0bdf089f69d1..a002afa8bcae5 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java @@ -478,7 +478,8 @@ public enum Type { GEO_POINT(x -> x == null ? null : GEO.wktToWkb(x), BytesRef.class), CARTESIAN_POINT(x -> x == null ? null : CARTESIAN.wktToWkb(x), BytesRef.class), GEO_SHAPE(x -> x == null ? null : GEO.wktToWkb(x), BytesRef.class), - CARTESIAN_SHAPE(x -> x == null ? null : CARTESIAN.wktToWkb(x), BytesRef.class); + CARTESIAN_SHAPE(x -> x == null ? null : CARTESIAN.wktToWkb(x), BytesRef.class), + AGGREGATE_METRIC_DOUBLE(Double::parseDouble, Double.class); private static final Map LOOKUP = new HashMap<>(); @@ -546,6 +547,7 @@ public static Type asType(ElementType elementType, Type actualType) { case BOOLEAN -> BOOLEAN; case DOC -> throw new IllegalArgumentException("can't assert on doc blocks"); case COMPOSITE -> throw new IllegalArgumentException("can't assert on composite blocks"); + case AGGREGATED_DOUBLE_METRIC -> throw new IllegalArgumentException("can't assert on aggregated double metric blocks"); case UNKNOWN -> throw new IllegalArgumentException("Unknown block types cannot be handled"); }; } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java index 7aca63182e2b1..60cfde998c622 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java @@ -111,6 +111,7 @@ public class CsvTestsDataLoader { private static final TestsDataset ADDRESSES = new TestsDataset("addresses"); private static final TestsDataset BOOKS = new TestsDataset("books").withSetting("books-settings.json"); private static final TestsDataset SEMANTIC_TEXT = new TestsDataset("semantic_text").withInferenceEndpoint(true); + private static final TestsDataset AGGREGATE_METRIC_DOUBLE = new TestsDataset("aggregate_metric_double"); public static final Map CSV_DATASET_MAP = Map.ofEntries( Map.entry(EMPLOYEES.indexName, EMPLOYEES), @@ -155,7 +156,8 @@ public class CsvTestsDataLoader { Map.entry(DISTANCES.indexName, DISTANCES), Map.entry(ADDRESSES.indexName, ADDRESSES), Map.entry(BOOKS.indexName, BOOKS), - Map.entry(SEMANTIC_TEXT.indexName, SEMANTIC_TEXT) + Map.entry(SEMANTIC_TEXT.indexName, SEMANTIC_TEXT), + Map.entry(AGGREGATE_METRIC_DOUBLE.indexName, AGGREGATE_METRIC_DOUBLE) ); private static final EnrichConfig LANGUAGES_ENRICH = new EnrichConfig("languages_policy", "enrich-policy-languages.json"); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 66fd7d3ee5eb5..668dcb081aad2 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -126,6 +126,7 @@ import static org.elasticsearch.test.ESTestCase.randomShort; import static org.elasticsearch.test.ESTestCase.randomZone; import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; +import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE; import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER; import static org.elasticsearch.xpack.esql.core.type.DataType.NULL; import static org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes.CARTESIAN; @@ -749,6 +750,7 @@ public static Literal randomLiteral(DataType type) { case CARTESIAN_POINT -> CARTESIAN.asWkb(ShapeTestUtils.randomPoint()); case GEO_SHAPE -> GEO.asWkb(GeometryTestUtils.randomGeometry(randomBoolean())); case CARTESIAN_SHAPE -> CARTESIAN.asWkb(ShapeTestUtils.randomGeometry(randomBoolean())); + case AGGREGATE_METRIC_DOUBLE -> randomDouble(); case NULL -> null; case SOURCE -> { try { diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/aggregate_metric_double.csv b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/aggregate_metric_double.csv new file mode 100644 index 0000000000000..27c45b9c4eb59 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/aggregate_metric_double.csv @@ -0,0 +1,2 @@ +some_field:aggregate_metric_double + diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-aggregate_metric_double.json b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-aggregate_metric_double.json new file mode 100644 index 0000000000000..865b22ebcd9c3 --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-aggregate_metric_double.json @@ -0,0 +1,9 @@ +{ + "properties": { + "some_field": { + "type": "aggregate_metric_double", + "metrics": [ "min", "max", "sum", "value_count" ], + "default_metric": "max" + } + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index b1b11ccb09c86..d5d9b3d0515a5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -605,7 +605,9 @@ public enum Cap { /** * Full text functions can be used in disjunctions */ - FULL_TEXT_FUNCTIONS_DISJUNCTIONS; + FULL_TEXT_FUNCTIONS_DISJUNCTIONS, + + AGGREGATE_METRIC_DOUBLE; private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java index 0def56c70dc35..a4e0fa91dca99 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java @@ -13,6 +13,7 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BooleanBlock; import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.CompositeBlock; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; @@ -81,6 +82,20 @@ protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Pa return builder.value(((DoubleBlock) block).getDouble(valueIndex)); } }; + case AGGREGATE_METRIC_DOUBLE -> new PositionToXContent(block) { + @Override + protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex) + throws IOException { + // TODO: remove if statement. The block should always be of type CompositeBlock + DoubleBlock doubleBlock; + if (block instanceof CompositeBlock compositeBlock) { + doubleBlock = compositeBlock.getBlock(1); + } else { + doubleBlock = (DoubleBlock) block; + } + return builder.value(doubleBlock.getDouble(valueIndex)); + } + }; case UNSIGNED_LONG -> new PositionToXContent(block) { @Override protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java index 49fcc167dce0f..b67c146a22ae6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java @@ -14,6 +14,7 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BooleanBlock; import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.CompositeBlock; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; @@ -114,6 +115,7 @@ private static Object valueAt(DataType dataType, Block block, int offset, BytesR case LONG, COUNTER_LONG -> ((LongBlock) block).getLong(offset); case INTEGER, COUNTER_INTEGER -> ((IntBlock) block).getInt(offset); case DOUBLE, COUNTER_DOUBLE -> ((DoubleBlock) block).getDouble(offset); + case AGGREGATE_METRIC_DOUBLE -> ((DoubleBlock) ((CompositeBlock) block).getBlock(1)).getDouble(offset); case KEYWORD, SEMANTIC_TEXT, TEXT -> ((BytesRefBlock) block).getBytesRef(offset, scratch).utf8ToString(); case IP -> { BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Max.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Max.java index eb0c8abd1080b..19ac278a04c3a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Max.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Max.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.MaxAggregatedMetricDoubleAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.MaxBooleanAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.MaxBytesRefAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.MaxDoubleAggregatorFunctionSupplier; @@ -48,6 +49,7 @@ public class Max extends AggregateFunction implements ToAggregator, SurrogateExp Map.entry(DataType.DATE_NANOS, MaxLongAggregatorFunctionSupplier::new), Map.entry(DataType.INTEGER, MaxIntAggregatorFunctionSupplier::new), Map.entry(DataType.DOUBLE, MaxDoubleAggregatorFunctionSupplier::new), + Map.entry(DataType.AGGREGATE_METRIC_DOUBLE, MaxAggregatedMetricDoubleAggregatorFunctionSupplier::new), Map.entry(DataType.IP, MaxIpAggregatorFunctionSupplier::new), Map.entry(DataType.KEYWORD, MaxBytesRefAggregatorFunctionSupplier::new), Map.entry(DataType.TEXT, MaxBytesRefAggregatorFunctionSupplier::new), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java index 1918e3036e2b0..45a2ceb83219c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java @@ -304,6 +304,7 @@ private static DataType toDataType(ElementType elementType) { case INT -> DataType.INTEGER; case LONG -> DataType.LONG; case DOUBLE -> DataType.DOUBLE; + case AGGREGATED_DOUBLE_METRIC -> DataType.AGGREGATE_METRIC_DOUBLE; case FLOAT, NULL, DOC, COMPOSITE, UNKNOWN -> throw new EsqlIllegalArgumentException("unsupported agg type: " + elementType); }; } @@ -327,7 +328,7 @@ private static String dataTypeToString(DataType type, Class aggClass) { case DataType.BOOLEAN -> "Boolean"; case DataType.INTEGER, DataType.COUNTER_INTEGER -> "Int"; case DataType.LONG, DataType.DATETIME, DataType.COUNTER_LONG, DataType.DATE_NANOS -> "Long"; - case DataType.DOUBLE, DataType.COUNTER_DOUBLE -> "Double"; + case DataType.DOUBLE, DataType.AGGREGATE_METRIC_DOUBLE, DataType.COUNTER_DOUBLE -> "Double"; case DataType.KEYWORD, DataType.IP, DataType.VERSION, DataType.TEXT, DataType.SEMANTIC_TEXT -> "BytesRef"; case GEO_POINT -> "GeoPoint"; case CARTESIAN_POINT -> "CartesianPoint"; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index dedc612071434..6d840667daf63 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -358,8 +358,8 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte case VERSION -> TopNEncoder.VERSION; case BOOLEAN, NULL, BYTE, SHORT, INTEGER, LONG, DOUBLE, FLOAT, HALF_FLOAT, DATETIME, DATE_NANOS, DATE_PERIOD, TIME_DURATION, OBJECT, SCALED_FLOAT, UNSIGNED_LONG, DOC_DATA_TYPE, TSID_DATA_TYPE -> TopNEncoder.DEFAULT_SORTABLE; - case GEO_POINT, CARTESIAN_POINT, GEO_SHAPE, CARTESIAN_SHAPE, COUNTER_LONG, COUNTER_INTEGER, COUNTER_DOUBLE, SOURCE -> - TopNEncoder.DEFAULT_UNSORTABLE; + case GEO_POINT, CARTESIAN_POINT, GEO_SHAPE, CARTESIAN_SHAPE, COUNTER_LONG, COUNTER_INTEGER, COUNTER_DOUBLE, SOURCE, + AGGREGATE_METRIC_DOUBLE -> TopNEncoder.DEFAULT_UNSORTABLE; // unsupported fields are encoded as BytesRef, we'll use the same encoder; all values should be null at this point case PARTIAL_AGG, UNSUPPORTED -> TopNEncoder.UNSUPPORTED; }; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index a312d048db0ad..531da800a7440 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -264,6 +264,7 @@ public static ElementType toElementType(DataType dataType, MappedFieldType.Field case LONG, DATETIME, DATE_NANOS, UNSIGNED_LONG, COUNTER_LONG -> ElementType.LONG; case INTEGER, COUNTER_INTEGER -> ElementType.INT; case DOUBLE, COUNTER_DOUBLE -> ElementType.DOUBLE; + case AGGREGATE_METRIC_DOUBLE -> ElementType.AGGREGATED_DOUBLE_METRIC; // unsupported fields are passed through as a BytesRef case KEYWORD, TEXT, IP, SOURCE, VERSION, SEMANTIC_TEXT, UNSUPPORTED -> ElementType.BYTES_REF; case NULL -> ElementType.NULL; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index 2deedb927331d..f7dd249ebdf9a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.geo.GeometryTestUtils; import org.elasticsearch.geo.ShapeTestUtils; +import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.rest.action.RestActions; import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.transport.RemoteClusterAware; @@ -214,6 +215,12 @@ private Page randomPage(List columns) { case CARTESIAN_SHAPE -> ((BytesRefBlock.Builder) builder).appendBytesRef( CARTESIAN.asWkb(ShapeTestUtils.randomGeometry(randomBoolean())) ); + case AGGREGATE_METRIC_DOUBLE -> ((BlockLoader.AggregateDoubleMetricBuilder) builder).append( + randomDouble(), + randomDouble(), + randomDouble(), + randomInt() + ); case NULL -> builder.appendNull(); case SOURCE -> { try { @@ -870,6 +877,12 @@ static Page valuesToPage(BlockFactory blockFactory, List columns case LONG, COUNTER_LONG -> ((LongBlock.Builder) builder).appendLong(((Number) value).longValue()); case INTEGER, COUNTER_INTEGER -> ((IntBlock.Builder) builder).appendInt(((Number) value).intValue()); case DOUBLE, COUNTER_DOUBLE -> ((DoubleBlock.Builder) builder).appendDouble(((Number) value).doubleValue()); + case AGGREGATE_METRIC_DOUBLE -> ((BlockLoader.AggregateDoubleMetricBuilder) builder).append( + ((Number) value).doubleValue(), + ((Number) value).doubleValue(), + ((Number) value).doubleValue(), + ((Number) value).intValue() + ); case KEYWORD, TEXT, SEMANTIC_TEXT -> ((BytesRefBlock.Builder) builder).appendBytesRef(new BytesRef(value.toString())); case UNSUPPORTED -> ((BytesRefBlock.Builder) builder).appendNull(); case IP -> ((BytesRefBlock.Builder) builder).appendBytesRef(stringToIP(value.toString())); diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java index 6944f91042311..efb1043be6ede 100644 --- a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java @@ -10,6 +10,7 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.Query; import org.apache.lucene.search.SortField; @@ -27,6 +28,8 @@ import org.elasticsearch.index.fielddata.ScriptDocValues.DoublesSupplier; import org.elasticsearch.index.fielddata.SortedBinaryDocValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.index.mapper.BlockDocValuesReader; +import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.index.mapper.CompositeSyntheticFieldLoader; import org.elasticsearch.index.mapper.DocumentParserContext; import org.elasticsearch.index.mapper.FieldMapper; @@ -288,7 +291,7 @@ public AggregateDoubleMetricFieldType(String name) { } public AggregateDoubleMetricFieldType(String name, Map meta, MetricType metricType) { - super(name, true, false, false, TextSearchInfo.SIMPLE_MATCH_WITHOUT_TERMS, meta); + super(name, true, false, true, TextSearchInfo.SIMPLE_MATCH_WITHOUT_TERMS, meta); this.metricType = metricType; } @@ -508,6 +511,115 @@ public ValueFetcher valueFetcher(SearchExecutionContext context, String format) return SourceValueFetcher.identity(name(), context, format); } + @Override + public BlockLoader blockLoader(BlockLoaderContext blContext) { + var minFieldType = metricFields.get(Metric.min); + var maxFieldType = metricFields.get(Metric.max); + var sumFieldType = metricFields.get(Metric.sum); + var countFieldType = metricFields.get(Metric.value_count); + return new BlockDocValuesReader.DocValuesBlockLoader() { + + static NumericDocValues getNumericDocValues(NumberFieldMapper.NumberFieldType field, LeafReader leafReader) + throws IOException { + if (field == null) { + return null; + } + String fieldName = field.name(); + var values = leafReader.getNumericDocValues(fieldName); + if (values != null) { + return values; + } + + var sortedValues = leafReader.getSortedNumericDocValues(fieldName); + return DocValues.unwrapSingleton(sortedValues); + } + + @Override + public AllReader reader(LeafReaderContext context) throws IOException { + NumericDocValues minValues = getNumericDocValues(minFieldType, context.reader()); + NumericDocValues maxValues = getNumericDocValues(maxFieldType, context.reader()); + NumericDocValues sumValues = getNumericDocValues(sumFieldType, context.reader()); + NumericDocValues valueCountValues = getNumericDocValues(countFieldType, context.reader()); + + // TODO: We only support having all subfields; if any are missing it should fail, for now + if (minValues == null || maxValues == null || sumValues == null || valueCountValues == null) { + return new ConstantNullsReader(); + } + return new BlockDocValuesReader() { + + private int docID = -1; + + @Override + protected int docId() { + return docID; + } + + @Override + public String toString() { + return "BlockDocValuesReader.AggregatedDoubleMetrics"; + } + + @Override + public Block read(BlockFactory factory, Docs docs) throws IOException { + try (var builder = factory.aggregateDoubleMetricBuilder(docs.count())) { + int lastDoc = -1; + for (int i = 0; i < docs.count(); i++) { + int doc = docs.get(i); + if (doc < lastDoc) { + throw new IllegalStateException("docs within same block must be in order"); + } + if (minValues.advanceExact(doc)) { + boolean found = maxValues.advanceExact(doc); + assert found; + found = sumValues.advanceExact(doc); + assert found; + found = valueCountValues.advanceExact(doc); + assert found; + double min = NumericUtils.sortableLongToDouble(minValues.longValue()); + double max = NumericUtils.sortableLongToDouble(maxValues.longValue()); + double sum = NumericUtils.sortableLongToDouble(sumValues.longValue()); + int count = Math.toIntExact(valueCountValues.longValue()); + builder.append(min, max, sum, count); + } else { + builder.appendNull(); + } + lastDoc = doc; + this.docID = doc; + } + return builder.build(); + } + } + + @Override + public void read(int docId, StoredFields storedFields, Builder builder) throws IOException { + this.docID = docId; + var blockBuilder = (AggregateDoubleMetricBuilder) builder; + if (minValues.advanceExact(this.docID)) { + boolean found = maxValues.advanceExact(this.docID); + assert found; + found = sumValues.advanceExact(this.docID); + assert found; + found = valueCountValues.advanceExact(this.docID); + assert found; + double min = NumericUtils.sortableLongToDouble(minValues.longValue()); + double max = NumericUtils.sortableLongToDouble(maxValues.longValue()); + double sum = NumericUtils.sortableLongToDouble(sumValues.longValue()); + int count = Math.toIntExact(valueCountValues.longValue()); + blockBuilder.append(min, max, sum, count); + } else { + blockBuilder.appendNull(); + } + } + }; + } + + @Override + public Builder builder(BlockFactory factory, int expectedCount) { + return factory.aggregateDoubleMetricBuilder(expectedCount); + } + }; + } + /** * If field is a time series metric field, returns its metric type * @return the metric type or null diff --git a/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapperTests.java b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapperTests.java index 72c2beeed3ba4..356ac95b4fb76 100644 --- a/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapperTests.java +++ b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapperTests.java @@ -36,6 +36,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import static org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Names.IGNORE_MALFORMED; import static org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Names.METRICS; @@ -566,17 +567,19 @@ public void testArrayValueSyntheticSource() throws Exception { protected final class AggregateDoubleMetricSyntheticSourceSupport implements SyntheticSourceSupport { private final boolean malformedExample; private final EnumSet storedMetrics; + private final Metric defaultMetric; public AggregateDoubleMetricSyntheticSourceSupport(boolean malformedExample) { this.malformedExample = malformedExample; this.storedMetrics = EnumSet.copyOf(randomNonEmptySubsetOf(Arrays.asList(Metric.values()))); + this.defaultMetric = storedMetrics.iterator().next(); } @Override public SyntheticSourceExample example(int maxVals) { // aggregate_metric_double field does not support arrays - Object value = randomAggregateMetric(); - return new SyntheticSourceExample(value, value, this::mapping); + Map value = randomAggregateMetric(); + return new SyntheticSourceExample(value, value, value.get(defaultMetric.name()), this::mapping); } private Map randomAggregateMetric() { @@ -597,7 +600,7 @@ private Map randomAggregateMetric() { private void mapping(XContentBuilder b) throws IOException { String[] metrics = storedMetrics.stream().map(Metric::toString).toArray(String[]::new); - b.field("type", CONTENT_TYPE).array(METRICS_FIELD, metrics).field(DEFAULT_METRIC, metrics[0]); + b.field("type", CONTENT_TYPE).array(METRICS_FIELD, metrics).field(DEFAULT_METRIC, defaultMetric.name()); if (malformedExample) { b.field(IGNORE_MALFORMED, true); } @@ -618,4 +621,9 @@ public void testSyntheticSourceKeepArrays() { protected boolean supportsCopyTo() { return false; } + + @Override + protected Function loadBlockExpected() { + return n -> ((Number) n); + } } diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml index ebf464ba667db..45543f84ac28f 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml @@ -82,8 +82,7 @@ setup: time_series_dimension: true agg_metric: type: aggregate_metric_double - metrics: - - max + metrics: [ max, min, sum, value_count] default_metric: max k8s: properties: @@ -101,9 +100,9 @@ setup: index: test2 body: - '{"index": {}}' - - '{"@timestamp": "2021-04-28T18:50:04.467Z", "dim": "A", "agg_metric": {"max": 10}}' + - '{"@timestamp": "2021-04-28T18:50:04.467Z", "dim": "A", "agg_metric": {"max": 10, "min": 5, "sum": 15, "value_count": 2}}' - '{"index": {}}' - - '{"@timestamp": "2021-04-28T18:50:24.467Z", "dim": "B", "agg_metric": {"max": 20}}' + - '{"@timestamp": "2021-04-28T18:50:24.467Z", "dim": "B", "agg_metric": {"max": 20, "min": -10, "sum": 5, "value_count": 3}}' --- load everything: @@ -213,7 +212,7 @@ from doc with aggregate_metric_double: - match: {columns.0.name: "@timestamp"} - match: {columns.0.type: "date"} - match: {columns.1.name: "agg_metric"} - - match: {columns.1.type: "unsupported"} + - match: {columns.1.type: "aggregate_metric_double"} - match: {columns.2.name: "dim"} - match: {columns.2.type: "keyword"} - match: {columns.3.name: "k8s.pod.ip"} @@ -225,7 +224,21 @@ from doc with aggregate_metric_double: --- stats on aggregate_metric_double: - do: - catch: /Cannot use field \[agg_metric\] with unsupported type \[aggregate_metric_double\]/ + allowed_warnings_regex: + - "No limit defined, adding default limit of \\[.*\\]" + esql.query: + body: + query: 'FROM test2 | STATS max(agg_metric)' + + - match: { columns.0.name: "max(agg_metric)" } + - match: { columns.0.type: "aggregate_metric_double" } + - match: { values.0.0: 20.0 } +# TODO: add other aggregations (min, sum, count) + +--- +grouping stats on aggregate_metric_double: + - do: + catch: /grouping aggregator is not supported yet/ esql.query: body: query: 'FROM test2 | STATS max(agg_metric) BY dim' @@ -242,7 +255,7 @@ from index pattern unsupported counter: - match: {columns.0.name: "@timestamp"} - match: {columns.0.type: "date"} - match: {columns.1.name: "agg_metric"} - - match: {columns.1.type: "unsupported"} + - match: {columns.1.type: "aggregate_metric_double"} - match: {columns.2.name: "dim"} - match: {columns.2.type: "keyword"} - match: {columns.3.name: "k8s.pod.ip"} diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_unsupported_types.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_unsupported_types.yml index 049895bc9f31a..a03413af791ef 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_unsupported_types.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_unsupported_types.yml @@ -13,7 +13,7 @@ setup: properties: aggregate_metric_double: type: aggregate_metric_double - metrics: [ min, max ] + metrics: [ min, max, sum, value_count ] default_metric: max binary: type: binary @@ -81,7 +81,7 @@ setup: body: - { "index": { } } - { - "aggregate_metric_double": { "min": 1.0, "max": 3.0 }, + "aggregate_metric_double": { "min": 1.0, "max": 3.0, "sum": 15.3, "value_count": 8 }, "binary": "U29tZSBiaW5hcnkgYmxvYg==", "completion": "foo bar", "date_nanos": "2015-01-01T12:10:30.123456789Z", @@ -119,7 +119,7 @@ unsupported: - method: POST path: /_query parameters: [ ] - capabilities: [ date_nanos_type ] + capabilities: [ aggregate_metric_double ] reason: "support for date nanos type" - do: @@ -131,7 +131,7 @@ unsupported: query: 'from test' - match: { columns.0.name: aggregate_metric_double } - - match: { columns.0.type: unsupported } + - match: { columns.0.type: aggregate_metric_double } - match: { columns.1.name: binary } - match: { columns.1.type: unsupported } - match: { columns.2.name: completion } @@ -190,7 +190,7 @@ unsupported: - match: { columns.28.type: integer } - length: { values: 1 } - - match: { values.0.0: null } + - match: { values.0.0: 3.0 } - match: { values.0.1: null } - match: { values.0.2: null } - match: { values.0.3: "2015-01-01T12:10:30.123456789Z" } @@ -227,7 +227,7 @@ unsupported: body: query: 'from test | limit 0' - match: { columns.0.name: aggregate_metric_double } - - match: { columns.0.type: unsupported } + - match: { columns.0.type: aggregate_metric_double } - match: { columns.1.name: binary } - match: { columns.1.type: unsupported } - match: { columns.2.name: completion } @@ -308,7 +308,7 @@ unsupported with sort: - method: POST path: /_query parameters: [ ] - capabilities: [ date_nanos_type ] + capabilities: [ aggregate_metric_double ] reason: "support for date nanos type" - do: @@ -320,7 +320,7 @@ unsupported with sort: query: 'from test | sort some_doc.bar' - match: { columns.0.name: aggregate_metric_double } - - match: { columns.0.type: unsupported } + - match: { columns.0.type: aggregate_metric_double } - match: { columns.1.name: binary } - match: { columns.1.type: unsupported } - match: { columns.2.name: completion } @@ -379,7 +379,7 @@ unsupported with sort: - match: { columns.28.type: integer } - length: { values: 1 } - - match: { values.0.0: null } + - match: { values.0.0: 3.0 } - match: { values.0.1: null } - match: { values.0.2: null } - match: { values.0.3: "2015-01-01T12:10:30.123456789Z" } diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml new file mode 100644 index 0000000000000..0349ed32b699e --- /dev/null +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml @@ -0,0 +1,145 @@ +setup: + - do: + indices.create: + index: test + body: + settings: + number_of_shards: 1 + index: + mode: time_series + routing_path: [ metricset, k8s.pod.uid ] + time_series: + start_time: 2021-04-28T00:00:00Z + end_time: 2021-04-29T00:00:00Z + mappings: + properties: + "@timestamp": + type: date + metricset: + type: keyword + time_series_dimension: true + k8s: + properties: + pod: + properties: + uid: + type: keyword + time_series_dimension: true + name: + type: keyword + created_at: + type: date_nanos + running: + type: boolean + number_of_containers: + type: integer + ip: + type: ip + tags: + type: keyword + values: + type: integer + network: + properties: + tx: + type: long + time_series_metric: gauge + rx: + type: long + time_series_metric: gauge + - do: + bulk: + refresh: true + index: test + body: + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001810, "rx": 802133}, "created_at": "2021-04-28T19:34:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 6]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.26", "network": {"tx": 2005177, "rx": 801479}, "created_at": "2021-04-28T19:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west1"], "values": [1, 1, 3]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T20:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.41", "network": {"tx": 2006223, "rx": 802337}, "created_at": "2021-04-28T19:36:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west2"], "values": [4, 1, 2]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T20:51:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.22", "network": {"tx": 2012916, "rx": 803685}, "created_at": "2021-04-28T19:37:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 1]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.33", "network": {"tx": 1434521, "rx": 530575}, "created_at": "2021-04-28T19:42:00.000Z", "running": false, "number_of_containers": 1, "tags": ["backend", "test"], "values": [2, 3, 4]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:50:23.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.56", "network": {"tx": 1434577, "rx": 530600}, "created_at": "2021-04-28T19:43:00.000Z", "running": false, "number_of_containers": 1, "tags": ["backend", "test", "us-west2"], "values": [2, 1, 1]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T19:50:53.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.37", "network": {"tx": 1434587, "rx": 530604}, "created_at": "2021-04-28T19:44:00.000Z", "running": true, "number_of_containers": 1, "tags": ["backend", "test", "us-west1"], "values": [4, 5, 2]}}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T19:51:03.142Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.120", "network": {"tx": 1434595, "rx": 530605}, "created_at": "2021-04-28T19:45:00.000Z", "running": true, "number_of_containers": 1, "tags": ["backend", "test", "us-west1"], "values": [3, 2, 1]}}}' + + - do: + indices.put_settings: + index: test + body: + index.blocks.write: true + +--- +"Query downsampled index": + - do: + indices.downsample: + index: test + target_index: test-downsample + body: > + { + "fixed_interval": "1h" + } + - is_true: acknowledged + + - do: + esql.query: + body: + query: 'FROM test-downsample | sort k8s.pod.name, @timestamp | LIMIT 100' + columnar: true + + - length: { columns: 12 } + - match: { columns.0.name: "@timestamp" } + - match: { columns.0.type: "date" } + - match: { columns.1.name: "k8s.pod.created_at" } + - match: { columns.1.type: "date_nanos" } + - match: { columns.2.name: "k8s.pod.ip" } + - match: { columns.2.type: "ip" } + - match: { columns.3.name: "k8s.pod.name" } + - match: { columns.3.type: "keyword" } + - match: { columns.4.name: "k8s.pod.network.rx" } + - match: { columns.4.type: "aggregate_metric_double" } + - match: { columns.5.name: "k8s.pod.network.tx" } + - match: { columns.5.type: "aggregate_metric_double" } + - match: { columns.6.name: "k8s.pod.number_of_containers" } + - match: { columns.6.type: "integer" } + - match: { columns.7.name: "k8s.pod.running" } + - match: { columns.7.type: "boolean" } + - match: { columns.8.name: "k8s.pod.tags" } + - match: { columns.8.type: "keyword" } + - match: { columns.9.name: "k8s.pod.uid" } + - match: { columns.9.type: "keyword" } + - match: { columns.10.name: "k8s.pod.values" } + - match: { columns.10.type: "integer" } + - match: { columns.11.name: "metricset" } + - match: { columns.11.type: "keyword" } + - match: { values.0: [ "2021-04-28T18:00:00.000Z","2021-04-28T20:00:00.000Z", "2021-04-28T18:00:00.000Z", "2021-04-28T19:00:00.000Z" ] } + - match: { values.1: [ "2021-04-28T19:35:00.000Z", "2021-04-28T19:37:00.000Z", "2021-04-28T19:43:00.000Z", "2021-04-28T19:45:00.000Z" ] } + - match: { values.2: [ "10.10.55.26", "10.10.55.22", "10.10.55.56", "10.10.55.120" ] } + - match: { values.3: [ "cat", "cat", "dog", "dog" ] } + # TODO: reconsider what values to return for aggregate_metric_double type: + # (Currently returns what is configured as default metric, which in case of downsampled indices is the max metric sub field) + - match: { values.4: [ 802133.0, 803685.0, 530600.0, 530605.0 ] } + - match: { values.5: [ 2005177.0, 2012916.0, 1434577.0, 1434595.0 ] } + - match: { values.6: [ 2, 2, 1, 1 ] } + - match: { values.7: [ true, true, false, true ] } + - match: { values.8: [ [ "backend", "prod", "us-west1" ], [ "backend", "prod" ], [ "backend", "test", "us-west2" ], [ "backend", "test", "us-west1" ] ] } + - match: { values.9: [ "947e4ced-1786-4e53-9e0c-5c447e959507", "947e4ced-1786-4e53-9e0c-5c447e959507", "df3145b3-0563-4d3b-a0f7-897eb2876ea9", "df3145b3-0563-4d3b-a0f7-897eb2876ea9" ] } + - match: { values.10: [ [ 1, 1, 3 ], [ 1, 2, 3 ], [ 1, 1, 2 ],[ 1, 2, 3 ] ] } + - match: { values.11: [ "pod", "pod", "pod", "pod" ] } + + # TODO: Support other aggregate functions: + # (aggregate_metric_double now defaults to max in downsampled indices, and therefor max can work with minimal work) + - do: + esql.query: + body: + query: 'FROM test-downsample | STATS max(k8s.pod.network.tx), max(k8s.pod.network.rx) | LIMIT 1' + columnar: true + - length: { columns: 2 } + - match: { values.0: [ 2012916.0 ] } + - match: { values.1: [ 803685.0 ] }