Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ static TransportVersion def(int id) {
public static final TransportVersion DENSE_VECTOR_OFF_HEAP_STATS = def(9_062_00_0);
public static final TransportVersion RANDOM_SAMPLER_QUERY_BUILDER = def(9_063_0_00);
public static final TransportVersion SETTINGS_IN_DATA_STREAMS = def(9_064_0_00);
public static final TransportVersion AGGREGATE_METRIC_DOUBLE_BLOCK = def(9_065_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.data;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

public final class AggregateMetricDoubleBlock extends AbstractNonThreadSafeRefCounted implements Block {
private final DoubleBlock minBlock;
private final DoubleBlock maxBlock;
private final DoubleBlock sumBlock;
private final IntBlock countBlock;
private final int positionCount;

public AggregateMetricDoubleBlock(DoubleBlock minBlock, DoubleBlock maxBlock, DoubleBlock sumBlock, IntBlock countBlock) {
this.minBlock = minBlock;
this.maxBlock = maxBlock;
this.sumBlock = sumBlock;
this.countBlock = countBlock;
this.positionCount = minBlock.getPositionCount();
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
if (b.getPositionCount() != positionCount) {
assert false : "expected positionCount=" + positionCount + " but was " + b;
throw new IllegalArgumentException("expected positionCount=" + positionCount + " but was " + b);
}
if (b.isReleased()) {
assert false : "can't build aggregate_metric_double block out of released blocks but [" + b + "] was released";
throw new IllegalArgumentException(
"can't build aggregate_metric_double block out of released blocks but [" + b + "] was released"
);
}
}
}

public static AggregateMetricDoubleBlock fromCompositeBlock(CompositeBlock block) {
assert block.getBlockCount() == 4
: "Can't make AggregateMetricDoubleBlock out of CompositeBlock with " + block.getBlockCount() + " blocks";
DoubleBlock min = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex());
DoubleBlock max = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex());
DoubleBlock sum = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex());
IntBlock count = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex());
return new AggregateMetricDoubleBlock(min, max, sum, count);
}

public static CompositeBlock toCompositeBlock(AggregateMetricDoubleBlock block) {
final Block[] blocks = new Block[4];
blocks[AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()] = block.minBlock();
blocks[AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()] = block.maxBlock();
blocks[AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()] = block.sumBlock();
blocks[AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()] = block.countBlock();
return new CompositeBlock(blocks);
}

@Override
protected void closeInternal() {
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
}

@Override
public Vector asVector() {
return null;
}

@Override
public int getTotalValueCount() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same as CompositeBlock but I'm not sure if this is what we want for AggregateMetricDouble?

int totalValueCount = 0;
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
totalValueCount += b.getTotalValueCount();
}
return totalValueCount;
}

@Override
public int getPositionCount() {
return positionCount;
}

@Override
public int getFirstValueIndex(int position) {
return minBlock.getFirstValueIndex(position);
}

@Override
public int getValueCount(int position) {
int max = 0;
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
max = Math.max(max, b.getValueCount(position));
}
return max;
}

@Override
public ElementType elementType() {
return ElementType.AGGREGATE_METRIC_DOUBLE;
}

@Override
public BlockFactory blockFactory() {
return minBlock.blockFactory();
}

@Override
public void allowPassingToDifferentDriver() {
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
block.allowPassingToDifferentDriver();
}
}

@Override
public boolean isNull(int position) {
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
if (block.isNull(position) == false) {
return false;
}
}
return true;
}

@Override
public boolean mayHaveNulls() {
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveNulls);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I thought there was a push to skip streams as they're unnecessarily expensive?

}

@Override
public boolean areAllValuesNull() {
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).allMatch(Block::areAllValuesNull);
}

@Override
public boolean mayHaveMultivaluedFields() {
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveMultivaluedFields);
}

@Override
public boolean doesHaveMultivaluedFields() {
if (false == Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveMultivaluedFields)) {
return false;
}
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::doesHaveMultivaluedFields);
}

@Override
public Block filter(int... positions) {
AggregateMetricDoubleBlock result = null;
DoubleBlock newMinBlock = null;
DoubleBlock newMaxBlock = null;
DoubleBlock newSumBlock = null;
IntBlock newCountBlock = null;
try {
newMinBlock = minBlock.filter(positions);
newMaxBlock = maxBlock.filter(positions);
newSumBlock = sumBlock.filter(positions);
newCountBlock = countBlock.filter(positions);
result = new AggregateMetricDoubleBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
return result;
} finally {
if (result == null) {
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
}
}
}

@Override
public Block keepMask(BooleanVector mask) {
AggregateMetricDoubleBlock result = null;
DoubleBlock newMinBlock = null;
DoubleBlock newMaxBlock = null;
DoubleBlock newSumBlock = null;
IntBlock newCountBlock = null;
try {
newMinBlock = minBlock.keepMask(mask);
newMaxBlock = maxBlock.keepMask(mask);
newSumBlock = sumBlock.keepMask(mask);
newCountBlock = countBlock.keepMask(mask);
result = new AggregateMetricDoubleBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
return result;
} finally {
if (result == null) {
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
}
}
}

