diff --git a/docs/changelog/127299.yaml b/docs/changelog/127299.yaml new file mode 100644 index 0000000000000..d83551fbbe1aa --- /dev/null +++ b/docs/changelog/127299.yaml @@ -0,0 +1,5 @@ +pr: 127299 +summary: Introduce `AggregateMetricDoubleBlock` +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 261b13bac1e27..e363e4e140574 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -230,6 +230,7 @@ static TransportVersion def(int id) { public static final TransportVersion SETTINGS_IN_DATA_STREAMS = def(9_064_0_00); public static final TransportVersion INTRODUCE_FAILURES_LIFECYCLE = def(9_065_0_00); public static final TransportVersion PROJECT_METADATA_SETTINGS = def(9_066_00_0); + public static final TransportVersion AGGREGATE_METRIC_DOUBLE_BLOCK = def(9_067_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlock.java new file mode 100644 index 0000000000000..600b149b4a6c4 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlock.java @@ -0,0 +1,303 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.core.Releasables; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; + +public final class AggregateMetricDoubleBlock extends AbstractNonThreadSafeRefCounted implements Block { + private final DoubleBlock minBlock; + private final DoubleBlock maxBlock; + private final DoubleBlock sumBlock; + private final IntBlock countBlock; + private final int positionCount; + + public AggregateMetricDoubleBlock(DoubleBlock minBlock, DoubleBlock maxBlock, DoubleBlock sumBlock, IntBlock countBlock) { + this.minBlock = minBlock; + this.maxBlock = maxBlock; + this.sumBlock = sumBlock; + this.countBlock = countBlock; + this.positionCount = minBlock.getPositionCount(); + for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) { + if (b.getPositionCount() != positionCount) { + assert false : "expected positionCount=" + positionCount + " but was " + b; + throw new IllegalArgumentException("expected positionCount=" + positionCount + " but was " + b); + } + if (b.isReleased()) { + assert false : "can't build aggregate_metric_double block out of released blocks but [" + b + "] was released"; + throw new IllegalArgumentException( + "can't build aggregate_metric_double block out of released blocks but [" + b + "] was released" + ); + } + } + } + + public static AggregateMetricDoubleBlock fromCompositeBlock(CompositeBlock block) { + assert block.getBlockCount() == 4 + : "Can't make AggregateMetricDoubleBlock out of CompositeBlock with " + block.getBlockCount() + " blocks"; + DoubleBlock min = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()); + DoubleBlock max = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()); + DoubleBlock sum = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()); + IntBlock count = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()); + return new AggregateMetricDoubleBlock(min, max, sum, count); + } + + public CompositeBlock asCompositeBlock() { + final Block[] blocks = new Block[4]; + blocks[AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()] = minBlock; + blocks[AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()] = maxBlock; + blocks[AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()] = sumBlock; + blocks[AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()] = countBlock; + return new CompositeBlock(blocks); + } + + @Override + protected void closeInternal() { + Releasables.close(minBlock, maxBlock, sumBlock, countBlock); + } + + @Override + public Vector asVector() { + return null; + } + + @Override + public int getTotalValueCount() { + int totalValueCount = 0; + for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) { + totalValueCount += b.getTotalValueCount(); + } + return totalValueCount; + } + + @Override + public int getPositionCount() { + return positionCount; + } + + @Override + public int getFirstValueIndex(int position) { + return minBlock.getFirstValueIndex(position); + } + + @Override + public int getValueCount(int position) { + int max = 0; + for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) { + max = Math.max(max, b.getValueCount(position)); + } + return max; + } + + @Override + public ElementType elementType() { + return ElementType.AGGREGATE_METRIC_DOUBLE; + } + + @Override + public BlockFactory blockFactory() { + return minBlock.blockFactory(); + } + + @Override + public void allowPassingToDifferentDriver() { + for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) { + block.allowPassingToDifferentDriver(); + } + } + + @Override + public boolean isNull(int position) { + for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) { + if (block.isNull(position) == false) { + return false; + } + } + return true; + } + + @Override + public boolean mayHaveNulls() { + return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveNulls); + } + + @Override + public boolean areAllValuesNull() { + return Stream.of(minBlock, maxBlock, sumBlock, countBlock).allMatch(Block::areAllValuesNull); + } + + @Override + public boolean mayHaveMultivaluedFields() { + return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveMultivaluedFields); + } + + @Override + public boolean doesHaveMultivaluedFields() { + if (Stream.of(minBlock, maxBlock, sumBlock, countBlock).noneMatch(Block::mayHaveMultivaluedFields)) { + return false; + } + return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::doesHaveMultivaluedFields); + } + + @Override + public Block filter(int... positions) { + AggregateMetricDoubleBlock result = null; + DoubleBlock newMinBlock = null; + DoubleBlock newMaxBlock = null; + DoubleBlock newSumBlock = null; + IntBlock newCountBlock = null; + try { + newMinBlock = minBlock.filter(positions); + newMaxBlock = maxBlock.filter(positions); + newSumBlock = sumBlock.filter(positions); + newCountBlock = countBlock.filter(positions); + result = new AggregateMetricDoubleBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock); + return result; + } finally { + if (result == null) { + Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock); + } + } + } + + @Override + public Block keepMask(BooleanVector mask) { + AggregateMetricDoubleBlock result = null; + DoubleBlock newMinBlock = null; + DoubleBlock newMaxBlock = null; + DoubleBlock newSumBlock = null; + IntBlock newCountBlock = null; + try { + newMinBlock = minBlock.keepMask(mask); + newMaxBlock = maxBlock.keepMask(mask); + newSumBlock = sumBlock.keepMask(mask); + newCountBlock = countBlock.keepMask(mask); + result = new AggregateMetricDoubleBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock); + return result; + } finally { + if (result == null) { + Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock); + } + } + } + + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + // TODO: support + throw new UnsupportedOperationException("can't lookup values from AggregateMetricDoubleBlock"); + } + + @Override + public MvOrdering mvOrdering() { + // TODO: determine based on sub-blocks + return MvOrdering.UNORDERED; + } + + @Override + public Block expand() { + // TODO: support + throw new UnsupportedOperationException("AggregateMetricDoubleBlock"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) { + block.writeTo(out); + } + } + + public static Block readFrom(StreamInput in) throws IOException { + boolean success = false; + DoubleBlock minBlock = null; + DoubleBlock maxBlock = null; + DoubleBlock sumBlock = null; + IntBlock countBlock = null; + BlockStreamInput blockStreamInput = (BlockStreamInput) in; + try { + minBlock = DoubleBlock.readFrom(blockStreamInput); + maxBlock = DoubleBlock.readFrom(blockStreamInput); + sumBlock = DoubleBlock.readFrom(blockStreamInput); + countBlock = IntBlock.readFrom(blockStreamInput); + AggregateMetricDoubleBlock result = new AggregateMetricDoubleBlock(minBlock, maxBlock, sumBlock, countBlock); + success = true; + return result; + } finally { + if (success == false) { + Releasables.close(minBlock, maxBlock, sumBlock, countBlock); + } + } + } + + @Override + public long ramBytesUsed() { + return minBlock.ramBytesUsed() + maxBlock.ramBytesUsed() + sumBlock.ramBytesUsed() + countBlock.ramBytesUsed(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AggregateMetricDoubleBlock that = (AggregateMetricDoubleBlock) o; + return positionCount == that.positionCount + && minBlock.equals(that.minBlock) + && maxBlock.equals(that.maxBlock) + && sumBlock.equals(that.sumBlock) + && countBlock.equals(that.countBlock); + } + + @Override + public int hashCode() { + return Objects.hash( + DoubleBlock.hash(minBlock), + DoubleBlock.hash(maxBlock), + DoubleBlock.hash(sumBlock), + IntBlock.hash(countBlock), + positionCount + ); + } + + public DoubleBlock minBlock() { + return minBlock; + } + + public DoubleBlock maxBlock() { + return maxBlock; + } + + public DoubleBlock sumBlock() { + return sumBlock; + } + + public IntBlock countBlock() { + return countBlock; + } + + public Block getMetricBlock(int index) { + if (index == AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()) { + return minBlock; + } + if (index == AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()) { + return maxBlock; + } + if (index == AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()) { + return sumBlock; + } + if (index == AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()) { + return countBlock; + } + throw new UnsupportedOperationException("Received an index (" + index + ") outside of range for AggregateMetricDoubleBlock."); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java index 32eb8c97fb28e..90e57d87fcd0c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java @@ -59,22 +59,22 @@ protected int elementSize() { } @Override - public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) { + public Block.Builder copyFrom(Block b, int beginInclusive, int endExclusive) { Block minBlock; Block maxBlock; Block sumBlock; Block countBlock; - if (block.areAllValuesNull()) { - minBlock = block; - maxBlock = block; - sumBlock = block; - countBlock = block; + if (b.areAllValuesNull()) { + minBlock = b; + maxBlock = b; + sumBlock = b; + countBlock = b; } else { - CompositeBlock composite = (CompositeBlock) block; - minBlock = composite.getBlock(Metric.MIN.getIndex()); - maxBlock = composite.getBlock(Metric.MAX.getIndex()); - sumBlock = composite.getBlock(Metric.SUM.getIndex()); - countBlock = composite.getBlock(Metric.COUNT.getIndex()); + AggregateMetricDoubleBlock block = (AggregateMetricDoubleBlock) b; + minBlock = block.minBlock(); + maxBlock = block.maxBlock(); + sumBlock = block.sumBlock(); + countBlock = block.countBlock(); } minBuilder.copyFrom(minBlock, beginInclusive, endExclusive); maxBuilder.copyFrom(maxBlock, beginInclusive, endExclusive); @@ -103,20 +103,23 @@ public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) { @Override public Block build() { - Block[] blocks = new Block[4]; + DoubleBlock minBlock = null; + DoubleBlock maxBlock = null; + DoubleBlock sumBlock = null; + IntBlock countBlock = null; boolean success = false; try { finish(); - blocks[Metric.MIN.getIndex()] = minBuilder.build(); - blocks[Metric.MAX.getIndex()] = maxBuilder.build(); - blocks[Metric.SUM.getIndex()] = sumBuilder.build(); - blocks[Metric.COUNT.getIndex()] = countBuilder.build(); - CompositeBlock block = new CompositeBlock(blocks); + minBlock = minBuilder.build(); + maxBlock = maxBuilder.build(); + sumBlock = sumBuilder.build(); + countBlock = countBuilder.build(); + AggregateMetricDoubleBlock block = new AggregateMetricDoubleBlock(minBlock, maxBlock, sumBlock, countBlock); success = true; return block; } finally { if (success == false) { - Releasables.closeExpectNoException(blocks); + Releasables.closeExpectNoException(minBlock, maxBlock, sumBlock, countBlock); } } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java index 3946d19288192..f3f6f83041d04 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java @@ -9,6 +9,7 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeValue; @@ -343,6 +344,10 @@ static Block[] buildAll(Block.Builder... builders) { * This should be paired with {@link #readTypedBlock(BlockStreamInput)} */ static void writeTypedBlock(Block block, StreamOutput out) throws IOException { + if (out.getTransportVersion().before(TransportVersions.AGGREGATE_METRIC_DOUBLE_BLOCK) + && block instanceof AggregateMetricDoubleBlock aggregateMetricDoubleBlock) { + block = aggregateMetricDoubleBlock.asCompositeBlock(); + } block.elementType().writeTo(out); block.writeTo(out); } @@ -353,7 +358,12 @@ static void writeTypedBlock(Block block, StreamOutput out) throws IOException { */ static Block readTypedBlock(BlockStreamInput in) throws IOException { ElementType elementType = ElementType.readFrom(in); - return elementType.reader.readBlock(in); + Block block = elementType.reader.readBlock(in); + if (in.getTransportVersion().before(TransportVersions.AGGREGATE_METRIC_DOUBLE_BLOCK) + && block instanceof CompositeBlock compositeBlock) { + block = AggregateMetricDoubleBlock.fromCompositeBlock(compositeBlock); + } + return block; } /** diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java index 1d6012a8a73de..e06efb09f5724 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java @@ -234,14 +234,7 @@ private static Block constantBlock(BlockFactory blockFactory, ElementType type, case BYTES_REF -> blockFactory.newConstantBytesRefBlockWith(toBytesRef(val), size); case DOUBLE -> blockFactory.newConstantDoubleBlockWith((double) val, size); case BOOLEAN -> blockFactory.newConstantBooleanBlockWith((boolean) val, size); - case COMPOSITE -> { - if (val instanceof AggregateMetricDoubleLiteral aggregateMetricDoubleLiteral) { - yield blockFactory.newConstantAggregateMetricDoubleBlock(aggregateMetricDoubleLiteral, size); - } - throw new UnsupportedOperationException( - "Composite block but received value that wasn't AggregateMetricDoubleLiteral [" + val + "]" - ); - } + case AGGREGATE_METRIC_DOUBLE -> blockFactory.newConstantAggregateMetricDoubleBlock((AggregateMetricDoubleLiteral) val, size); default -> throw new UnsupportedOperationException("unsupported element type [" + type + "]"); }; } @@ -285,17 +278,14 @@ private static Object valueAtOffset(Block block, int offset) { DocVector v = ((DocBlock) block).asVector(); yield new Doc(v.shards().getInt(offset), v.segments().getInt(offset), v.docs().getInt(offset)); } - case COMPOSITE -> { - CompositeBlock compositeBlock = (CompositeBlock) block; - var minBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()); - var maxBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()); - var sumBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()); - var countBlock = (IntBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()); + case COMPOSITE -> throw new IllegalArgumentException("can't read values from composite blocks"); + case AGGREGATE_METRIC_DOUBLE -> { + AggregateMetricDoubleBlock aggBlock = (AggregateMetricDoubleBlock) block; yield new AggregateMetricDoubleLiteral( - minBlock.getDouble(offset), - maxBlock.getDouble(offset), - sumBlock.getDouble(offset), - countBlock.getInt(offset) + aggBlock.minBlock().getDouble(offset), + aggBlock.maxBlock().getDouble(offset), + aggBlock.sumBlock().getDouble(offset), + aggBlock.countBlock().getInt(offset) ); } case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]"); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java index 10b91c578a6d6..b6cb024534818 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java @@ -92,16 +92,12 @@ public int getTotalValueCount() { @Override public int getFirstValueIndex(int position) { - return blocks[0].getFirstValueIndex(position); + throw new UnsupportedOperationException("Composite block"); } @Override public int getValueCount(int position) { - int max = 0; - for (var block : blocks) { - max = Math.max(max, block.getValueCount(position)); - } - return max; + throw new UnsupportedOperationException("Composite block"); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java index e509ea2508a94..64fc0d2bdd263 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java @@ -42,14 +42,29 @@ public enum ElementType { /** * Composite blocks which contain array of sub-blocks. */ - COMPOSITE(8, "Composite", BlockFactory::newAggregateMetricDoubleBlockBuilder, CompositeBlock::readFrom), + COMPOSITE( + 8, + "Composite", + (blockFactory, estimatedSize) -> { throw new UnsupportedOperationException("can't build composite blocks"); }, + CompositeBlock::readFrom + ), /** * Intermediate blocks which don't support retrieving elements. */ UNKNOWN(9, "Unknown", (blockFactory, estimatedSize) -> { throw new UnsupportedOperationException("can't build null blocks"); }, in -> { throw new UnsupportedOperationException("can't read unknown blocks"); - }); + }), + + /** + * Blocks that contain aggregate_metric_doubles. + */ + AGGREGATE_METRIC_DOUBLE( + 10, + "AggregateMetricDouble", + BlockFactory::newAggregateMetricDoubleBlockBuilder, + AggregateMetricDoubleBlock::readFrom + ); private interface BuilderSupplier { Block.Builder newBlockBuilder(BlockFactory blockFactory, int estimatedSize); @@ -95,7 +110,7 @@ public static ElementType fromJava(Class type) { } else if (type == Boolean.class) { elementType = BOOLEAN; } else if (type == AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.class) { - elementType = COMPOSITE; + elementType = AGGREGATE_METRIC_DOUBLE; } else if (type == null || type == Void.class) { elementType = NULL; } else { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java index f05d552c3e628..d5e7656c637b4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java @@ -167,6 +167,7 @@ public static QueryList rawTermQueryList(MappedFieldType field, SearchExecutionC case NULL -> offset -> null; case DOC -> throw new IllegalArgumentException("can't read values from [doc] block"); case COMPOSITE -> throw new IllegalArgumentException("can't read values from [composite] block"); + case AGGREGATE_METRIC_DOUBLE -> throw new IllegalArgumentException("can't read values from [aggregate metric double] block"); case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]"); }; return new TermQueryList(field, searchExecutionContext, block, null, blockToJavaObject); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java index a1e6cd17fd625..6ad550c439ecf 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java @@ -54,7 +54,7 @@ static ResultBuilder resultBuilderFor( case DOUBLE -> new ResultBuilderForDouble(blockFactory, encoder, inKey, positions); case NULL -> new ResultBuilderForNull(blockFactory); case DOC -> new ResultBuilderForDoc(blockFactory, positions); - case COMPOSITE -> new ResultBuilderForComposite(blockFactory, positions); + case AGGREGATE_METRIC_DOUBLE -> new ResultBuilderForAggregateMetricDouble(blockFactory, positions); default -> { assert false : "Result builder for [" + elementType + "]"; throw new UnsupportedOperationException("Result builder for [" + elementType + "]"); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForComposite.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForAggregateMetricDouble.java similarity index 84% rename from x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForComposite.java rename to x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForAggregateMetricDouble.java index c9844f94988c7..a814cf5f98e0a 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForComposite.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForAggregateMetricDouble.java @@ -15,17 +15,17 @@ import java.util.List; -public class ResultBuilderForComposite implements ResultBuilder { +public class ResultBuilderForAggregateMetricDouble implements ResultBuilder { private final AggregateMetricDoubleBlockBuilder builder; - ResultBuilderForComposite(BlockFactory blockFactory, int positions) { + ResultBuilderForAggregateMetricDouble(BlockFactory blockFactory, int positions) { this.builder = blockFactory.newAggregateMetricDoubleBlockBuilder(positions); } @Override public void decodeKey(BytesRef keys) { - throw new AssertionError("Composite Block can't be a key"); + throw new AssertionError("AggregateMetricDoubleBlock can't be a key"); } @Override @@ -51,7 +51,7 @@ public Block build() { @Override public String toString() { - return "ValueExtractorForComposite"; + return "ValueExtractorForAggregateMetricDouble"; } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java index 7d1e30f432a5a..ccf36a08c280b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java @@ -7,10 +7,10 @@ package org.elasticsearch.compute.operator.topn; +import org.elasticsearch.compute.data.AggregateMetricDoubleBlock; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BooleanBlock; import org.elasticsearch.compute.data.BytesRefBlock; -import org.elasticsearch.compute.data.CompositeBlock; import org.elasticsearch.compute.data.DocBlock; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.ElementType; @@ -41,7 +41,7 @@ static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder, case DOUBLE -> ValueExtractorForDouble.extractorFor(encoder, inKey, (DoubleBlock) block); case NULL -> new ValueExtractorForNull(); case DOC -> new ValueExtractorForDoc(encoder, ((DocBlock) block).asVector()); - case COMPOSITE -> new ValueExtractorForComposite(encoder, (CompositeBlock) block); + case AGGREGATE_METRIC_DOUBLE -> new ValueExtractorForAggregateMetricDouble(encoder, (AggregateMetricDoubleBlock) block); default -> { assert false : "No value extractor for [" + block.elementType() + "]"; throw new UnsupportedOperationException("No value extractor for [" + block.elementType() + "]"); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForComposite.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForAggregateMetricDouble.java similarity index 60% rename from x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForComposite.java rename to x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForAggregateMetricDouble.java index da58deb96d632..9bac1b9ba5eee 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForComposite.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForAggregateMetricDouble.java @@ -7,33 +7,24 @@ package org.elasticsearch.compute.operator.topn; -import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; -import org.elasticsearch.compute.data.CompositeBlock; +import org.elasticsearch.compute.data.AggregateMetricDoubleBlock; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; import java.util.List; -public class ValueExtractorForComposite implements ValueExtractor { - private final CompositeBlock block; +public class ValueExtractorForAggregateMetricDouble implements ValueExtractor { + private final AggregateMetricDoubleBlock block; - ValueExtractorForComposite(TopNEncoder encoder, CompositeBlock block) { + ValueExtractorForAggregateMetricDouble(TopNEncoder encoder, AggregateMetricDoubleBlock block) { assert encoder == TopNEncoder.DEFAULT_UNSORTABLE; this.block = block; } @Override public void writeValue(BreakingBytesRefBuilder values, int position) { - if (block.getBlockCount() != AggregateMetricDoubleBlockBuilder.Metric.values().length) { - throw new UnsupportedOperationException("Composite Blocks for non-aggregate-metric-doubles do not have value extractors"); - } - for (AggregateMetricDoubleBlockBuilder.Metric metric : List.of( - AggregateMetricDoubleBlockBuilder.Metric.MIN, - AggregateMetricDoubleBlockBuilder.Metric.MAX, - AggregateMetricDoubleBlockBuilder.Metric.SUM - )) { - DoubleBlock doubleBlock = block.getBlock(metric.getIndex()); + for (DoubleBlock doubleBlock : List.of(block.minBlock(), block.maxBlock(), block.sumBlock())) { if (doubleBlock.isNull(position)) { TopNEncoder.DEFAULT_UNSORTABLE.encodeBoolean(false, values); } else { @@ -41,7 +32,7 @@ public void writeValue(BreakingBytesRefBuilder values, int position) { TopNEncoder.DEFAULT_UNSORTABLE.encodeDouble(doubleBlock.getDouble(position), values); } } - IntBlock intBlock = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()); + IntBlock intBlock = block.countBlock(); if (intBlock.isNull(position)) { TopNEncoder.DEFAULT_UNSORTABLE.encodeBoolean(false, values); } else { @@ -52,6 +43,6 @@ public void writeValue(BreakingBytesRefBuilder values, int position) { @Override public String toString() { - return "ValueExtractorForComposite"; + return "ValueExtractorForAggregateMetricDouble"; } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java index 1d3c8df914bc6..cfaf77dd1e38d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java @@ -27,7 +27,11 @@ public class BlockBuilderCopyFromTests extends ESTestCase { public static List params() { List params = new ArrayList<>(); for (ElementType e : ElementType.values()) { - if (e == ElementType.UNKNOWN || e == ElementType.NULL || e == ElementType.DOC || e == ElementType.COMPOSITE) { + if (e == ElementType.UNKNOWN + || e == ElementType.NULL + || e == ElementType.DOC + || e == ElementType.COMPOSITE + || e == ElementType.AGGREGATE_METRIC_DOUBLE) { continue; } for (boolean nullAllowed : new boolean[] { false, true }) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java index c4e30d6766724..c8e1ece222b63 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java @@ -33,7 +33,11 @@ public class BlockBuilderTests extends ESTestCase { public static List params() { List params = new ArrayList<>(); for (ElementType e : ElementType.values()) { - if (e == ElementType.UNKNOWN || e == ElementType.NULL || e == ElementType.DOC || e == ElementType.COMPOSITE) { + if (e == ElementType.UNKNOWN + || e == ElementType.NULL + || e == ElementType.DOC + || e == ElementType.COMPOSITE + || e == ElementType.AGGREGATE_METRIC_DOUBLE) { continue; } params.add(new Object[] { e }); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java index a45560a31207e..c781fdcf8b60a 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java @@ -44,7 +44,11 @@ public class BlockMultiValuedTests extends ESTestCase { public static List params() { List params = new ArrayList<>(); for (ElementType e : ElementType.values()) { - if (e == ElementType.UNKNOWN || e == ElementType.NULL || e == ElementType.DOC || e == ElementType.COMPOSITE) { + if (e == ElementType.UNKNOWN + || e == ElementType.NULL + || e == ElementType.DOC + || e == ElementType.COMPOSITE + || e == ElementType.AGGREGATE_METRIC_DOUBLE) { continue; } for (boolean nullAllowed : new boolean[] { false, true }) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java index 1e63f4f2be6fe..c186675296cf1 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java @@ -8,6 +8,8 @@ package org.elasticsearch.compute.data; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.unit.ByteSizeValue; @@ -22,6 +24,7 @@ import org.elasticsearch.core.Releasables; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.EqualsHashCodeTestUtils; +import org.elasticsearch.test.TransportVersionUtils; import java.io.IOException; import java.util.Arrays; @@ -383,7 +386,16 @@ public void testCompositeBlock() throws Exception { for (int b = 0; b < numBlocks; b++) { assertThat(origBlock.getBlock(b), equalTo(blocks[b])); } - try (CompositeBlock deserBlock = serializeDeserializeBlock(origBlock)) { + try ( + CompositeBlock deserBlock = serializeDeserializeBlockWithVersion( + origBlock, + TransportVersionUtils.randomVersionBetween( + random(), + TransportVersions.AGGREGATE_METRIC_DOUBLE_BLOCK, + TransportVersion.current() + ) + ) + ) { assertThat(deserBlock.getBlockCount(), equalTo(numBlocks)); for (int b = 0; b < numBlocks; b++) { assertThat(deserBlock.getBlock(b), equalTo(origBlock.getBlock(b))); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/CompositeBlockTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/CompositeBlockTests.java index 540adf0f2c1bf..964e61d69e0db 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/CompositeBlockTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/CompositeBlockTests.java @@ -19,7 +19,9 @@ public class CompositeBlockTests extends ComputeTestCase { static List supportedSubElementTypes = Arrays.stream(ElementType.values()) - .filter(e -> e != ElementType.COMPOSITE && e != ElementType.UNKNOWN && e != ElementType.DOC) + .filter( + e -> e != ElementType.COMPOSITE && e != ElementType.UNKNOWN && e != ElementType.DOC && e != ElementType.AGGREGATE_METRIC_DOUBLE + ) .toList(); public static CompositeBlock randomCompositeBlock( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/SerializationTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/SerializationTestCase.java index b243563ced98b..9e420bdab6590 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/SerializationTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/SerializationTestCase.java @@ -53,10 +53,14 @@ BlockStreamInput blockStreamInput(BytesStreamOutput out) { return new BlockStreamInput(ByteBufferStreamInput.wrap(BytesReference.toBytes(out.bytes())), blockFactory); } - @SuppressWarnings("unchecked") T serializeDeserializeBlock(T origBlock) throws IOException { + TransportVersion version = TransportVersionUtils.randomCompatibleVersion(random()); + return serializeDeserializeBlockWithVersion(origBlock, version); + } + + @SuppressWarnings("unchecked") + T serializeDeserializeBlockWithVersion(T origBlock, TransportVersion version) throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { - TransportVersion version = TransportVersionUtils.randomCompatibleVersion(random()); out.setTransportVersion(version); Block.writeTypedBlock(origBlock, out); try (BlockStreamInput in = blockStreamInput(out)) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java index 75495518f7523..5cb72177be6f7 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java @@ -30,7 +30,11 @@ public class VectorBuilderTests extends ESTestCase { public static List params() { List params = new ArrayList<>(); for (ElementType e : ElementType.values()) { - if (e == ElementType.UNKNOWN || e == ElementType.NULL || e == ElementType.DOC || e == ElementType.COMPOSITE) { + if (e == ElementType.UNKNOWN + || e == ElementType.NULL + || e == ElementType.DOC + || e == ElementType.COMPOSITE + || e == ElementType.AGGREGATE_METRIC_DOUBLE) { continue; } params.add(new Object[] { e }); @@ -114,7 +118,7 @@ public void testCranky() { private Vector.Builder vectorBuilder(int estimatedSize, BlockFactory blockFactory) { return switch (elementType) { - case NULL, DOC, COMPOSITE, UNKNOWN -> throw new UnsupportedOperationException(); + case NULL, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, UNKNOWN -> throw new UnsupportedOperationException(); case BOOLEAN -> blockFactory.newBooleanVectorBuilder(estimatedSize); case BYTES_REF -> blockFactory.newBytesRefVectorBuilder(estimatedSize); case FLOAT -> blockFactory.newFloatVectorBuilder(estimatedSize); @@ -126,7 +130,7 @@ private Vector.Builder vectorBuilder(int estimatedSize, BlockFactory blockFactor private void fill(Vector.Builder builder, Vector from) { switch (elementType) { - case NULL, DOC, COMPOSITE, UNKNOWN -> throw new UnsupportedOperationException(); + case NULL, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, UNKNOWN -> throw new UnsupportedOperationException(); case BOOLEAN -> { for (int p = 0; p < from.getPositionCount(); p++) { ((BooleanVector.Builder) builder).appendBoolean(((BooleanVector) from).getBoolean(p)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java index 63f4962a99523..0dbc177350f0b 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java @@ -33,7 +33,8 @@ public static List params() { || elementType == ElementType.COMPOSITE || elementType == ElementType.NULL || elementType == ElementType.DOC - || elementType == ElementType.BYTES_REF) { + || elementType == ElementType.BYTES_REF + || elementType == ElementType.AGGREGATE_METRIC_DOUBLE) { continue; } params.add(new Object[] { elementType }); @@ -117,7 +118,7 @@ public void testCranky() { private Vector.Builder vectorBuilder(int size, BlockFactory blockFactory) { return switch (elementType) { - case NULL, BYTES_REF, DOC, COMPOSITE, UNKNOWN -> throw new UnsupportedOperationException(); + case NULL, BYTES_REF, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, UNKNOWN -> throw new UnsupportedOperationException(); case BOOLEAN -> blockFactory.newBooleanVectorFixedBuilder(size); case DOUBLE -> blockFactory.newDoubleVectorFixedBuilder(size); case FLOAT -> blockFactory.newFloatVectorFixedBuilder(size); @@ -128,7 +129,7 @@ private Vector.Builder vectorBuilder(int size, BlockFactory blockFactory) { private void fill(Vector.Builder builder, Vector from) { switch (elementType) { - case NULL, DOC, COMPOSITE, UNKNOWN -> throw new UnsupportedOperationException(); + case NULL, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, UNKNOWN -> throw new UnsupportedOperationException(); case BOOLEAN -> { for (int p = 0; p < from.getPositionCount(); p++) { ((BooleanVector.FixedBuilder) builder).appendBoolean(((BooleanVector) from).getBoolean(p)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java index 958c2a05e659d..4caa6415f0cfe 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java @@ -58,7 +58,14 @@ public class MultivalueDedupeTests extends ESTestCase { public static List supportedTypes() { List supported = new ArrayList<>(); for (ElementType elementType : ElementType.values()) { - if (oneOf(elementType, ElementType.UNKNOWN, ElementType.DOC, ElementType.COMPOSITE, ElementType.FLOAT)) { + if (oneOf( + elementType, + ElementType.UNKNOWN, + ElementType.DOC, + ElementType.COMPOSITE, + ElementType.FLOAT, + ElementType.AGGREGATE_METRIC_DOUBLE + )) { continue; } supported.add(elementType); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java index 836b8795a6f79..101c129e77201 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java @@ -40,7 +40,7 @@ public static Iterable parameters() { switch (e) { case UNKNOWN -> { } - case COMPOSITE -> { + case COMPOSITE, AGGREGATE_METRIC_DOUBLE -> { // TODO: add later } case FLOAT -> { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index 753a546256cc7..bf2f3d2b97dd7 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -61,6 +61,7 @@ import static java.util.Comparator.naturalOrder; import static java.util.Comparator.reverseOrder; import static org.elasticsearch.compute.data.BlockUtils.toJavaObject; +import static org.elasticsearch.compute.data.ElementType.AGGREGATE_METRIC_DOUBLE; import static org.elasticsearch.compute.data.ElementType.BOOLEAN; import static org.elasticsearch.compute.data.ElementType.BYTES_REF; import static org.elasticsearch.compute.data.ElementType.COMPOSITE; @@ -521,7 +522,7 @@ public void testCollectAllValues() { encoders.add(DEFAULT_SORTABLE); for (ElementType e : ElementType.values()) { - if (e == ElementType.UNKNOWN || e == COMPOSITE) { + if (e == ElementType.UNKNOWN || e == COMPOSITE || e == AGGREGATE_METRIC_DOUBLE) { continue; } elementTypes.add(e); @@ -592,7 +593,7 @@ public void testCollectAllValues_RandomMultiValues() { for (int type = 0; type < blocksCount; type++) { ElementType e = randomFrom(ElementType.values()); - if (e == ElementType.UNKNOWN || e == COMPOSITE) { + if (e == ElementType.UNKNOWN || e == COMPOSITE || e == AGGREGATE_METRIC_DOUBLE) { continue; } elementTypes.add(e); @@ -977,7 +978,7 @@ public void testRandomMultiValuesTopN() { for (int type = 0; type < blocksCount; type++) { ElementType e = randomValueOtherThanMany( - t -> t == ElementType.UNKNOWN || t == ElementType.DOC || t == COMPOSITE, + t -> t == ElementType.UNKNOWN || t == ElementType.DOC || t == COMPOSITE || t == AGGREGATE_METRIC_DOUBLE, () -> randomFrom(ElementType.values()) ); elementTypes.add(e); diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java index 42b5fd6aa8e7d..80f6cbdb81e8b 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java @@ -57,6 +57,7 @@ public static Object randomValue(ElementType e) { case DOC -> new BlockUtils.Doc(randomInt(), randomInt(), between(0, Integer.MAX_VALUE)); case NULL -> null; case COMPOSITE -> throw new IllegalArgumentException("can't make random values for composite"); + case AGGREGATE_METRIC_DOUBLE -> throw new IllegalArgumentException("can't make random values for aggregate_metric_double"); case UNKNOWN -> throw new IllegalArgumentException("can't make random values for [" + e + "]"); }; } diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/RandomBlock.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/RandomBlock.java index dabf8d437fb5f..8b3550261d0f6 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/RandomBlock.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/RandomBlock.java @@ -38,7 +38,11 @@ public record RandomBlock(List> values, Block block) { */ public static ElementType randomElementType() { return ESTestCase.randomValueOtherThanMany( - e -> e == ElementType.UNKNOWN || e == ElementType.NULL || e == ElementType.DOC || e == ElementType.COMPOSITE, + e -> e == ElementType.UNKNOWN + || e == ElementType.NULL + || e == ElementType.DOC + || e == ElementType.COMPOSITE + || e == ElementType.AGGREGATE_METRIC_DOUBLE, () -> ESTestCase.randomFrom(ElementType.values()) ); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java index 24db9f6931672..6ea4f553ff1ed 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java @@ -566,18 +566,12 @@ public static Type asType(ElementType elementType, Type actualType) { case BYTES_REF -> bytesRefBlockType(actualType); case BOOLEAN -> BOOLEAN; case DOC -> throw new IllegalArgumentException("can't assert on doc blocks"); - case COMPOSITE -> compositeBlockType(actualType); + case COMPOSITE -> throw new IllegalArgumentException("can't assert on composite blocks"); + case AGGREGATE_METRIC_DOUBLE -> AGGREGATE_METRIC_DOUBLE; case UNKNOWN -> throw new IllegalArgumentException("Unknown block types cannot be handled"); }; } - private static Type compositeBlockType(Type actualType) { - return switch (actualType) { - case AGGREGATE_METRIC_DOUBLE -> actualType; - default -> throw new IllegalArgumentException("can't assert on composite blocks that aren't aggregate metric doubles"); - }; - } - private static Type bytesRefBlockType(Type actualType) { return switch (actualType) { case NULL -> NULL; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java index 597b3021a1a13..7d580b897c2a0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java @@ -10,10 +10,10 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.compute.data.AggregateMetricDoubleBlock; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BooleanBlock; import org.elasticsearch.compute.data.BytesRefBlock; -import org.elasticsearch.compute.data.CompositeBlock; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; @@ -154,7 +154,7 @@ protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Pa @Override protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex) throws IOException { - return builder.value(aggregateMetricDoubleBlockToString((CompositeBlock) block, valueIndex)); + return builder.value(aggregateMetricDoubleBlockToString((AggregateMetricDoubleBlock) block, valueIndex)); } }; case NULL -> new PositionToXContent(block) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java index 4fd554582b56b..38c2916d73b72 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java @@ -11,10 +11,10 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.compute.data.AggregateMetricDoubleBlock; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BooleanBlock; import org.elasticsearch.compute.data.BytesRefBlock; -import org.elasticsearch.compute.data.CompositeBlock; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; @@ -134,7 +134,7 @@ private static Object valueAt(DataType dataType, Block block, int offset, BytesR case GEO_POINT, GEO_SHAPE, CARTESIAN_POINT, CARTESIAN_SHAPE -> spatialToString( ((BytesRefBlock) block).getBytesRef(offset, scratch) ); - case AGGREGATE_METRIC_DOUBLE -> aggregateMetricDoubleBlockToString((CompositeBlock) block, offset); + case AGGREGATE_METRIC_DOUBLE -> aggregateMetricDoubleBlockToString((AggregateMetricDoubleBlock) block, offset); case UNSUPPORTED -> (String) null; case SOURCE -> { BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java index eb1f4f95db6fe..38b391552cfe2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java @@ -10,6 +10,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; +import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xpack.core.esql.action.ColumnInfo; @@ -103,7 +104,8 @@ static Iterator rowValues(List columns, Li assert page.getBlockCount() == columnCount : page.getBlockCount() + " != " + columnCount; final PositionToXContent[] toXContents = new PositionToXContent[columnCount]; for (int column = 0; column < columnCount; column++) { - toXContents[column] = PositionToXContent.positionToXContent(columns.get(column), page.getBlock(column), scratch); + Block block = page.getBlock(column); + toXContents[column] = PositionToXContent.positionToXContent(columns.get(column), block, scratch); } return Iterators.forRange(0, page.getPositionCount(), position -> (builder, params) -> { builder.startArray(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/FromAggregateMetricDouble.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/FromAggregateMetricDouble.java index f1bde9f57b671..61129df973a55 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/FromAggregateMetricDouble.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/FromAggregateMetricDouble.java @@ -10,9 +10,9 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.compute.data.AggregateMetricDoubleBlock; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.Block; -import org.elasticsearch.compute.data.CompositeBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.EvalOperator; @@ -145,8 +145,9 @@ public Block eval(Page page) { return block; } try { - CompositeBlock compositeBlock = (CompositeBlock) block; - Block resultBlock = compositeBlock.getBlock(((Number) subfieldIndex.fold(FoldContext.small())).intValue()); + Block resultBlock = ((AggregateMetricDoubleBlock) block).getMetricBlock( + ((Number) subfieldIndex.fold(FoldContext.small())).intValue() + ); resultBlock.incRef(); return resultBlock; } finally { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java index 23d4d895bb0fc..87a4492f33094 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToAggregateMetricDouble.java @@ -9,10 +9,10 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.compute.data.AggregateMetricDoubleBlock; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; -import org.elasticsearch.compute.data.CompositeBlock; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.DoubleVector; import org.elasticsearch.compute.data.IntBlock; @@ -141,26 +141,20 @@ private void appendValue(double value) { } private Block build() { - Block[] blocks = new Block[4]; - Block block; + DoubleBlock doubleBlock = null; + IntBlock countBlock = null; boolean success = false; try { - block = valuesBuilder.build().asBlock(); - blocks[AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()] = block; - blocks[AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()] = block; - block.incRef(); - blocks[AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()] = block; - block.incRef(); - blocks[AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()] = blockFactory.newConstantIntBlockWith( - 1, - block.getPositionCount() - ); - CompositeBlock compositeBlock = new CompositeBlock(blocks); + doubleBlock = valuesBuilder.build().asBlock(); + countBlock = blockFactory.newConstantIntBlockWith(1, doubleBlock.getPositionCount()); + AggregateMetricDoubleBlock aggBlock = new AggregateMetricDoubleBlock(doubleBlock, doubleBlock, doubleBlock, countBlock); + doubleBlock.incRef(); + doubleBlock.incRef(); success = true; - return compositeBlock; + return aggBlock; } finally { if (success == false) { - Releasables.closeExpectNoException(blocks); + Releasables.closeExpectNoException(doubleBlock, countBlock); } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToStringFromAggregateMetricDoubleEvaluator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToStringFromAggregateMetricDoubleEvaluator.java index cb88835378901..e67ba29ab0227 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToStringFromAggregateMetricDoubleEvaluator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToStringFromAggregateMetricDoubleEvaluator.java @@ -8,9 +8,9 @@ package org.elasticsearch.xpack.esql.expression.function.scalar.convert; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.AggregateMetricDoubleBlock; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BytesRefBlock; -import org.elasticsearch.compute.data.CompositeBlock; import org.elasticsearch.compute.data.Vector; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.EvalOperator; @@ -37,13 +37,13 @@ protected Block evalVector(Vector v) { return evalBlock(v.asBlock()); } - private static BytesRef evalValue(CompositeBlock compositeBlock, int index) { - return new BytesRef(aggregateMetricDoubleBlockToString(compositeBlock, index)); + private static BytesRef evalValue(AggregateMetricDoubleBlock aggBlock, int index) { + return new BytesRef(aggregateMetricDoubleBlockToString(aggBlock, index)); } @Override public Block evalBlock(Block b) { - CompositeBlock block = (CompositeBlock) b; + AggregateMetricDoubleBlock block = (AggregateMetricDoubleBlock) b; int positionCount = block.getPositionCount(); try (BytesRefBlock.Builder builder = driverContext.blockFactory().newBytesRefBlockBuilder(positionCount)) { for (int p = 0; p < positionCount; p++) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java index 5c81e7bf8a6e4..ded94fd5a0a63 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java @@ -109,7 +109,9 @@ private static DataType toDataType(ElementType elementType) { case INT -> DataType.INTEGER; case LONG -> DataType.LONG; case DOUBLE -> DataType.DOUBLE; - case FLOAT, NULL, DOC, COMPOSITE, UNKNOWN -> throw new EsqlIllegalArgumentException("unsupported agg type: " + elementType); + case FLOAT, NULL, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, UNKNOWN -> throw new EsqlIllegalArgumentException( + "unsupported agg type: " + elementType + ); }; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index df2da82e3bee0..f77532285c379 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -298,7 +298,8 @@ public static ElementType toElementType(DataType dataType, MappedFieldType.Field case TSID_DATA_TYPE -> ElementType.BYTES_REF; case GEO_POINT, CARTESIAN_POINT -> fieldExtractPreference == DOC_VALUES ? ElementType.LONG : ElementType.BYTES_REF; case GEO_SHAPE, CARTESIAN_SHAPE -> fieldExtractPreference == EXTRACT_SPATIAL_BOUNDS ? ElementType.INT : ElementType.BYTES_REF; - case PARTIAL_AGG, AGGREGATE_METRIC_DOUBLE -> ElementType.COMPOSITE; + case PARTIAL_AGG -> ElementType.COMPOSITE; + case AGGREGATE_METRIC_DOUBLE -> ElementType.AGGREGATE_METRIC_DOUBLE; case SHORT, BYTE, DATE_PERIOD, TIME_DURATION, OBJECT, FLOAT, HALF_FLOAT, SCALED_FLOAT -> throw EsqlIllegalArgumentException .illegalDataType(dataType); }; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java index 1e7d4e32e3137..338cbf2ae4ca0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java @@ -16,9 +16,9 @@ import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.time.DateFormatters; import org.elasticsearch.common.time.DateUtils; +import org.elasticsearch.compute.data.AggregateMetricDoubleBlock; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder.Metric; -import org.elasticsearch.compute.data.CompositeBlock; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.search.DocValueFormat; @@ -685,16 +685,16 @@ public static long booleanToUnsignedLong(boolean number) { return number ? ONE_AS_UNSIGNED_LONG : ZERO_AS_UNSIGNED_LONG; } - public static String aggregateMetricDoubleBlockToString(CompositeBlock compositeBlock, int index) { + public static String aggregateMetricDoubleBlockToString(AggregateMetricDoubleBlock aggBlock, int index) { try (XContentBuilder builder = JsonXContent.contentBuilder()) { builder.startObject(); for (Metric metric : List.of(Metric.MIN, Metric.MAX, Metric.SUM)) { - var block = compositeBlock.getBlock(metric.getIndex()); + var block = aggBlock.getMetricBlock(metric.getIndex()); if (block.isNull(index) == false) { builder.field(metric.getLabel(), ((DoubleBlock) block).getDouble(index)); } } - var countBlock = compositeBlock.getBlock(Metric.COUNT.getIndex()); + var countBlock = aggBlock.getMetricBlock(Metric.COUNT.getIndex()); if (countBlock.isNull(index) == false) { builder.field(Metric.COUNT.getLabel(), ((IntBlock) countBlock).getInt(index)); }