@@ -300,6 +300,12 @@ public ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialHistogramScratch scratch) {
throw new UnsupportedOperationException("null block");
}

@Override
public void serializeExponentialHistogram(int valueIndex, SerializedOutput out, BytesRef scratch) {
assert false : "null block";
throw new UnsupportedOperationException("null block");
}

@Override
public int getTotalValueCount() {
return 0;
@@ -94,6 +94,20 @@ public ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialHistogramScratch scratch) {
}
}

@Override
public void serializeExponentialHistogram(int valueIndex, SerializedOutput out, BytesRef scratch) {
Member:

valueIndex should be position instead.

Contributor Author:

The intended usage pattern for serializeExponentialHistogram is just like for getters on blocks:

for (int i = 0; i < block.getPositionCount(); i++) {
  for (int j = 0; j < block.getValueCount(i); j++) {
    block.serializeExponentialHistogram(block.getFirstValueIndex(i) + j, ...);
  }
}

So valueIndex rather than position is correct here.
Right now exponential histogram blocks happen to use positions directly as value indices, but that will change once we support multi-values.

See also this comment: #133393 (comment)

I'll add a comment in ExponentialHistogramArrayBlock explaining the mapping of positions and valueIndices to positions in the sub-blocks.

long valueCount = valueCounts.getLong(valueCounts.getFirstValueIndex(valueIndex));
out.appendLong(valueCounts.getLong(valueCounts.getFirstValueIndex(valueIndex)));
Member:

valueCounts.getLong(valueCounts.getFirstValueIndex(valueIndex)) -> valueCount?

Contributor Author:

I think you are confusing this with the value count returned via getValueCount(position). This is a different value count: it is the number of samples the histogram was generated from. I've added a comment to avoid this confusion.

out.appendDouble(sums.getDouble(sums.getFirstValueIndex(valueIndex)));
out.appendDouble(zeroThresholds.getDouble(zeroThresholds.getFirstValueIndex(valueIndex)));
if (valueCount > 0) {
// min / max are only non-null for non-empty histograms
out.appendDouble(minima.getDouble(minima.getFirstValueIndex(valueIndex)));
out.appendDouble(maxima.getDouble(maxima.getFirstValueIndex(valueIndex)));
}
out.appendBytesRef(encodedHistograms.getBytesRef(encodedHistograms.getFirstValueIndex(valueIndex), scratch));
}
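// Illustration only (not part of this diff): the two "value counts" in the
// thread above are different things.
//   block.getValueCount(position)  -> how many values the block stores at a
//                                     position (always 1 until multi-values
//                                     are supported)
//   valueCounts.getLong(valueCounts.getFirstValueIndex(valueIndex))
//                                  -> how many samples the histogram was
//                                     generated from; serialized first above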

@Override
protected void closeInternal() {
Releasables.close(getSubBlocks());
@@ -376,5 +390,4 @@ public int hashCode() {
// this ensures proper equality with null blocks and should be unique enough for practical purposes
return encodedHistograms.hashCode();
}

}
@@ -7,6 +7,7 @@

package org.elasticsearch.compute.data;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;

/**
@@ -27,6 +28,16 @@ public sealed interface ExponentialHistogramBlock extends Block permits Constant
*/
ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialHistogramScratch scratch);

/**
* Serializes the exponential histogram at the given index into the provided output, so that it can be read back
* via {@link ExponentialHistogramBlockBuilder#deserializeAndAppend(SerializedInput)}.
*
* @param valueIndex the index of the value to serialize, as returned by {@link #getFirstValueIndex(int)}
* @param out the output to append the serialized representation to
* @param scratch scratch space that may be used when reading {@link BytesRef} values
*/
void serializeExponentialHistogram(int valueIndex, SerializedOutput out, BytesRef scratch);
Member:

valueIndex -> position


static boolean equals(ExponentialHistogramBlock blockA, ExponentialHistogramBlock blockB) {
if (blockA == blockB) {
return true;
@@ -42,4 +53,27 @@ static boolean equals(ExponentialHistogramBlock blockA, ExponentialHistogramBlock blockB) {
};
}

/**
* Abstraction to use for writing individual values via {@link #serializeExponentialHistogram(int, SerializedOutput, BytesRef)}.
*/
interface SerializedOutput {
Member:

I'd prefer not to have these interfaces, but they're okay.

Contributor Author:

I really don't want to expose the zero-threshold from the block, to prevent it from becoming a maintenance nightmare if we make changes to the disk format. That's why I want to keep the knowledge of its existence local to the block implementation.

I thought pulling in a direct dependency on the TopNEncoder would not be a good idea, which is why I added these interfaces in between. If you prefer, I can use the TopNEncoder directly in serializeExponentialHistogram in a follow-up.

void appendDouble(double value);

void appendLong(long value);

void appendBytesRef(BytesRef bytesRef);
}

/**
* Abstraction to use for reading individual serialized values via
* {@link ExponentialHistogramBlockBuilder#deserializeAndAppend(SerializedInput)}.
*/
interface SerializedInput {
double readDouble();

long readLong();

BytesRef readBytesRef(BytesRef tempBytesRef);
}

}
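The only contract between the two interfaces is that values are read back in exactly the order they were written. The sketch below is not part of this PR; the class name, the helper method, and the choice of BytesStreamOutput/StreamInput as backing streams are all hypothetical. It round-trips a single value from a block into a builder:

import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.compute.data.ExponentialHistogramBlock;
import org.elasticsearch.compute.data.ExponentialHistogramBlockBuilder;

// Hypothetical helper, not part of this PR.
class HistogramSerializationExample {

    static void copyValue(ExponentialHistogramBlock block, int valueIndex, ExponentialHistogramBlockBuilder builder)
        throws IOException {
        BytesStreamOutput stream = new BytesStreamOutput();
        // Capture the value; the block defines the order of the writes.
        block.serializeExponentialHistogram(valueIndex, new ExponentialHistogramBlock.SerializedOutput() {
            @Override
            public void appendDouble(double value) {
                try {
                    stream.writeDouble(value);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            public void appendLong(long value) {
                try {
                    stream.writeLong(value);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            public void appendBytesRef(BytesRef bytesRef) {
                try {
                    stream.writeBytesRef(bytesRef);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }, new BytesRef());

        // Replay it; reads must happen in exactly the order the writes did.
        StreamInput in = stream.bytes().streamInput();
        builder.deserializeAndAppend(new ExponentialHistogramBlock.SerializedInput() {
            @Override
            public double readDouble() {
                try {
                    return in.readDouble();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            public long readLong() {
                try {
                    return in.readLong();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            public BytesRef readBytesRef(BytesRef tempBytesRef) {
                try {
                    // Ignores tempBytesRef; returning a fresh ref is allowed.
                    return in.readBytesRef();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        });
    }
}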
@@ -7,6 +7,7 @@

package org.elasticsearch.compute.data;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.exponentialhistogram.CompressedExponentialHistogram;
@@ -25,6 +26,8 @@ public class ExponentialHistogramBlockBuilder implements Block.Builder, BlockLoader
private final DoubleBlock.Builder zeroThresholdsBuilder;
private final BytesRefBlock.Builder encodedHistogramsBuilder;

private final BytesRef scratch = new BytesRef();

ExponentialHistogramBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
DoubleBlock.Builder minimaBuilder = null;
DoubleBlock.Builder maximaBuilder = null;
@@ -131,6 +134,27 @@ public ExponentialHistogramBlockBuilder append(ExponentialHistogram histogram) {
return this;
}

/**
* Decodes and appends a value serialized with
* {@link ExponentialHistogramBlock#serializeExponentialHistogram(int, ExponentialHistogramBlock.SerializedOutput, BytesRef)}.
*
* @param input the input to deserialize from
*/
public void deserializeAndAppend(ExponentialHistogramBlock.SerializedInput input) {
long valueCount = input.readLong();
valueCountsBuilder.appendLong(valueCount);
sumsBuilder.appendDouble(input.readDouble());
zeroThresholdsBuilder.appendDouble(input.readDouble());
if (valueCount > 0) {
minimaBuilder.appendDouble(input.readDouble());
maximaBuilder.appendDouble(input.readDouble());
} else {
minimaBuilder.appendNull();
maximaBuilder.appendNull();
}
encodedHistogramsBuilder.appendBytesRef(input.readBytesRef(scratch));
}

@Override
public ExponentialHistogramBlock build() {
DoubleBlock minima = null;
@@ -55,6 +55,7 @@ static ResultBuilder resultBuilderFor(
case NULL -> new ResultBuilderForNull(blockFactory);
case DOC -> new ResultBuilderForDoc(blockFactory, (DocVectorEncoder) encoder, positions);
case AGGREGATE_METRIC_DOUBLE -> new ResultBuilderForAggregateMetricDouble(blockFactory, positions);
case EXPONENTIAL_HISTOGRAM -> new ResultBuilderForExponentialHistogram(blockFactory, positions);
default -> {
assert false : "Result builder for [" + elementType + "]";
throw new UnsupportedOperationException("Result builder for [" + elementType + "]");
@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.topn;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ExponentialHistogramBlock;
import org.elasticsearch.compute.data.ExponentialHistogramBlockBuilder;

public class ResultBuilderForExponentialHistogram implements ResultBuilder {

private final ExponentialHistogramBlockBuilder builder;
private final ReusableTopNEncoderInput reusableInput = new ReusableTopNEncoderInput();

ResultBuilderForExponentialHistogram(BlockFactory blockFactory, int positions) {
this.builder = blockFactory.newExponentialHistogramBlockBuilder(positions);
}

@Override
public void decodeKey(BytesRef keys) {
throw new AssertionError("ExponentialHistogramBlock can't be a key");
}

@Override
public void decodeValue(BytesRef values) {
int count = TopNEncoder.DEFAULT_UNSORTABLE.decodeVInt(values);
if (count == 0) {
builder.appendNull();
return;
}
assert count == 1 : "ExponentialHistogramBlock does not support multi values";
reusableInput.inputValues = values;
builder.deserializeAndAppend(reusableInput);
}

@Override
public Block build() {
return builder.build();
}

@Override
public String toString() {
return "ResultBuilderForExponentialHistogram";
}

@Override
public void close() {
builder.close();
}

private class ReusableTopNEncoderInput implements ExponentialHistogramBlock.SerializedInput {
BytesRef inputValues;

@Override
public double readDouble() {
return TopNEncoder.DEFAULT_UNSORTABLE.decodeDouble(inputValues);
}

@Override
public long readLong() {
return TopNEncoder.DEFAULT_UNSORTABLE.decodeLong(inputValues);
}

@Override
public BytesRef readBytesRef(BytesRef tempBytesRef) {
return TopNEncoder.DEFAULT_UNSORTABLE.decodeBytesRef(inputValues, tempBytesRef);
}
}
}
@@ -14,6 +14,7 @@
import org.elasticsearch.compute.data.DocBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.ExponentialHistogramBlock;
import org.elasticsearch.compute.data.FloatBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
@@ -53,6 +54,7 @@ static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder,
case NULL -> new ValueExtractorForNull();
case DOC -> new ValueExtractorForDoc(encoder, ((DocBlock) block).asVector());
case AGGREGATE_METRIC_DOUBLE -> new ValueExtractorForAggregateMetricDouble(encoder, (AggregateMetricDoubleBlock) block);
case EXPONENTIAL_HISTOGRAM -> new ValueExtractorForExponentialHistogram(encoder, (ExponentialHistogramBlock) block);
default -> {
assert false : "No value extractor for [" + block.elementType() + "]";
throw new UnsupportedOperationException("No value extractor for [" + block.elementType() + "]");
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.topn;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.ExponentialHistogramBlock;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;

public class ValueExtractorForExponentialHistogram implements ValueExtractor {
private final ExponentialHistogramBlock block;

private final BytesRef scratch = new BytesRef();
private final ReusableTopNEncoderOutput reusableOutput = new ReusableTopNEncoderOutput();

ValueExtractorForExponentialHistogram(TopNEncoder encoder, ExponentialHistogramBlock block) {
assert encoder == TopNEncoder.DEFAULT_UNSORTABLE;
this.block = block;
}

@Override
public void writeValue(BreakingBytesRefBuilder values, int position) {
// number of multi-values first for compatibility with ValueExtractorForNull
if (block.isNull(position)) {
TopNEncoder.DEFAULT_UNSORTABLE.encodeVInt(0, values);
} else {
assert block.getValueCount(position) == 1 : "Multi-valued ExponentialHistogram blocks are not supported in TopN";
TopNEncoder.DEFAULT_UNSORTABLE.encodeVInt(1, values);
int valueIndex = block.getFirstValueIndex(position);
reusableOutput.target = values;
block.serializeExponentialHistogram(valueIndex, reusableOutput, scratch);
}
}

@Override
public String toString() {
return "ValueExtractorForExponentialHistogram";
}

private static final class ReusableTopNEncoderOutput implements ExponentialHistogramBlock.SerializedOutput {
BreakingBytesRefBuilder target;

@Override
public void appendDouble(double value) {
TopNEncoder.DEFAULT_UNSORTABLE.encodeDouble(value, target);
}

@Override
public void appendLong(long value) {
TopNEncoder.DEFAULT_UNSORTABLE.encodeLong(value, target);
}

@Override
public void appendBytesRef(BytesRef value) {
TopNEncoder.DEFAULT_UNSORTABLE.encodeBytesRef(value, target);
}
}
}
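Read together, ValueExtractorForExponentialHistogram and ResultBuilderForExponentialHistogram imply the following per-value TopN layout (reconstructed from the write path in this diff; the bracketed fields are present only for non-empty histograms):

    vint   count              0 for null, 1 otherwise (multi-values unsupported)
    long   valueCount         number of samples the histogram was generated from
    double sum
    double zeroThreshold
    [double min, double max]  only when valueCount > 0
    bytes  encodedHistogram   written via TopNEncoder.DEFAULT_UNSORTABLE.encodeBytesRef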
@@ -44,7 +44,7 @@ public static Iterable<Object[]> parameters() {
case UNKNOWN -> {
supportsNull = false;
}
- case COMPOSITE, EXPONENTIAL_HISTOGRAM -> {
+ case COMPOSITE -> {
// TODO: add later
supportsNull = false;
}
@@ -130,6 +130,9 @@ public static Iterable<Object[]> parameters() {
) }
);
}
case EXPONENTIAL_HISTOGRAM ->
// multi values are not supported
cases.add(valueTestCase("single " + e, e, TopNEncoder.DEFAULT_UNSORTABLE, () -> BlockTestUtils.randomValue(e)));
case NULL -> {
}
default -> {
@@ -487,10 +487,10 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerContext
case BOOLEAN, NULL, BYTE, SHORT, INTEGER, LONG, DOUBLE, FLOAT, HALF_FLOAT, DATETIME, DATE_NANOS, DATE_PERIOD, TIME_DURATION,
OBJECT, SCALED_FLOAT, UNSIGNED_LONG, TSID_DATA_TYPE -> TopNEncoder.DEFAULT_SORTABLE;
case GEO_POINT, CARTESIAN_POINT, GEO_SHAPE, CARTESIAN_SHAPE, COUNTER_LONG, COUNTER_INTEGER, COUNTER_DOUBLE, SOURCE,
- AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX -> TopNEncoder.DEFAULT_UNSORTABLE;
+ AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX, EXPONENTIAL_HISTOGRAM ->
+     TopNEncoder.DEFAULT_UNSORTABLE;
// unsupported fields are encoded as BytesRef, we'll use the same encoder; all values should be null at this point
- // TODO(b/133393) add exponential histogram support
- case PARTIAL_AGG, EXPONENTIAL_HISTOGRAM, UNSUPPORTED -> TopNEncoder.UNSUPPORTED;
+ case PARTIAL_AGG, UNSUPPORTED -> TopNEncoder.UNSUPPORTED;
};
}
List<TopNOperator.SortOrder> orders = topNExec.order().stream().map(order -> {
Expand Down