@Override
public ReleasableIterator<? extends Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO: support
throw new UnsupportedOperationException("can't lookup values from AggregateMetricDoubleBlock");
}

@Override
public MvOrdering mvOrdering() {
// TODO: determine based on sub-blocks
return MvOrdering.UNORDERED;
}

@Override
public Block expand() {
// TODO: support
throw new UnsupportedOperationException("AggregateMetricDoubleBlock");
}

@Override
public void writeTo(StreamOutput out) throws IOException {
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
block.writeTo(out);
}
}

public static Block readFrom(StreamInput in) throws IOException {
boolean success = false;
DoubleBlock minBlock = null;
DoubleBlock maxBlock = null;
DoubleBlock sumBlock = null;
IntBlock countBlock = null;
BlockStreamInput blockStreamInput = (BlockStreamInput) in;
try {
minBlock = DoubleBlock.readFrom(blockStreamInput);
maxBlock = DoubleBlock.readFrom(blockStreamInput);
sumBlock = DoubleBlock.readFrom(blockStreamInput);
countBlock = IntBlock.readFrom(blockStreamInput);
AggregateMetricDoubleBlock result = new AggregateMetricDoubleBlock(minBlock, maxBlock, sumBlock, countBlock);
success = true;
return result;
} finally {
if (success == false) {
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
}
}
}

@Override
public long ramBytesUsed() {
return minBlock.ramBytesUsed() + maxBlock.ramBytesUsed() + sumBlock.ramBytesUsed() + countBlock.ramBytesUsed();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AggregateMetricDoubleBlock that = (AggregateMetricDoubleBlock) o;
return positionCount == that.positionCount
&& minBlock.equals(that.minBlock)
&& maxBlock.equals(that.maxBlock)
&& sumBlock.equals(that.sumBlock)
&& countBlock.equals(that.countBlock);
}

@Override
public int hashCode() {
return Objects.hash(
DoubleBlock.hash(minBlock),
DoubleBlock.hash(maxBlock),
DoubleBlock.hash(sumBlock),
IntBlock.hash(countBlock),
positionCount
);
}

public DoubleBlock minBlock() {
return minBlock;
}

public DoubleBlock maxBlock() {
return maxBlock;
}

public DoubleBlock sumBlock() {
return sumBlock;
}

public IntBlock countBlock() {
return countBlock;
}

public Block getMetricBlock(int index) {
if (index == AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()) {
return minBlock;
}
if (index == AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()) {
return maxBlock;
}
if (index == AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()) {
return sumBlock;
}
if (index == AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()) {
return countBlock;
}
throw new UnsupportedOperationException("Received an index (" + index + ") outside of range for AggregateMetricDoubleBlock.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,22 @@ protected int elementSize() {
}

@Override
public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) {
public Block.Builder copyFrom(Block b, int beginInclusive, int endExclusive) {
Block minBlock;
Block maxBlock;
Block sumBlock;
Block countBlock;
if (block.areAllValuesNull()) {
minBlock = block;
maxBlock = block;
sumBlock = block;
countBlock = block;
if (b.areAllValuesNull()) {
minBlock = b;
maxBlock = b;
sumBlock = b;
countBlock = b;
} else {
CompositeBlock composite = (CompositeBlock) block;
minBlock = composite.getBlock(Metric.MIN.getIndex());
maxBlock = composite.getBlock(Metric.MAX.getIndex());
sumBlock = composite.getBlock(Metric.SUM.getIndex());
countBlock = composite.getBlock(Metric.COUNT.getIndex());
AggregateMetricDoubleBlock block = (AggregateMetricDoubleBlock) b;
minBlock = block.minBlock();
maxBlock = block.maxBlock();
sumBlock = block.sumBlock();
countBlock = block.countBlock();
}
minBuilder.copyFrom(minBlock, beginInclusive, endExclusive);
maxBuilder.copyFrom(maxBlock, beginInclusive, endExclusive);
Expand Down Expand Up @@ -103,20 +103,23 @@ public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) {

@Override
public Block build() {
Block[] blocks = new Block[4];
DoubleBlock minBlock = null;
DoubleBlock maxBlock = null;
DoubleBlock sumBlock = null;
IntBlock countBlock = null;
boolean success = false;
try {
finish();
blocks[Metric.MIN.getIndex()] = minBuilder.build();
blocks[Metric.MAX.getIndex()] = maxBuilder.build();
blocks[Metric.SUM.getIndex()] = sumBuilder.build();
blocks[Metric.COUNT.getIndex()] = countBuilder.build();
CompositeBlock block = new CompositeBlock(blocks);
minBlock = minBuilder.build();
maxBlock = maxBuilder.build();
sumBlock = sumBuilder.build();
countBlock = countBuilder.build();
AggregateMetricDoubleBlock block = new AggregateMetricDoubleBlock(minBlock, maxBlock, sumBlock, countBlock);
success = true;
return block;
} finally {
if (success == false) {
Releasables.closeExpectNoException(blocks);
Releasables.closeExpectNoException(minBlock, maxBlock, sumBlock, countBlock);
}
}
}
Expand Down
Loading