Skip to content

Commit f85f59d

Browse files
authored
Add sum to exponential histograms (#133381)
1 parent 3271c5a commit f85f59d

File tree

17 files changed

+491
-45
lines changed

17 files changed

+491
-45
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramMergeBench.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ private ExponentialHistogram asCompressedHistogram(ExponentialHistogram histogra
130130
CompressedExponentialHistogram.writeHistogramBytes(histoBytes, histogram.scale(), negativeBuckets, positiveBuckets);
131131
CompressedExponentialHistogram result = new CompressedExponentialHistogram();
132132
BytesRef data = histoBytes.bytes().toBytesRef();
133-
result.reset(histogram.zeroBucket().zeroThreshold(), totalCount, data);
133+
result.reset(histogram.zeroBucket().zeroThreshold(), totalCount, histogram.sum(), data);
134134
return result;
135135
} catch (IOException e) {
136136
throw new RuntimeException(e);

libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/EmptyExponentialHistogram.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ public Buckets negativeBuckets() {
7777
return EmptyBuckets.INSTANCE;
7878
}
7979

80+
@Override
81+
public double sum() {
82+
return 0;
83+
}
84+
8085
@Override
8186
public long ramBytesUsed() {
8287
return 0;

libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogram.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
*/
4848
public interface ExponentialHistogram extends Accountable {
4949

50-
// TODO(b/128622): support min/max/sum/count storage and merging.
50+
// TODO(b/128622): support min/max storage and merging.
5151
// TODO(b/128622): Add special positive and negative infinity buckets
5252
// to allow representation of explicit bucket histograms with open boundaries.
5353

@@ -93,6 +93,15 @@ public interface ExponentialHistogram extends Accountable {
9393
*/
9494
Buckets negativeBuckets();
9595

96+
/**
97+
* Returns the sum of all values represented by this histogram.
98+
* Note that even if histograms are cumulative, the sum is not guaranteed to be monotonically increasing,
99+
* because histograms support negative values.
100+
*
101+
* @return the sum, guaranteed to be zero for empty histograms
102+
*/
103+
double sum();
104+
96105
/**
97106
* Represents a bucket range of an {@link ExponentialHistogram}, either the positive or the negative range.
98107
*/

libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramGenerator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ private void mergeValuesToHistogram() {
123123
}
124124

125125
valueBuffer.reset();
126+
valueBuffer.setSum(rawValuesSum());
126127
int scale = valueBuffer.scale();
127128

128129
// Buckets must be provided with their indices in ascending order.
@@ -161,6 +162,14 @@ private void mergeValuesToHistogram() {
161162
valueCount = 0;
162163
}
163164

165+
private double rawValuesSum() {
166+
double sum = 0;
167+
for (int i = 0; i < valueCount; i++) {
168+
sum += rawValueBuffer[i];
169+
}
170+
return sum;
171+
}
172+
164173
private static long estimateBaseSize(int numBuckets) {
165174
return SHALLOW_SIZE + RamEstimationUtil.estimateDoubleArray(numBuckets);
166175
};

libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramMerger.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ public void add(ExponentialHistogram toAdd) {
150150
buffer = FixedCapacityExponentialHistogram.create(bucketLimit, circuitBreaker);
151151
}
152152
buffer.setZeroBucket(zeroBucket);
153+
buffer.setSum(a.sum() + b.sum());
153154

154155
// We attempt to bring everything to the scale of A.
155156
// This might involve increasing the scale for B, which would increase its indices.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright Elasticsearch B.V., and/or licensed to Elasticsearch B.V.
3+
* under one or more license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*
19+
* This file is based on a modification of https://github.com/open-telemetry/opentelemetry-java which is licensed under the Apache 2.0 License.
20+
*/
21+
22+
package org.elasticsearch.exponentialhistogram;
23+
24+
public class ExponentialHistogramUtils {
25+
26+
/**
27+
* Estimates the sum of all values of a histogram just based on the populated buckets.
28+
* Will never return NaN, but might return +/-Infinity if the histogram is too big.
29+
*
30+
* @param negativeBuckets the negative buckets of the histogram
31+
* @param positiveBuckets the positive buckets of the histogram
32+
* @return the estimated sum of all values in the histogram, guaranteed to be zero if there are no buckets.
33+
*/
34+
public static double estimateSum(BucketIterator negativeBuckets, BucketIterator positiveBuckets) {
35+
assert negativeBuckets.scale() == positiveBuckets.scale();
36+
37+
// for each bucket index, sum up the counts, but account for the positive/negative sign
38+
BucketIterator it = new MergingBucketIterator(
39+
positiveBuckets,
40+
negativeBuckets,
41+
positiveBuckets.scale(),
42+
(positiveCount, negativeCount) -> positiveCount - negativeCount
43+
);
44+
double sum = 0.0;
45+
while (it.hasNext()) {
46+
long countWithSign = it.peekCount();
47+
double bucketMidPoint = ExponentialScaleUtils.getPointOfLeastRelativeError(it.peekIndex(), it.scale());
48+
if (countWithSign != 0) { // avoid 0 * INFINITY = NaN
49+
double toAdd = bucketMidPoint * countWithSign;
50+
if (Double.isFinite(toAdd)) {
51+
sum += toAdd;
52+
} else {
53+
// Avoid NaN in case we end up with e.g. -Infinity+Infinity
54+
// we consider the bucket with the bigger index the winner for the sign
55+
sum = toAdd;
56+
}
57+
}
58+
it.advance();
59+
}
60+
return sum;
61+
}
62+
}

libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramXContent.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
public class ExponentialHistogramXContent {
3232

3333
public static final String SCALE_FIELD = "scale";
34+
public static final String SUM_FIELD = "sum";
3435
public static final String ZERO_FIELD = "zero";
3536
public static final String ZERO_COUNT_FIELD = "count";
3637
public static final String ZERO_THRESHOLD_FIELD = "threshold";
@@ -49,6 +50,7 @@ public static void serialize(XContentBuilder builder, ExponentialHistogram histo
4950
builder.startObject();
5051

5152
builder.field(SCALE_FIELD, histogram.scale());
53+
builder.field(SUM_FIELD, histogram.sum());
5254
double zeroThreshold = histogram.zeroBucket().zeroThreshold();
5355
long zeroCount = histogram.zeroBucket().count();
5456

libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/FixedCapacityExponentialHistogram.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ final class FixedCapacityExponentialHistogram implements ReleasableExponentialHi
5353

5454
private final Buckets positiveBuckets = new Buckets(true);
5555

56+
private double sum;
57+
5658
private final ExponentialHistogramCircuitBreaker circuitBreaker;
5759
private boolean closed = false;
5860

@@ -78,6 +80,7 @@ private FixedCapacityExponentialHistogram(int bucketCapacity, ExponentialHistogr
7880
* Resets this histogram to the same state as a newly constructed one with the same capacity.
7981
*/
8082
void reset() {
83+
sum = 0;
8184
setZeroBucket(ZeroBucket.minimalEmpty());
8285
resetBuckets(MAX_SCALE);
8386
}
@@ -110,6 +113,15 @@ void setZeroBucket(ZeroBucket zeroBucket) {
110113
this.zeroBucket = zeroBucket;
111114
}
112115

116+
@Override
117+
public double sum() {
118+
return sum;
119+
}
120+
121+
void setSum(double sum) {
122+
this.sum = sum;
123+
}
124+
113125
/**
114126
* Attempts to add a bucket to the positive or negative range of this histogram.
115127
* <br>

libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/MergingBucketIterator.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
package org.elasticsearch.exponentialhistogram;
2323

24+
import java.util.function.LongBinaryOperator;
25+
2426
/**
2527
* An iterator that merges two bucket iterators, aligning them to a common scale and combining buckets with the same index.
2628
*/
@@ -33,6 +35,8 @@ final class MergingBucketIterator implements BucketIterator {
3335
private long currentIndex;
3436
private long currentCount;
3537

38+
private final LongBinaryOperator countMergeOperator;
39+
3640
/**
3741
* Creates a new merging iterator.
3842
*
@@ -41,8 +45,22 @@ final class MergingBucketIterator implements BucketIterator {
4145
* @param targetScale the histogram scale to which both iterators should be aligned
4246
*/
4347
MergingBucketIterator(BucketIterator itA, BucketIterator itB, int targetScale) {
48+
this(itA, itB, targetScale, Long::sum);
49+
}
50+
51+
/**
52+
* Creates a new merging iterator, using the provided operator to merge the counts.
53+
* Note that the resulting count can be negative if the operator produces negative results.
54+
*
55+
* @param itA the first iterator to merge
56+
* @param itB the second iterator to merge
57+
* @param countMergeOperator the operator to use to merge counts of buckets with the same index
58+
* @param targetScale the histogram scale to which both iterators should be aligned
59+
*/
60+
MergingBucketIterator(BucketIterator itA, BucketIterator itB, int targetScale, LongBinaryOperator countMergeOperator) {
4461
this.itA = new ScaleAdjustingBucketIterator(itA, targetScale);
4562
this.itB = new ScaleAdjustingBucketIterator(itB, targetScale);
63+
this.countMergeOperator = countMergeOperator;
4664
endReached = false;
4765
advance();
4866
}
@@ -64,19 +82,21 @@ public void advance() {
6482
idxB = itB.peekIndex();
6583
}
6684

67-
currentCount = 0;
6885
boolean advanceA = hasNextA && (hasNextB == false || idxA <= idxB);
6986
boolean advanceB = hasNextB && (hasNextA == false || idxB <= idxA);
87+
long countA = 0;
88+
long countB = 0;
7089
if (advanceA) {
7190
currentIndex = idxA;
72-
currentCount += itA.peekCount();
91+
countA = itA.peekCount();
7392
itA.advance();
7493
}
7594
if (advanceB) {
7695
currentIndex = idxB;
77-
currentCount += itB.peekCount();
96+
countB = itB.peekCount();
7897
itB.advance();
7998
}
99+
currentCount = countMergeOperator.applyAsLong(countA, countB);
80100
}
81101

82102
@Override
@@ -106,4 +126,5 @@ private void assertEndNotReached() {
106126
throw new IllegalStateException("Iterator has no more buckets");
107127
}
108128
}
129+
109130
}

libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramMergerTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,22 @@ public void testEmptyZeroBucketIgnored() {
106106
assertThat(posBuckets.hasNext(), equalTo(false));
107107
}
108108

109+
public void testSumCorrectness() {
110+
double[] firstValues = randomDoubles(100).map(val -> val * 2 - 1).toArray();
111+
double[] secondValues = randomDoubles(50).map(val -> val * 2 - 1).toArray();
112+
double correctSum = Arrays.stream(firstValues).sum() + Arrays.stream(secondValues).sum();
113+
try (
114+
ReleasableExponentialHistogram merged = ExponentialHistogram.merge(
115+
2,
116+
breaker(),
117+
createAutoReleasedHistogram(10, firstValues),
118+
createAutoReleasedHistogram(20, secondValues)
119+
)
120+
) {
121+
assertThat(merged.sum(), closeTo(correctSum, 0.000001));
122+
}
123+
}
124+
109125
public void testUpscalingDoesNotExceedIndexLimits() {
110126
for (int i = 0; i < 4; i++) {
111127

0 commit comments

Comments
 (0)