@@ -300,6 +300,12 @@ public ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialH
throw new UnsupportedOperationException("null block");
}

@Override
public void serializeExponentialHistogram(int valueIndex, SerializedOutput out, BytesRef scratch) {
assert false : "null block";
throw new UnsupportedOperationException("null block");
}

@Override
public int getTotalValueCount() {
return 0;
@@ -19,9 +19,34 @@

final class ExponentialHistogramArrayBlock extends AbstractNonThreadSafeRefCounted implements ExponentialHistogramBlock {

// Exponential histograms consist of several components that we store in separate blocks,
// due to (a) better compression in the field mapper for disk storage and (b) faster computations if only one sub-component is needed.
// What are the semantics of positions, multi-value counts, and nulls in the exponential histogram block,
// and how do they relate to the sub-blocks?
// ExponentialHistogramBlock needs to adhere to the contract of Blocks for the access pattern:
//
// for (int position = 0; position < block.getPositionCount(); position++) {
//     int valueCount = block.getValueCount(position);
//     for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
//         ExponentialHistogram histo = block.getExponentialHistogram(block.getFirstValueIndex(position) + valueIndex, scratch);
//     }
// }
//
// This implies that, given only a value index, we need to be able to retrieve all components of the histogram.
// Because we can't make any assumptions about how value indices are laid out in the sub-blocks for multi-values,
// we enforce that the sub-blocks have at most one value per position (i.e., no multi-values).
// Based on this, we define the valueIndex of ExponentialHistogramArrayBlock to correspond to positions in the sub-blocks.
// In other words, the sub-blocks hold the "flattened" components of the histograms.
// If we later add multi-value support to ExponentialHistogramArrayBlock,
// we can't use the multi-value support of the sub-blocks to implement it.
// Instead, we need to maintain a firstValueIndex array ourselves in ExponentialHistogramArrayBlock.
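//
// Hypothetical illustration (example data, not part of this change): with no multi-values,
// the histogram at position p is assembled from position p of each sub-block:
//
//     minima[p], maxima[p], sums[p], valueCounts[p], zeroThresholds[p], encodedHistograms[p]
//
// Here valueCounts[p] is the number of raw samples the histogram at position p was built from,
// which is a different concept from Block#getValueCount(p).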

private final DoubleBlock minima;
private final DoubleBlock maxima;
private final DoubleBlock sums;
/**
 * Holds the number of values in each histogram. Note that this is a different concept from getValueCount(position)!
 */
private final LongBlock valueCounts;
private final DoubleBlock zeroThresholds;
private final BytesRefBlock encodedHistograms;
@@ -94,6 +119,22 @@ public ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialH
}
}

@Override
public void serializeExponentialHistogram(int valueIndex, SerializedOutput out, BytesRef scratch) {
Member: valueIndex should be position instead.

Contributor Author: The intended usage pattern for serializeExponentialHistogram is just like for getters on blocks:

for (int i = 0; i < block.getPositionCount(); i++) {
  for (int j = 0; j < block.getValueCount(i); j++) {
    block.serializeExponentialHistogram(block.getFirstValueIndex(i) + j, ...)
  }
}

So valueIndex instead of position is correct here.
Right now exponential histogram blocks happen to use positions directly as value indices, but that will change when we support multi-values.

See also this comment: #133393 (comment)

I'll add a comment in ExponentialHistogramArrayBlock explaining the mapping of positions and valueIndices to positions in the sub-blocks.

// note that this value count is different from getValueCount(position)!
// this value count represents the number of individual samples the histogram was computed for
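// The append order below defines the serialized layout; it must stay in sync with
// ExponentialHistogramBlockBuilder#deserializeAndAppend.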
long valueCount = valueCounts.getLong(valueCounts.getFirstValueIndex(valueIndex));
out.appendLong(valueCounts.getLong(valueCounts.getFirstValueIndex(valueIndex)));
Member: valueCounts.getLong(valueCounts.getFirstValueIndex(valueIndex)) -> valueCount?

Contributor Author: I think you are confusing this with the value count returned via getValueCount(position). This is a different value count: it is the number of samples the histogram was generated from. I've added a comment to avoid this confusion.

out.appendDouble(sums.getDouble(sums.getFirstValueIndex(valueIndex)));
out.appendDouble(zeroThresholds.getDouble(zeroThresholds.getFirstValueIndex(valueIndex)));
if (valueCount > 0) {
// min / max are only non-null for non-empty histograms
out.appendDouble(minima.getDouble(minima.getFirstValueIndex(valueIndex)));
out.appendDouble(maxima.getDouble(maxima.getFirstValueIndex(valueIndex)));
}
out.appendBytesRef(encodedHistograms.getBytesRef(encodedHistograms.getFirstValueIndex(valueIndex), scratch));
}

@Override
protected void closeInternal() {
Releasables.close(getSubBlocks());
@@ -376,5 +417,4 @@ public int hashCode() {
// this ensures proper equality with null blocks and should be unique enough for practical purposes
return encodedHistograms.hashCode();
}

}
@@ -7,6 +7,7 @@

package org.elasticsearch.compute.data;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;

/**
@@ -27,6 +28,16 @@ public sealed interface ExponentialHistogramBlock extends Block permits Constant
*/
ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialHistogramScratch scratch);

/**
* Serializes the exponential histogram at the given index into the provided output, so that it can be read back
* via {@link ExponentialHistogramBlockBuilder#deserializeAndAppend(SerializedInput)}.
*
 * @param valueIndex the value index of the histogram to serialize
 * @param out the output to append the serialized representation to
 * @param scratch scratch space for reading the encoded histogram bytes
*/
void serializeExponentialHistogram(int valueIndex, SerializedOutput out, BytesRef scratch);
Member: valueIndex -> position

static boolean equals(ExponentialHistogramBlock blockA, ExponentialHistogramBlock blockB) {
if (blockA == blockB) {
return true;
@@ -42,4 +53,27 @@ static boolean equals(ExponentialHistogramBlock blockA, ExponentialHistogramBloc
};
}

/**
* Abstraction to use for writing individual values via {@link #serializeExponentialHistogram(int, SerializedOutput, BytesRef)}.
*/
interface SerializedOutput {
Member: I'd prefer not to have these interfaces, but they're okay.

Contributor Author: I really don't want to expose the zero-threshold from the block, to prevent it becoming a maintenance nightmare if we make changes to the disk format. That's why I want to keep the knowledge of its existence local to the block implementation.

I thought pulling in a direct dependency on the TopNEncoder would not be a good idea, which is why I added these interfaces in between. If you prefer, I can directly use the TopNEncoder in serializeExponentialHistogram instead in a follow-up.

void appendDouble(double value);

void appendLong(long value);

void appendBytesRef(BytesRef bytesRef);
}

/**
 * Abstraction to use for reading individual serialized values via
* {@link ExponentialHistogramBlockBuilder#deserializeAndAppend(SerializedInput)}.
*/
interface SerializedInput {
double readDouble();

long readLong();

BytesRef readBytesRef(BytesRef scratch);
}

}
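For illustration, a minimal in-memory implementation of these two interfaces could look like the sketch below; the class name and the ArrayDeque-based buffering are assumptions made for this example, not part of the change:

import org.apache.lucene.util.BytesRef;

import java.util.ArrayDeque;

// Hypothetical helper that buffers appended values in order so they can be read back
// through the SerializedInput side; real callers use encoder-backed implementations,
// such as the TopN ones added in this PR.
final class InMemorySerializedBuffer
    implements ExponentialHistogramBlock.SerializedOutput, ExponentialHistogramBlock.SerializedInput {

    private final ArrayDeque<Object> values = new ArrayDeque<>();

    @Override
    public void appendDouble(double value) {
        values.addLast(value);
    }

    @Override
    public void appendLong(long value) {
        values.addLast(value);
    }

    @Override
    public void appendBytesRef(BytesRef bytesRef) {
        values.addLast(BytesRef.deepCopyOf(bytesRef)); // copy, since the caller may reuse the backing bytes
    }

    @Override
    public double readDouble() {
        return (Double) values.removeFirst();
    }

    @Override
    public long readLong() {
        return (Long) values.removeFirst();
    }

    @Override
    public BytesRef readBytesRef(BytesRef scratch) {
        return (BytesRef) values.removeFirst();
    }
}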
@@ -7,6 +7,7 @@

package org.elasticsearch.compute.data;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.exponentialhistogram.CompressedExponentialHistogram;
@@ -25,6 +26,8 @@ public class ExponentialHistogramBlockBuilder implements Block.Builder, BlockLoa
private final DoubleBlock.Builder zeroThresholdsBuilder;
private final BytesRefBlock.Builder encodedHistogramsBuilder;

private final BytesRef scratch = new BytesRef();

ExponentialHistogramBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
DoubleBlock.Builder minimaBuilder = null;
DoubleBlock.Builder maximaBuilder = null;
@@ -131,6 +134,27 @@ public ExponentialHistogramBlockBuilder append(ExponentialHistogram histogram) {
return this;
}

/**
* Decodes and appends a value serialized with
* {@link ExponentialHistogramBlock#serializeExponentialHistogram(int, ExponentialHistogramBlock.SerializedOutput, BytesRef)}.
*
* @param input the input to deserialize from
*/
public void deserializeAndAppend(ExponentialHistogramBlock.SerializedInput input) {
long valueCount = input.readLong();
valueCountsBuilder.appendLong(valueCount);
sumsBuilder.appendDouble(input.readDouble());
zeroThresholdsBuilder.appendDouble(input.readDouble());
if (valueCount > 0) {
minimaBuilder.appendDouble(input.readDouble());
maximaBuilder.appendDouble(input.readDouble());
} else {
minimaBuilder.appendNull();
maximaBuilder.appendNull();
}
encodedHistogramsBuilder.appendBytesRef(input.readBytesRef(scratch));
}
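// Example round trip (hypothetical usage sketch): copy all histograms from an existing
// block into this builder via a `buffer` object implementing both SerializedOutput and
// SerializedInput, such as the in-memory sketch shown with the interface:
//
//   BytesRef scratch = new BytesRef();
//   for (int p = 0; p < block.getPositionCount(); p++) {
//       if (block.isNull(p)) {
//           builder.appendNull();
//           continue;
//       }
//       block.serializeExponentialHistogram(block.getFirstValueIndex(p), buffer, scratch);
//       builder.deserializeAndAppend(buffer);
//   }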

@Override
public ExponentialHistogramBlock build() {
DoubleBlock minima = null;
@@ -55,6 +55,7 @@ static ResultBuilder resultBuilderFor(
case NULL -> new ResultBuilderForNull(blockFactory);
case DOC -> new ResultBuilderForDoc(blockFactory, (DocVectorEncoder) encoder, positions);
case AGGREGATE_METRIC_DOUBLE -> new ResultBuilderForAggregateMetricDouble(blockFactory, positions);
case EXPONENTIAL_HISTOGRAM -> new ResultBuilderForExponentialHistogram(blockFactory, positions);
default -> {
assert false : "Result builder for [" + elementType + "]";
throw new UnsupportedOperationException("Result builder for [" + elementType + "]");
@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.topn;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ExponentialHistogramBlock;
import org.elasticsearch.compute.data.ExponentialHistogramBlockBuilder;

public class ResultBuilderForExponentialHistogram implements ResultBuilder {

private final ExponentialHistogramBlockBuilder builder;
private final ReusableTopNEncoderInput reusableInput = new ReusableTopNEncoderInput();

ResultBuilderForExponentialHistogram(BlockFactory blockFactory, int positions) {
this.builder = blockFactory.newExponentialHistogramBlockBuilder(positions);
}

@Override
public void decodeKey(BytesRef keys) {
throw new AssertionError("ExponentialHistogramBlock can't be a key");
}

@Override
public void decodeValue(BytesRef values) {
int count = TopNEncoder.DEFAULT_UNSORTABLE.decodeVInt(values);
if (count == 0) {
builder.appendNull();
return;
}
assert count == 1 : "ExponentialHistogramBlock does not support multi values";
reusableInput.inputValues = values;
builder.deserializeAndAppend(reusableInput);
}

@Override
public Block build() {
return builder.build();
}

@Override
public String toString() {
return "ResultBuilderForExponentialHistogram";
}

@Override
public void close() {
builder.close();
}

private static final class ReusableTopNEncoderInput implements ExponentialHistogramBlock.SerializedInput {
BytesRef inputValues;

@Override
public double readDouble() {
return TopNEncoder.DEFAULT_UNSORTABLE.decodeDouble(inputValues);
}

@Override
public long readLong() {
return TopNEncoder.DEFAULT_UNSORTABLE.decodeLong(inputValues);
}

@Override
public BytesRef readBytesRef(BytesRef scratch) {
return TopNEncoder.DEFAULT_UNSORTABLE.decodeBytesRef(inputValues, scratch);
}
}
}
@@ -14,6 +14,7 @@
import org.elasticsearch.compute.data.DocBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.ExponentialHistogramBlock;
import org.elasticsearch.compute.data.FloatBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
@@ -53,6 +54,7 @@ static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder,
case NULL -> new ValueExtractorForNull();
case DOC -> new ValueExtractorForDoc(encoder, ((DocBlock) block).asVector());
case AGGREGATE_METRIC_DOUBLE -> new ValueExtractorForAggregateMetricDouble(encoder, (AggregateMetricDoubleBlock) block);
case EXPONENTIAL_HISTOGRAM -> new ValueExtractorForExponentialHistogram(encoder, (ExponentialHistogramBlock) block);
default -> {
assert false : "No value extractor for [" + block.elementType() + "]";
throw new UnsupportedOperationException("No value extractor for [" + block.elementType() + "]");
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.topn;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.ExponentialHistogramBlock;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;

public class ValueExtractorForExponentialHistogram implements ValueExtractor {
private final ExponentialHistogramBlock block;

private final BytesRef scratch = new BytesRef();
private final ReusableTopNEncoderOutput reusableOutput = new ReusableTopNEncoderOutput();

ValueExtractorForExponentialHistogram(TopNEncoder encoder, ExponentialHistogramBlock block) {
assert encoder == TopNEncoder.DEFAULT_UNSORTABLE;
this.block = block;
}

@Override
public void writeValue(BreakingBytesRefBuilder values, int position) {
// number of multi-values first for compatibility with ValueExtractorForNull
if (block.isNull(position)) {
TopNEncoder.DEFAULT_UNSORTABLE.encodeVInt(0, values);
} else {
assert block.getValueCount(position) == 1 : "Multi-valued ExponentialHistogram blocks are not supported in TopN";
TopNEncoder.DEFAULT_UNSORTABLE.encodeVInt(1, values);
int valueIndex = block.getFirstValueIndex(position);
reusableOutput.target = values;
block.serializeExponentialHistogram(valueIndex, reusableOutput, scratch);
}
}

@Override
public String toString() {
return "ValueExtractorForExponentialHistogram";
}

private static final class ReusableTopNEncoderOutput implements ExponentialHistogramBlock.SerializedOutput {
BreakingBytesRefBuilder target;

@Override
public void appendDouble(double value) {
TopNEncoder.DEFAULT_UNSORTABLE.encodeDouble(value, target);
}

@Override
public void appendLong(long value) {
TopNEncoder.DEFAULT_UNSORTABLE.encodeLong(value, target);
}

@Override
public void appendBytesRef(BytesRef value) {
TopNEncoder.DEFAULT_UNSORTABLE.encodeBytesRef(value, target);
}
}
}
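Taken together, writeValue above and ResultBuilderForExponentialHistogram.decodeValue define the per-position value layout. As a summary derived from the code in this change (field names are descriptive, not from the source):

    vint   count             // 0 for a null position, 1 otherwise (no multi-values)
    long   valueCount        // number of raw samples in the histogram
    double sum
    double zeroThreshold
    double min               // present only when valueCount > 0
    double max               // present only when valueCount > 0
    bytes  encodedHistogram  // the encoded histogram buckets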
@@ -44,7 +44,7 @@ public static Iterable<Object[]> parameters() {
case UNKNOWN -> {
supportsNull = false;
}
case COMPOSITE, EXPONENTIAL_HISTOGRAM -> {
case COMPOSITE -> {
// TODO: add later
supportsNull = false;
}
@@ -130,6 +130,9 @@ public static Iterable<Object[]> parameters() {
) }
);
}
case EXPONENTIAL_HISTOGRAM ->
// multi values are not supported
cases.add(valueTestCase("single " + e, e, TopNEncoder.DEFAULT_UNSORTABLE, () -> BlockTestUtils.randomValue(e)));
case NULL -> {
}
default -> {
@@ -487,10 +487,10 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte
case BOOLEAN, NULL, BYTE, SHORT, INTEGER, LONG, DOUBLE, FLOAT, HALF_FLOAT, DATETIME, DATE_NANOS, DATE_PERIOD, TIME_DURATION,
OBJECT, SCALED_FLOAT, UNSIGNED_LONG, TSID_DATA_TYPE -> TopNEncoder.DEFAULT_SORTABLE;
case GEO_POINT, CARTESIAN_POINT, GEO_SHAPE, CARTESIAN_SHAPE, COUNTER_LONG, COUNTER_INTEGER, COUNTER_DOUBLE, SOURCE,
AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX -> TopNEncoder.DEFAULT_UNSORTABLE;
AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX, EXPONENTIAL_HISTOGRAM ->
TopNEncoder.DEFAULT_UNSORTABLE;
// unsupported fields are encoded as BytesRef, we'll use the same encoder; all values should be null at this point
// TODO(b/133393) add exponential histogram support
case PARTIAL_AGG, EXPONENTIAL_HISTOGRAM, UNSUPPORTED -> TopNEncoder.UNSUPPORTED;
case PARTIAL_AGG, UNSUPPORTED -> TopNEncoder.UNSUPPORTED;
};
}
List<TopNOperator.SortOrder> orders = topNExec.order().stream().map(order -> {
Expand Down