Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private ExponentialHistogram asCompressedHistogram(ExponentialHistogram histogra
CompressedExponentialHistogram.writeHistogramBytes(histoBytes, histogram.scale(), negativeBuckets, positiveBuckets);
CompressedExponentialHistogram result = new CompressedExponentialHistogram();
BytesRef data = histoBytes.bytes().toBytesRef();
result.reset(histogram.zeroBucket().zeroThreshold(), totalCount, data);
result.reset(histogram.zeroBucket().zeroThreshold(), totalCount, histogram.sum(), data);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public Buckets negativeBuckets() {
return EmptyBuckets.INSTANCE;
}

@Override
public double sum() {
return 0;
}

@Override
public long ramBytesUsed() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
*/
public interface ExponentialHistogram extends Accountable {

// TODO(b/128622): support min/max/sum/count storage and merging.
// TODO(b/128622): support min/max storage and merging.
// TODO(b/128622): Add special positive and negative infinity buckets
// to allow representation of explicit bucket histograms with open boundaries.

Expand Down Expand Up @@ -93,6 +93,15 @@ public interface ExponentialHistogram extends Accountable {
*/
Buckets negativeBuckets();

/**
* Returns the sum of all values represented by this histogram.
* Note that even if histograms are cumulative, the sum is not guaranteed to be monotonically increasing,
* because histograms support negative values.
*
* @return the sum, guaranteed to be zero for empty histograms
*/
double sum();

/**
* Represents a bucket range of an {@link ExponentialHistogram}, either the positive or the negative range.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ private void mergeValuesToHistogram() {
}

valueBuffer.reset();
valueBuffer.setSum(rawValuesSum());
int scale = valueBuffer.scale();

// Buckets must be provided with their indices in ascending order.
Expand Down Expand Up @@ -161,6 +162,14 @@ private void mergeValuesToHistogram() {
valueCount = 0;
}

private double rawValuesSum() {
double sum = 0;
for (int i = 0; i < valueCount; i++) {
sum += rawValueBuffer[i];
}
return sum;
}

private static long estimateBaseSize(int numBuckets) {
return SHALLOW_SIZE + RamEstimationUtil.estimateDoubleArray(numBuckets);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public void add(ExponentialHistogram toAdd) {
buffer = FixedCapacityExponentialHistogram.create(bucketLimit, circuitBreaker);
}
buffer.setZeroBucket(zeroBucket);
buffer.setSum(a.sum() + b.sum());

// We attempt to bring everything to the scale of A.
// This might involve increasing the scale for B, which would increase its indices.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V., and/or licensed to Elasticsearch B.V.
* under one or more license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* This file is based on a modification of https://github.com/open-telemetry/opentelemetry-java which is licensed under the Apache 2.0 License.
*/

package org.elasticsearch.exponentialhistogram;

public class ExponentialHistogramUtils {

/**
* Estimates the sum of all values of a histogram just based on the populated buckets.
* Will never return NaN, but might return +/-Infinity if the histogram is too big.
*
* @param negativeBuckets the negative buckets of the histogram
* @param positiveBuckets the positive buckets of the histogram
* @return the estimated sum of all values in the histogram, guaranteed to be zero if there are no buckets.
*/
public static double estimateSum(BucketIterator negativeBuckets, BucketIterator positiveBuckets) {
assert negativeBuckets.scale() == positiveBuckets.scale();

// for each bucket index, sum up the counts, but account for the positive/negative sign
BucketIterator it = new MergingBucketIterator(negativeBuckets, -1, positiveBuckets, 1, positiveBuckets.scale());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not very excited about the negative counts - what are the semantics? I'd rather we have a utility function that's called on each iterator that internally multiplies the sum with -1 for negative buckets.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In 5f94454 I've replaced the "factors" with the ability to provide a custom operator to do the count merging.
Is that what you were thinking of?

double sum = 0.0;
while (it.hasNext()) {
long countWithSign = it.peekCount();
double bucketMidPoint = ExponentialScaleUtils.getPointOfLeastRelativeError(it.peekIndex(), it.scale());
if (countWithSign != 0) { // avoid 0 * INFINITY = NaN
double toAdd = bucketMidPoint * countWithSign;
if (Double.isFinite(toAdd)) {
sum += toAdd;
} else {
// Avoid NaN in case we end up with e.g. -Infinity+Infinity
// we consider the bucket with the bigger index the winner for the sign
sum = toAdd;
}
}
it.advance();
}
return sum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
public class ExponentialHistogramXContent {

public static final String SCALE_FIELD = "scale";
public static final String SUM_FIELD = "sum";
public static final String ZERO_FIELD = "zero";
public static final String ZERO_COUNT_FIELD = "count";
public static final String ZERO_THRESHOLD_FIELD = "threshold";
Expand All @@ -49,6 +50,7 @@ public static void serialize(XContentBuilder builder, ExponentialHistogram histo
builder.startObject();

builder.field(SCALE_FIELD, histogram.scale());
builder.field(SUM_FIELD, histogram.sum());
double zeroThreshold = histogram.zeroBucket().zeroThreshold();
long zeroCount = histogram.zeroBucket().count();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ final class FixedCapacityExponentialHistogram implements ReleasableExponentialHi

private final Buckets positiveBuckets = new Buckets(true);

private double sum;

private final ExponentialHistogramCircuitBreaker circuitBreaker;
private boolean closed = false;

Expand All @@ -78,6 +80,7 @@ private FixedCapacityExponentialHistogram(int bucketCapacity, ExponentialHistogr
* Resets this histogram to the same state as a newly constructed one with the same capacity.
*/
void reset() {
sum = 0;
setZeroBucket(ZeroBucket.minimalEmpty());
resetBuckets(MAX_SCALE);
}
Expand Down Expand Up @@ -110,6 +113,15 @@ void setZeroBucket(ZeroBucket zeroBucket) {
this.zeroBucket = zeroBucket;
}

@Override
public double sum() {
return sum;
}

void setSum(double sum) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only exposed for testing? If so, let's add a comment to call it out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, we try to avoid recalculating in merging. Sounds good - I don't know how I feel about not validating the passed value but it can be expensive and tricky to do once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be sufficient for us to do the validations required on ingestion and trust the values to be sane internally.
Also we don't just avoid recalculating while merging for performance reasons: The calculation we have is just an estimation. User can instead provide the exact sum on ingestion, which means we'll preserve exactness when merging, giving exact averages. In OTLP, the sum is provided by default.

this.sum = sum;
}

/**
* Attempts to add a bucket to the positive or negative range of this histogram.
* <br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ final class MergingBucketIterator implements BucketIterator {
private long currentIndex;
private long currentCount;

private final long factorA;
private final long factorB;

/**
* Creates a new merging iterator.
*
Expand All @@ -41,8 +44,24 @@ final class MergingBucketIterator implements BucketIterator {
* @param targetScale the histogram scale to which both iterators should be aligned
*/
MergingBucketIterator(BucketIterator itA, BucketIterator itB, int targetScale) {
this(itA, 1, itB, 1, targetScale);
}

/**
* Creates a new merging iterator, multiplying counts from each iterator with the provided factors.
* Note that the factors can be negative, in which case {@link #peekCount()} can return zero or negative values.
*
* @param itA the first iterator to merge
* @param factorA a factor to multiply counts from the first iterator with
* @param itB the second iterator to merge
* @param factorB a factor to multiply counts from the second iterator with
* @param targetScale the histogram scale to which both iterators should be aligned
*/
MergingBucketIterator(BucketIterator itA, long factorA, BucketIterator itB, long factorB, int targetScale) {
this.itA = new ScaleAdjustingBucketIterator(itA, targetScale);
this.itB = new ScaleAdjustingBucketIterator(itB, targetScale);
this.factorA = factorA;
this.factorB = factorB;
endReached = false;
advance();
}
Expand All @@ -69,12 +88,12 @@ public void advance() {
boolean advanceB = hasNextB && (hasNextA == false || idxB <= idxA);
if (advanceA) {
currentIndex = idxA;
currentCount += itA.peekCount();
currentCount += itA.peekCount() * factorA;
itA.advance();
}
if (advanceB) {
currentIndex = idxB;
currentCount += itB.peekCount();
currentCount += itB.peekCount() * factorB;
itB.advance();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,22 @@ public void testEmptyZeroBucketIgnored() {
assertThat(posBuckets.hasNext(), equalTo(false));
}

public void testSumCorrectness() {
double[] firstValues = randomDoubles(100).map(val -> val * 2 - 1).toArray();
double[] secondValues = randomDoubles(50).map(val -> val * 2 - 1).toArray();
double correctSum = Arrays.stream(firstValues).sum() + Arrays.stream(secondValues).sum();
try (
ReleasableExponentialHistogram merged = ExponentialHistogram.merge(
2,
breaker(),
createAutoReleasedHistogram(10, firstValues),
createAutoReleasedHistogram(20, secondValues)
)
) {
assertThat(merged.sum(), closeTo(correctSum, 0.000001));
}
}

public void testUpscalingDoesNotExceedIndexLimits() {
for (int i = 0; i < 4; i++) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V., and/or licensed to Elasticsearch B.V.
* under one or more license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* This file is based on a modification of https://github.com/open-telemetry/opentelemetry-java which is licensed under the Apache 2.0 License.
*/

package org.elasticsearch.exponentialhistogram;

import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;

public class ExponentialHistogramUtilsTests extends ExponentialHistogramTestCase {

public void testRandomDataSumEstimation() {
for (int i = 0; i < 100; i++) {
int valueCount = randomIntBetween(100, 10_000);
int bucketCount = randomIntBetween(2, 500);

double correctSum = 0;
double sign = randomBoolean() ? 1 : -1;
double[] values = new double[valueCount];
for (int j = 0; j < valueCount; j++) {
values[j] = sign * Math.pow(10, randomIntBetween(1, 9)) * randomDouble();
correctSum += values[j];
}

ExponentialHistogram histo = createAutoReleasedHistogram(bucketCount, values);

double estimatedSum = ExponentialHistogramUtils.estimateSum(
histo.negativeBuckets().iterator(),
histo.positiveBuckets().iterator()
);

double correctAverage = correctSum / valueCount;
double estimatedAverage = estimatedSum / valueCount;

// If the histogram does not contain mixed sign values, we have a guaranteed relative error bound of 2^(2^-scale) - 1
double histogramBase = Math.pow(2, Math.pow(2, -histo.scale()));
double allowedError = Math.abs(correctAverage * (histogramBase - 1));

assertThat(estimatedAverage, closeTo(correctAverage, allowedError));
}
}

public void testInfinityHandling() {
FixedCapacityExponentialHistogram morePositiveValues = createAutoReleasedHistogram(100);
morePositiveValues.resetBuckets(0);
morePositiveValues.tryAddBucket(1999, 1, false);
morePositiveValues.tryAddBucket(2000, 2, false);
morePositiveValues.tryAddBucket(1999, 2, true);
morePositiveValues.tryAddBucket(2000, 2, true);

double sum = ExponentialHistogramUtils.estimateSum(
morePositiveValues.negativeBuckets().iterator(),
morePositiveValues.positiveBuckets().iterator()
);
assertThat(sum, equalTo(Double.POSITIVE_INFINITY));
FixedCapacityExponentialHistogram moreNegativeValues = createAutoReleasedHistogram(100);
moreNegativeValues.resetBuckets(0);
moreNegativeValues.tryAddBucket(1999, 2, false);
moreNegativeValues.tryAddBucket(2000, 2, false);
moreNegativeValues.tryAddBucket(1999, 1, true);
moreNegativeValues.tryAddBucket(2000, 2, true);

sum = ExponentialHistogramUtils.estimateSum(
moreNegativeValues.negativeBuckets().iterator(),
moreNegativeValues.positiveBuckets().iterator()
);
assertThat(sum, equalTo(Double.NEGATIVE_INFINITY));
}
}
Loading