diff --git a/docs/changelog/131658.yaml b/docs/changelog/131658.yaml new file mode 100644 index 0000000000000..da40694687fe7 --- /dev/null +++ b/docs/changelog/131658.yaml @@ -0,0 +1,5 @@ +pr: 131658 +summary: Fix `aggregate_metric_double` sorting and `mv_expand` issues +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleArrayBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleArrayBlock.java new file mode 100644 index 0000000000000..b2d5ee0b710d5 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleArrayBlock.java @@ -0,0 +1,292 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.core.Releasables; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Stream; + +public final class AggregateMetricDoubleArrayBlock extends AbstractNonThreadSafeRefCounted implements AggregateMetricDoubleBlock { + private final DoubleBlock minBlock; + private final DoubleBlock maxBlock; + private final DoubleBlock sumBlock; + private final IntBlock countBlock; + private final int positionCount; + + public AggregateMetricDoubleArrayBlock(DoubleBlock minBlock, DoubleBlock maxBlock, DoubleBlock sumBlock, IntBlock countBlock) { + this.minBlock = minBlock; + this.maxBlock = maxBlock; + this.sumBlock = sumBlock; + this.countBlock = countBlock; + this.positionCount = minBlock.getPositionCount(); + for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) { + if (b.getPositionCount() != positionCount) { + assert false : "expected positionCount=" + positionCount + " but was " + b; + throw new IllegalArgumentException("expected positionCount=" + positionCount + " but was " + b); + } + if (b.isReleased()) { + assert false : "can't build aggregate_metric_double block out of released blocks but [" + b + "] was released"; + throw new IllegalArgumentException( + "can't build aggregate_metric_double block out of released blocks but [" + b + "] was released" + ); + } + } + } + + public static AggregateMetricDoubleArrayBlock fromCompositeBlock(CompositeBlock block) { + assert block.getBlockCount() == 4 + : "Can't make AggregateMetricDoubleBlock out of CompositeBlock with " + block.getBlockCount() + " blocks"; + DoubleBlock min = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()); + DoubleBlock max = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()); + DoubleBlock sum = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()); + IntBlock count = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()); + return new AggregateMetricDoubleArrayBlock(min, max, sum, count); + } + + public CompositeBlock asCompositeBlock() { + final Block[] blocks = new Block[4]; + blocks[AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()] = minBlock; + blocks[AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()] = maxBlock; + blocks[AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()] = sumBlock; + blocks[AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()] = countBlock; + return new CompositeBlock(blocks); + } + + @Override + protected void closeInternal() { + Releasables.close(minBlock, maxBlock, sumBlock, countBlock); + } + + @Override + public Vector asVector() { + return null; + } + + @Override + public int getTotalValueCount() { + int totalValueCount = 0; + for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) { + totalValueCount += b.getTotalValueCount(); + } + return totalValueCount; + } + + @Override + public int getPositionCount() { + return positionCount; + } + + @Override + public int getFirstValueIndex(int position) { + return minBlock.getFirstValueIndex(position); + } + + @Override + public int getValueCount(int position) { + int max = 0; + for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) { + max = Math.max(max, b.getValueCount(position)); + } + return max; + } + + @Override + public ElementType elementType() { + return ElementType.AGGREGATE_METRIC_DOUBLE; + } + + @Override + public BlockFactory blockFactory() { + return minBlock.blockFactory(); + } + + @Override + public void allowPassingToDifferentDriver() { + for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) { + block.allowPassingToDifferentDriver(); + } + } + + @Override + public boolean isNull(int position) { + for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) { + if (block.isNull(position) == false) { + return false; + } + } + return true; + } + + @Override + public boolean mayHaveNulls() { + return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveNulls); + } + + @Override + public boolean areAllValuesNull() { + return Stream.of(minBlock, maxBlock, sumBlock, countBlock).allMatch(Block::areAllValuesNull); + } + + @Override + public boolean mayHaveMultivaluedFields() { + return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveMultivaluedFields); + } + + @Override + public boolean doesHaveMultivaluedFields() { + if (Stream.of(minBlock, maxBlock, sumBlock, countBlock).noneMatch(Block::mayHaveMultivaluedFields)) { + return false; + } + return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::doesHaveMultivaluedFields); + } + + @Override + public AggregateMetricDoubleBlock filter(int... positions) { + AggregateMetricDoubleArrayBlock result = null; + DoubleBlock newMinBlock = null; + DoubleBlock newMaxBlock = null; + DoubleBlock newSumBlock = null; + IntBlock newCountBlock = null; + try { + newMinBlock = minBlock.filter(positions); + newMaxBlock = maxBlock.filter(positions); + newSumBlock = sumBlock.filter(positions); + newCountBlock = countBlock.filter(positions); + result = new AggregateMetricDoubleArrayBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock); + return result; + } finally { + if (result == null) { + Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock); + } + } + } + + @Override + public AggregateMetricDoubleBlock keepMask(BooleanVector mask) { + AggregateMetricDoubleArrayBlock result = null; + DoubleBlock newMinBlock = null; + DoubleBlock newMaxBlock = null; + DoubleBlock newSumBlock = null; + IntBlock newCountBlock = null; + try { + newMinBlock = minBlock.keepMask(mask); + newMaxBlock = maxBlock.keepMask(mask); + newSumBlock = sumBlock.keepMask(mask); + newCountBlock = countBlock.keepMask(mask); + result = new AggregateMetricDoubleArrayBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock); + return result; + } finally { + if (result == null) { + Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock); + } + } + } + + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + // TODO: support + throw new UnsupportedOperationException("can't lookup values from AggregateMetricDoubleBlock"); + } + + @Override + public MvOrdering mvOrdering() { + // TODO: determine based on sub-blocks + return MvOrdering.UNORDERED; + } + + @Override + public AggregateMetricDoubleBlock expand() { + this.incRef(); + return this; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) { + block.writeTo(out); + } + } + + public static Block readFrom(StreamInput in) throws IOException { + boolean success = false; + DoubleBlock minBlock = null; + DoubleBlock maxBlock = null; + DoubleBlock sumBlock = null; + IntBlock countBlock = null; + BlockStreamInput blockStreamInput = (BlockStreamInput) in; + try { + minBlock = DoubleBlock.readFrom(blockStreamInput); + maxBlock = DoubleBlock.readFrom(blockStreamInput); + sumBlock = DoubleBlock.readFrom(blockStreamInput); + countBlock = IntBlock.readFrom(blockStreamInput); + AggregateMetricDoubleArrayBlock result = new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, sumBlock, countBlock); + success = true; + return result; + } finally { + if (success == false) { + Releasables.close(minBlock, maxBlock, sumBlock, countBlock); + } + } + } + + @Override + public long ramBytesUsed() { + return minBlock.ramBytesUsed() + maxBlock.ramBytesUsed() + sumBlock.ramBytesUsed() + countBlock.ramBytesUsed(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof AggregateMetricDoubleBlock that) { + return AggregateMetricDoubleBlock.equals(this, that); + } + return false; + } + + @Override + public int hashCode() { + return AggregateMetricDoubleBlock.hash(this); + } + + public DoubleBlock minBlock() { + return minBlock; + } + + public DoubleBlock maxBlock() { + return maxBlock; + } + + public DoubleBlock sumBlock() { + return sumBlock; + } + + public IntBlock countBlock() { + return countBlock; + } + + public Block getMetricBlock(int index) { + if (index == AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()) { + return minBlock; + } + if (index == AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()) { + return maxBlock; + } + if (index == AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()) { + return sumBlock; + } + if (index == AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()) { + return countBlock; + } + throw new UnsupportedOperationException("Received an index (" + index + ") outside of range for AggregateMetricDoubleBlock."); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlock.java index 600b149b4a6c4..9a2736b16e15c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlock.java @@ -7,297 +7,102 @@ package org.elasticsearch.compute.data; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.ReleasableIterator; -import org.elasticsearch.core.Releasables; -import java.io.IOException; import java.util.List; -import java.util.Objects; -import java.util.stream.Stream; -public final class AggregateMetricDoubleBlock extends AbstractNonThreadSafeRefCounted implements Block { - private final DoubleBlock minBlock; - private final DoubleBlock maxBlock; - private final DoubleBlock sumBlock; - private final IntBlock countBlock; - private final int positionCount; - - public AggregateMetricDoubleBlock(DoubleBlock minBlock, DoubleBlock maxBlock, DoubleBlock sumBlock, IntBlock countBlock) { - this.minBlock = minBlock; - this.maxBlock = maxBlock; - this.sumBlock = sumBlock; - this.countBlock = countBlock; - this.positionCount = minBlock.getPositionCount(); - for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) { - if (b.getPositionCount() != positionCount) { - assert false : "expected positionCount=" + positionCount + " but was " + b; - throw new IllegalArgumentException("expected positionCount=" + positionCount + " but was " + b); - } - if (b.isReleased()) { - assert false : "can't build aggregate_metric_double block out of released blocks but [" + b + "] was released"; - throw new IllegalArgumentException( - "can't build aggregate_metric_double block out of released blocks but [" + b + "] was released" - ); - } - } - } - - public static AggregateMetricDoubleBlock fromCompositeBlock(CompositeBlock block) { - assert block.getBlockCount() == 4 - : "Can't make AggregateMetricDoubleBlock out of CompositeBlock with " + block.getBlockCount() + " blocks"; - DoubleBlock min = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()); - DoubleBlock max = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()); - DoubleBlock sum = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()); - IntBlock count = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()); - return new AggregateMetricDoubleBlock(min, max, sum, count); - } - - public CompositeBlock asCompositeBlock() { - final Block[] blocks = new Block[4]; - blocks[AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()] = minBlock; - blocks[AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()] = maxBlock; - blocks[AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()] = sumBlock; - blocks[AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()] = countBlock; - return new CompositeBlock(blocks); - } - - @Override - protected void closeInternal() { - Releasables.close(minBlock, maxBlock, sumBlock, countBlock); - } +/** + * Block that stores aggregate_metric_double values. + */ +public sealed interface AggregateMetricDoubleBlock extends Block permits AggregateMetricDoubleArrayBlock, ConstantNullBlock { @Override - public Vector asVector() { - return null; - } + AggregateMetricDoubleBlock filter(int... positions); @Override - public int getTotalValueCount() { - int totalValueCount = 0; - for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) { - totalValueCount += b.getTotalValueCount(); - } - return totalValueCount; - } + AggregateMetricDoubleBlock keepMask(BooleanVector mask); @Override - public int getPositionCount() { - return positionCount; - } + ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize); @Override - public int getFirstValueIndex(int position) { - return minBlock.getFirstValueIndex(position); - } + AggregateMetricDoubleBlock expand(); + /** + * Compares the given object with this block for equality. Returns {@code true} if and only if the + * given object is a AggregateMetricDoubleBlock, and both blocks are + * {@link #equals(AggregateMetricDoubleBlock, AggregateMetricDoubleBlock) equal}. + */ @Override - public int getValueCount(int position) { - int max = 0; - for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) { - max = Math.max(max, b.getValueCount(position)); - } - return max; - } + boolean equals(Object obj); + /** Returns the hash code of this block, as defined by {@link #hash(AggregateMetricDoubleBlock)}. */ @Override - public ElementType elementType() { - return ElementType.AGGREGATE_METRIC_DOUBLE; - } + int hashCode(); - @Override - public BlockFactory blockFactory() { - return minBlock.blockFactory(); - } - - @Override - public void allowPassingToDifferentDriver() { - for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) { - block.allowPassingToDifferentDriver(); + /** + * Returns {@code true} if the given blocks are equal to each other, otherwise {@code false}. + * Two blocks are considered equal if they have the same position count, and contain the same + * values (including absent null values) in the same order. This definition ensures that the + * equals method works properly across different implementations of the AggregateMetricDoubleBlock interface. + */ + static boolean equals(AggregateMetricDoubleBlock block1, AggregateMetricDoubleBlock block2) { + if (block1 == block2) { + return true; } - } - - @Override - public boolean isNull(int position) { - for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) { - if (block.isNull(position) == false) { - return false; - } - } - return true; - } - - @Override - public boolean mayHaveNulls() { - return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveNulls); - } - - @Override - public boolean areAllValuesNull() { - return Stream.of(minBlock, maxBlock, sumBlock, countBlock).allMatch(Block::areAllValuesNull); - } - - @Override - public boolean mayHaveMultivaluedFields() { - return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveMultivaluedFields); - } - - @Override - public boolean doesHaveMultivaluedFields() { - if (Stream.of(minBlock, maxBlock, sumBlock, countBlock).noneMatch(Block::mayHaveMultivaluedFields)) { + final int positions = block1.getPositionCount(); + if (positions != block2.getPositionCount()) { return false; } - return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::doesHaveMultivaluedFields); - } - - @Override - public Block filter(int... positions) { - AggregateMetricDoubleBlock result = null; - DoubleBlock newMinBlock = null; - DoubleBlock newMaxBlock = null; - DoubleBlock newSumBlock = null; - IntBlock newCountBlock = null; - try { - newMinBlock = minBlock.filter(positions); - newMaxBlock = maxBlock.filter(positions); - newSumBlock = sumBlock.filter(positions); - newCountBlock = countBlock.filter(positions); - result = new AggregateMetricDoubleBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock); - return result; - } finally { - if (result == null) { - Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock); - } - } - } - - @Override - public Block keepMask(BooleanVector mask) { - AggregateMetricDoubleBlock result = null; - DoubleBlock newMinBlock = null; - DoubleBlock newMaxBlock = null; - DoubleBlock newSumBlock = null; - IntBlock newCountBlock = null; - try { - newMinBlock = minBlock.keepMask(mask); - newMaxBlock = maxBlock.keepMask(mask); - newSumBlock = sumBlock.keepMask(mask); - newCountBlock = countBlock.keepMask(mask); - result = new AggregateMetricDoubleBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock); - return result; - } finally { - if (result == null) { - Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock); + for (var doubleMetric : List.of( + AggregateMetricDoubleBlockBuilder.Metric.MIN, + AggregateMetricDoubleBlockBuilder.Metric.MAX, + AggregateMetricDoubleBlockBuilder.Metric.SUM + )) { + DoubleBlock doubleBlock1 = (DoubleBlock) block1.getMetricBlock(doubleMetric.getIndex()); + DoubleBlock doubleBlock2 = (DoubleBlock) block2.getMetricBlock(doubleMetric.getIndex()); + if (DoubleBlock.equals(doubleBlock1, doubleBlock2) == false) { + return false; } } - } - - @Override - public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { - // TODO: support - throw new UnsupportedOperationException("can't lookup values from AggregateMetricDoubleBlock"); - } - - @Override - public MvOrdering mvOrdering() { - // TODO: determine based on sub-blocks - return MvOrdering.UNORDERED; - } - - @Override - public Block expand() { - // TODO: support - throw new UnsupportedOperationException("AggregateMetricDoubleBlock"); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) { - block.writeTo(out); - } - } - - public static Block readFrom(StreamInput in) throws IOException { - boolean success = false; - DoubleBlock minBlock = null; - DoubleBlock maxBlock = null; - DoubleBlock sumBlock = null; - IntBlock countBlock = null; - BlockStreamInput blockStreamInput = (BlockStreamInput) in; - try { - minBlock = DoubleBlock.readFrom(blockStreamInput); - maxBlock = DoubleBlock.readFrom(blockStreamInput); - sumBlock = DoubleBlock.readFrom(blockStreamInput); - countBlock = IntBlock.readFrom(blockStreamInput); - AggregateMetricDoubleBlock result = new AggregateMetricDoubleBlock(minBlock, maxBlock, sumBlock, countBlock); - success = true; - return result; - } finally { - if (success == false) { - Releasables.close(minBlock, maxBlock, sumBlock, countBlock); + IntBlock intBlock1 = block1.countBlock(); + IntBlock intBlock2 = block2.countBlock(); + return IntBlock.equals(intBlock1, intBlock2); + } + + static int hash(AggregateMetricDoubleBlock block) { + final int positions = block.getPositionCount(); + int result = 1; + for (int pos = 0; pos < positions; pos++) { + if (block.isNull(pos)) { + result = 31 * result - 1; + } else { + final int valueCount = block.getValueCount(pos); + result = 31 * result + valueCount; + final int firstValueIdx = block.getFirstValueIndex(pos); + for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) { + for (DoubleBlock b : List.of(block.minBlock(), block.maxBlock(), block.sumBlock())) { + result *= 31; + result += b.isNull(firstValueIdx + valueIndex) ? -1 : Double.hashCode(b.getDouble(firstValueIdx + valueIndex)); + } + result *= 31; + result += block.countBlock().isNull(firstValueIdx + valueIndex) + ? -1 + : block.countBlock().getInt(firstValueIdx + valueIndex); + } } } + return result; } - @Override - public long ramBytesUsed() { - return minBlock.ramBytesUsed() + maxBlock.ramBytesUsed() + sumBlock.ramBytesUsed() + countBlock.ramBytesUsed(); - } + DoubleBlock minBlock(); - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - AggregateMetricDoubleBlock that = (AggregateMetricDoubleBlock) o; - return positionCount == that.positionCount - && minBlock.equals(that.minBlock) - && maxBlock.equals(that.maxBlock) - && sumBlock.equals(that.sumBlock) - && countBlock.equals(that.countBlock); - } + DoubleBlock maxBlock(); - @Override - public int hashCode() { - return Objects.hash( - DoubleBlock.hash(minBlock), - DoubleBlock.hash(maxBlock), - DoubleBlock.hash(sumBlock), - IntBlock.hash(countBlock), - positionCount - ); - } + DoubleBlock sumBlock(); - public DoubleBlock minBlock() { - return minBlock; - } + IntBlock countBlock(); - public DoubleBlock maxBlock() { - return maxBlock; - } - - public DoubleBlock sumBlock() { - return sumBlock; - } - - public IntBlock countBlock() { - return countBlock; - } - - public Block getMetricBlock(int index) { - if (index == AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()) { - return minBlock; - } - if (index == AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()) { - return maxBlock; - } - if (index == AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()) { - return sumBlock; - } - if (index == AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()) { - return countBlock; - } - throw new UnsupportedOperationException("Received an index (" + index + ") outside of range for AggregateMetricDoubleBlock."); - } + Block getMetricBlock(int index); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java index 90e57d87fcd0c..3d1564d93af56 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java @@ -59,7 +59,7 @@ protected int elementSize() { } @Override - public Block.Builder copyFrom(Block b, int beginInclusive, int endExclusive) { + public AggregateMetricDoubleBlockBuilder copyFrom(Block b, int beginInclusive, int endExclusive) { Block minBlock; Block maxBlock; Block sumBlock; @@ -84,7 +84,7 @@ public Block.Builder copyFrom(Block b, int beginInclusive, int endExclusive) { } @Override - public AbstractBlockBuilder appendNull() { + public AggregateMetricDoubleBlockBuilder appendNull() { minBuilder.appendNull(); maxBuilder.appendNull(); sumBuilder.appendNull(); @@ -93,7 +93,7 @@ public AbstractBlockBuilder appendNull() { } @Override - public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) { + public AggregateMetricDoubleBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) { minBuilder.mvOrdering(mvOrdering); maxBuilder.mvOrdering(mvOrdering); sumBuilder.mvOrdering(mvOrdering); @@ -102,7 +102,7 @@ public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) { } @Override - public Block build() { + public AggregateMetricDoubleBlock build() { DoubleBlock minBlock = null; DoubleBlock maxBlock = null; DoubleBlock sumBlock = null; @@ -114,7 +114,7 @@ public Block build() { maxBlock = maxBuilder.build(); sumBlock = sumBuilder.build(); countBlock = countBuilder.build(); - AggregateMetricDoubleBlock block = new AggregateMetricDoubleBlock(minBlock, maxBlock, sumBlock, countBlock); + AggregateMetricDoubleBlock block = new AggregateMetricDoubleArrayBlock(minBlock, maxBlock, sumBlock, countBlock); success = true; return block; } finally { @@ -174,9 +174,9 @@ public String getLabel() { public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) implements GenericNamedWriteable { public AggregateMetricDoubleLiteral { - min = min.isNaN() ? null : min; - max = max.isNaN() ? null : max; - sum = sum.isNaN() ? null : sum; + min = (min == null || min.isNaN()) ? null : min; + max = (max == null || max.isNaN()) ? null : max; + sum = (sum == null || sum.isNaN()) ? null : sum; } public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java index bc8b31bc565df..c05d54d4e3c96 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java @@ -345,7 +345,7 @@ static Block[] buildAll(Block.Builder... builders) { * This should be paired with {@link #readTypedBlock(BlockStreamInput)} */ static void writeTypedBlock(Block block, StreamOutput out) throws IOException { - if (false == supportsAggregateMetricDoubleBlock(out.getTransportVersion()) && block instanceof AggregateMetricDoubleBlock a) { + if (false == supportsAggregateMetricDoubleBlock(out.getTransportVersion()) && block instanceof AggregateMetricDoubleArrayBlock a) { block = a.asCompositeBlock(); } block.elementType().writeTo(out); @@ -360,7 +360,7 @@ static Block readTypedBlock(BlockStreamInput in) throws IOException { ElementType elementType = ElementType.readFrom(in); Block block = elementType.reader.readBlock(in); if (false == supportsAggregateMetricDoubleBlock(in.getTransportVersion()) && block instanceof CompositeBlock compositeBlock) { - block = AggregateMetricDoubleBlock.fromCompositeBlock(compositeBlock); + block = AggregateMetricDoubleArrayBlock.fromCompositeBlock(compositeBlock); } return block; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java index 55053f509591d..9e730004ab9f9 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java @@ -436,7 +436,7 @@ public AggregateMetricDoubleBlockBuilder newAggregateMetricDoubleBlockBuilder(in return new AggregateMetricDoubleBlockBuilder(estimatedSize, this); } - public final Block newConstantAggregateMetricDoubleBlock( + public final AggregateMetricDoubleBlock newConstantAggregateMetricDoubleBlock( AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral value, int positions ) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java index 967f91a3ee471..16242d6fb8050 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java @@ -25,7 +25,8 @@ public final class ConstantNullBlock extends AbstractNonThreadSafeRefCounted LongBlock, FloatBlock, DoubleBlock, - BytesRefBlock { + BytesRefBlock, + AggregateMetricDoubleBlock { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(ConstantNullBlock.class); private final int positionCount; @@ -143,6 +144,31 @@ public int hashCode() { return result; } + @Override + public DoubleBlock minBlock() { + return this; + } + + @Override + public DoubleBlock maxBlock() { + return this; + } + + @Override + public DoubleBlock sumBlock() { + return this; + } + + @Override + public IntBlock countBlock() { + return this; + } + + @Override + public Block getMetricBlock(int index) { + return this; + } + @Override public String toString() { return "ConstantNullBlock[positions=" + getPositionCount() + "]"; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java index 64fc0d2bdd263..e651a65cf69b2 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java @@ -63,7 +63,7 @@ public enum ElementType { 10, "AggregateMetricDouble", BlockFactory::newAggregateMetricDoubleBlockBuilder, - AggregateMetricDoubleBlock::readFrom + AggregateMetricDoubleArrayBlock::readFrom ); private interface BuilderSupplier { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForAggregateMetricDouble.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForAggregateMetricDouble.java index a814cf5f98e0a..f4c091e22fac6 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForAggregateMetricDouble.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForAggregateMetricDouble.java @@ -30,6 +30,11 @@ public void decodeKey(BytesRef keys) { @Override public void decodeValue(BytesRef values) { + int count = TopNEncoder.DEFAULT_UNSORTABLE.decodeVInt(values); + if (count == 0) { + builder.appendNull(); + return; + } for (BlockLoader.DoubleBuilder subBuilder : List.of(builder.min(), builder.max(), builder.sum())) { if (TopNEncoder.DEFAULT_UNSORTABLE.decodeBoolean(values)) { subBuilder.appendDouble(TopNEncoder.DEFAULT_UNSORTABLE.decodeDouble(values)); @@ -51,7 +56,7 @@ public Block build() { @Override public String toString() { - return "ValueExtractorForAggregateMetricDouble"; + return "ResultBuilderForAggregateMetricDouble"; } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForAggregateMetricDouble.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForAggregateMetricDouble.java index 9bac1b9ba5eee..0e0694b328d9d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForAggregateMetricDouble.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForAggregateMetricDouble.java @@ -24,6 +24,7 @@ public class ValueExtractorForAggregateMetricDouble implements ValueExtractor { @Override public void writeValue(BreakingBytesRefBuilder values, int position) { + TopNEncoder.DEFAULT_UNSORTABLE.encodeVInt(1, values); for (DoubleBlock doubleBlock : List.of(block.minBlock(), block.maxBlock(), block.sumBlock())) { if (doubleBlock.isNull(position)) { TopNEncoder.DEFAULT_UNSORTABLE.encodeBoolean(false, values); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockEqualityTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockEqualityTests.java new file mode 100644 index 0000000000000..1a31ca07e546c --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockEqualityTests.java @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.elasticsearch.compute.test.ComputeTestCase; +import org.elasticsearch.compute.test.TestBlockFactory; +import org.elasticsearch.core.Releasables; + +import java.util.List; + +public class AggregateMetricDoubleBlockEqualityTests extends ComputeTestCase { + + static final BlockFactory blockFactory = TestBlockFactory.getNonBreakingInstance(); + + // TODO: Add additional tests + + public void testEmptyBlock() { + // all these "empty" blocks should be equivalent + var partialMetricBuilder = blockFactory.newAggregateMetricDoubleBlockBuilder(0); + for (var subBuilder : List.of( + partialMetricBuilder.min(), + partialMetricBuilder.max(), + partialMetricBuilder.sum(), + partialMetricBuilder.count() + )) { + if (randomBoolean()) { + subBuilder.appendNull(); + } else { + if (subBuilder instanceof DoubleBlockBuilder doubleBlockBuilder) { + doubleBlockBuilder.appendDouble(0.0); + } else if (subBuilder instanceof IntBlockBuilder intBlockBuilder) { + intBlockBuilder.appendInt(0); + } + } + } + + List blocks = List.of( + blockFactory.newAggregateMetricDoubleBlockBuilder(0).build(), + blockFactory.newAggregateMetricDoubleBlockBuilder(0).appendNull().build().filter(), + partialMetricBuilder.build().filter(), + blockFactory.newConstantAggregateMetricDoubleBlock( + new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(0.0, 0.0, 0.0, 0), + 0 + ).filter(), + (ConstantNullBlock) blockFactory.newConstantNullBlock(0) + ); + + assertAllEquals(blocks); + Releasables.close(blocks); + } + + public void testSimpleBlockWithManyNulls() { + int positions = randomIntBetween(1, 256); + boolean grow = randomBoolean(); + AggregateMetricDoubleBlockBuilder builder1 = blockFactory.newAggregateMetricDoubleBlockBuilder(grow ? 0 : positions); + AggregateMetricDoubleBlockBuilder builder2 = blockFactory.newAggregateMetricDoubleBlockBuilder(grow ? 0 : positions); + ConstantNullBlock.Builder builder3 = new ConstantNullBlock.Builder(blockFactory); + for (int p = 0; p < positions; p++) { + builder1.appendNull(); + builder2.appendNull(); + builder3.appendNull(); + } + AggregateMetricDoubleBlock block1 = builder1.build(); + AggregateMetricDoubleBlock block2 = builder2.build(); + Block block3 = builder3.build(); + assertEquals(positions, block1.getPositionCount()); + assertTrue(block1.mayHaveNulls()); + assertTrue(block1.isNull(0)); + + List blocks = List.of(block1, block2, block3); + assertAllEquals(blocks); + } + + static void assertAllEquals(List objs) { + for (Object obj1 : objs) { + for (Object obj2 : objs) { + assertEquals(obj1, obj2); + assertEquals(obj2, obj1); + // equal objects must generate the same hash code + assertEquals(obj1.hashCode(), obj2.hashCode()); + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java index b345d8c0b196a..c5f91f05e76b3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java @@ -13,6 +13,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.NoopCircuitBreaker; +import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockUtils; @@ -42,9 +43,27 @@ public static Iterable parameters() { switch (e) { case UNKNOWN -> { } - case COMPOSITE, AGGREGATE_METRIC_DOUBLE -> { + case COMPOSITE -> { // TODO: add later } + case AGGREGATE_METRIC_DOUBLE -> { + cases.add( + valueTestCase( + "regular aggregate_metric_double", + e, + TopNEncoder.DEFAULT_UNSORTABLE, + () -> randomAggregateMetricDouble(true) + ) + ); + cases.add( + valueTestCase( + "aggregate_metric_double with nulls", + e, + TopNEncoder.DEFAULT_UNSORTABLE, + () -> randomAggregateMetricDouble(false) + ) + ); + } case FLOAT -> { } case BYTES_REF -> { @@ -105,7 +124,8 @@ public static Iterable parameters() { ).asBlock() ) } ); - case NULL -> cases.add(valueTestCase("null", e, TopNEncoder.DEFAULT_UNSORTABLE, () -> null)); + case NULL -> { + } default -> { cases.add(valueTestCase("single " + e, e, TopNEncoder.DEFAULT_UNSORTABLE, () -> BlockTestUtils.randomValue(e))); cases.add( @@ -118,6 +138,9 @@ public static Iterable parameters() { ); } } + if (e != ElementType.UNKNOWN && e != ElementType.COMPOSITE && e != ElementType.FLOAT && e != ElementType.DOC) { + cases.add(valueTestCase("null " + e, e, TopNEncoder.DEFAULT_UNSORTABLE, () -> null)); + } } return cases; } @@ -221,4 +244,16 @@ public void testInKey() { assertThat(result.build(), equalTo(value)); } + + public static AggregateMetricDoubleLiteral randomAggregateMetricDouble(boolean allMetrics) { + if (allMetrics) { + return new AggregateMetricDoubleLiteral(randomDouble(), randomDouble(), randomDouble(), randomInt()); + } + return new AggregateMetricDoubleLiteral( + randomBoolean() ? randomDouble() : null, + randomBoolean() ? randomDouble() : null, + randomBoolean() ? randomDouble() : null, + randomBoolean() ? randomInt() : null + ); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 9d553b178afeb..ba9ebe1c352d6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -890,6 +890,42 @@ public enum Cap { */ AGGREGATE_METRIC_DOUBLE_PARTIAL_SUBMETRICS(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), + /** + * Support for rendering aggregate_metric_double type + */ + AGGREGATE_METRIC_DOUBLE_RENDERING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), + + /** + * Support for to_aggregate_metric_double function + */ + AGGREGATE_METRIC_DOUBLE_CONVERT_TO(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), + + /** + * Support for sorting when aggregate_metric_doubles are present + */ + AGGREGATE_METRIC_DOUBLE_SORTING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), + + /** + * Support avg with aggregate metric doubles + */ + AGGREGATE_METRIC_DOUBLE_AVG(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), + + /** + * Support for implicit casting of aggregate metric double when run in aggregations + */ + AGGREGATE_METRIC_DOUBLE_IMPLICIT_CASTING_IN_AGGS(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), + + /** + * Fixes bug when aggregate metric double is encoded as a single nul value but decoded as + * AggregateMetricDoubleBlock (expecting 4 values) in TopN. + */ + AGGREGATE_METRIC_DOUBLE_SORTING_FIXED(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), + + /** + * Stop erroring out when trying to apply MV_EXPAND on aggregate metric double. + */ + AGGREGATE_METRIC_DOUBLE_MV_EXPAND(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), + /** * Support change point detection "CHANGE_POINT". */ @@ -913,11 +949,6 @@ public enum Cap { */ SUPPORT_PARTIAL_RESULTS, - /** - * Support for rendering aggregate_metric_double type - */ - AGGREGATE_METRIC_DOUBLE_RENDERING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), - /** * Support for RERANK command */ @@ -969,11 +1000,6 @@ public enum Cap { */ NON_FULL_TEXT_FUNCTIONS_SCORING, - /** - * Support for to_aggregate_metric_double function - */ - AGGREGATE_METRIC_DOUBLE_CONVERT_TO(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), - /** * The {@code _query} API now reports the original types. */ @@ -1000,11 +1026,6 @@ public enum Cap { */ MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT, - /** - * Support for sorting when aggregate_metric_doubles are present - */ - AGGREGATE_METRIC_DOUBLE_SORTING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), - /** * Supercedes {@link Cap#MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT}. */ @@ -1241,10 +1262,6 @@ public enum Cap { * Support improved behavior for LIKE operator when used with index fields. */ LIKE_ON_INDEX_FIELDS, - /** - * Support avg with aggregate metric doubles - */ - AGGREGATE_METRIC_DOUBLE_AVG(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG), /** * Forbid usage of brackets in unquoted index and enrich policy names diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java index 87a4492f33094..03564429ab827 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java @@ -9,6 +9,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.compute.data.AggregateMetricDoubleArrayBlock; import org.elasticsearch.compute.data.AggregateMetricDoubleBlock; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.Block; @@ -147,7 +148,12 @@ private Block build() { try { doubleBlock = valuesBuilder.build().asBlock(); countBlock = blockFactory.newConstantIntBlockWith(1, doubleBlock.getPositionCount()); - AggregateMetricDoubleBlock aggBlock = new AggregateMetricDoubleBlock(doubleBlock, doubleBlock, doubleBlock, countBlock); + AggregateMetricDoubleBlock aggBlock = new AggregateMetricDoubleArrayBlock( + doubleBlock, + doubleBlock, + doubleBlock, + countBlock + ); doubleBlock.incRef(); doubleBlock.incRef(); success = true; diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml index ee1a381c6e589..726921ffd09f7 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml @@ -251,3 +251,111 @@ setup: - match: {values.0.1: 800479.0} - match: {values.0.2: 4812452.0} - match: {values.0.3: 6} + +--- +"Sort from multiple indices one with aggregate metric double": + - requires: + test_runner_features: [capabilities] + capabilities: + - method: POST + path: /_query + parameters: [] + capabilities: [aggregate_metric_double_sorting_fixed] + reason: "Fix sorting for rows comprised of docs from multiple indices where agg metric is missing from some" + + - do: + indices.downsample: + index: test + target_index: test-downsample + body: > + { + "fixed_interval": "1h" + } + - is_true: acknowledged + + - do: + indices.create: + index: test-2 + body: + mappings: + properties: + some_field: + type: keyword + + - do: + bulk: + refresh: true + index: test-2 + body: + - '{"index": {}}' + - '{"some_field": "im a keyword!!!!!"}' + + - do: + esql.query: + body: + query: "FROM test-* | SORT some_field, @timestamp, k8s.pod.uid | KEEP k8s.pod.network.rx, some_field, @timestamp | LIMIT 10" + + - length: {values: 5} + - length: {values.0: 3} + - match: {columns.0.name: "k8s.pod.network.rx"} + - match: {columns.0.type: "aggregate_metric_double"} + - match: {columns.1.name: "some_field"} + - match: {columns.1.type: "keyword"} + - match: {columns.2.name: "@timestamp"} + - match: {columns.2.type: "date"} + - match: {values.0.0: null} + - match: {values.0.1: "im a keyword!!!!!"} + - match: {values.0.2: null} + - match: {values.1.0: '{"min":801479.0,"max":802133.0,"sum":1603612.0,"value_count":2}'} + - match: {values.1.1: null} + - match: {values.1.2: "2021-04-28T18:00:00.000Z"} + - match: {values.2.0: '{"min":530575.0,"max":530600.0,"sum":1061175.0,"value_count":2}'} + - match: {values.2.1: null} + - match: {values.2.2: "2021-04-28T18:00:00.000Z"} + - match: {values.3.0: '{"min":530604.0,"max":530605.0,"sum":1061209.0,"value_count":2}'} + - match: {values.3.1: null} + - match: {values.3.2: "2021-04-28T19:00:00.000Z"} + - match: {values.4.0: '{"min":802337.0,"max":803685.0,"sum":1606022.0,"value_count":2}'} + - match: {values.4.1: null} + - match: {values.4.2: "2021-04-28T20:00:00.000Z"} + +--- +"MV_EXPAND on non-MV aggregate metric double": + - requires: + test_runner_features: [capabilities] + capabilities: + - method: POST + path: /_query + parameters: [] + capabilities: [aggregate_metric_double_mv_expand] + reason: "Have MV_EXPAND not error out when applied to aggregate_metric_doubles (is a no-op)" + + - do: + indices.downsample: + index: test + target_index: test-downsample + body: > + { + "fixed_interval": "1h" + } + - is_true: acknowledged + + - do: + esql.query: + body: + query: "FROM test-downsample | MV_EXPAND k8s.pod.network.rx | SORT @timestamp, k8s.pod.uid | KEEP k8s.pod.network.rx, @timestamp | LIMIT 10" + + - length: {values: 4} + - length: {values.0: 2} + - match: {columns.0.name: "k8s.pod.network.rx"} + - match: {columns.0.type: "aggregate_metric_double"} + - match: {columns.1.name: "@timestamp"} + - match: {columns.1.type: "date"} + - match: {values.0.0: '{"min":801479.0,"max":802133.0,"sum":1603612.0,"value_count":2}'} + - match: {values.0.1: "2021-04-28T18:00:00.000Z"} + - match: {values.1.0: '{"min":530575.0,"max":530600.0,"sum":1061175.0,"value_count":2}'} + - match: {values.1.1: "2021-04-28T18:00:00.000Z"} + - match: {values.2.0: '{"min":530604.0,"max":530605.0,"sum":1061209.0,"value_count":2}'} + - match: {values.2.1: "2021-04-28T19:00:00.000Z"} + - match: {values.3.0: '{"min":802337.0,"max":803685.0,"sum":1606022.0,"value_count":2}'} + - match: {values.3.1: "2021-04-28T20:00:00.000Z"}