Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ interface Buckets {
*/
long valueCount();

/**
* Returns the number of buckets. Note that this operation might require iterating over all buckets, and therefore is not cheap.
* @return the number of buckets
*/
default int bucketCount() {
int count = 0;
BucketIterator it = iterator();
while (it.hasNext()) {
count++;
it.advance();
}
return count;
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@

package org.elasticsearch.exponentialhistogram;

import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.TreeMap;

/**
* A builder for building a {@link ReleasableExponentialHistogram} directly from buckets.
* Note that this class is not optimized regarding memory allocations or performance, so it is not intended for high-throughput usage.
*/
public class ExponentialHistogramBuilder {
public class ExponentialHistogramBuilder implements Releasable {

private static final int DEFAULT_ESTIMATED_BUCKET_COUNT = 32;

private final ExponentialHistogramCircuitBreaker breaker;

Expand All @@ -39,8 +41,16 @@ public class ExponentialHistogramBuilder {
private Double min;
private Double max;

private final TreeMap<Long, Long> negativeBuckets = new TreeMap<>();
private final TreeMap<Long, Long> positiveBuckets = new TreeMap<>();
private int estimatedBucketCount = DEFAULT_ESTIMATED_BUCKET_COUNT;

// If the buckets are provided in order, we directly build the histogram to avoid unnecessary copies and allocations
// If a bucket is received out of order, we fallback to storing the buckets in the TreeMaps and build the histogram at the end.
private FixedCapacityExponentialHistogram result;
// Visible for testing to ensure that the low-allocation path is taken for ordered buckets
TreeMap<Long, Long> negativeBuckets;
TreeMap<Long, Long> positiveBuckets;

private boolean resultAlreadyReturned = false;

ExponentialHistogramBuilder(int scale, ExponentialHistogramCircuitBreaker breaker) {
this.breaker = breaker;
Expand All @@ -53,6 +63,7 @@ public class ExponentialHistogramBuilder {
sum(toCopy.sum());
min(toCopy.min());
max(toCopy.max());
estimatedBucketCount(toCopy.negativeBuckets().bucketCount() + toCopy.positiveBuckets().bucketCount());
BucketIterator negBuckets = toCopy.negativeBuckets().iterator();
while (negBuckets.hasNext()) {
setNegativeBucket(negBuckets.peekIndex(), negBuckets.peekCount());
Expand All @@ -65,6 +76,19 @@ public class ExponentialHistogramBuilder {
}
}

/**
* If known, sets the estimated total number of buckets to minimize unnecessary allocations.
* Only has an effect if invoked before the first call to
* {@link #setPositiveBucket(long, long)} and {@link #setNegativeBucket(long, long)}.
*
* @param totalBuckets the total number of buckets expected to be added
* @return the builder
*/
public ExponentialHistogramBuilder estimatedBucketCount(int totalBuckets) {
estimatedBucketCount = totalBuckets;
return this;
}

public ExponentialHistogramBuilder scale(int scale) {
this.scale = scale;
return this;
Expand Down Expand Up @@ -106,69 +130,160 @@ public ExponentialHistogramBuilder max(double max) {
}

/**
* Sets the given bucket of the positive buckets.
* Buckets may be set in arbitrary order. If the bucket already exists, it will be replaced.
* Sets the given bucket of the positive buckets. If the bucket already exists, it will be replaced.
* Buckets may be set in arbitrary order. However, for best performance and minimal allocations,
* buckets should be set in order of increasing index and all negative buckets should be set before positive buckets.
*
* @param index the index of the bucket
* @param count the count of the bucket, must be at least 1
* @return the builder
*/
public ExponentialHistogramBuilder setPositiveBucket(long index, long count) {
if (count < 1) {
throw new IllegalArgumentException("Bucket count must be at least 1");
}
positiveBuckets.put(index, count);
setBucket(index, count, true);
return this;
}

/**
* Sets the given bucket of the negative buckets.
* Buckets may be set in arbitrary order. If the bucket already exists, it will be replaced.
* Sets the given bucket of the negative buckets. If the bucket already exists, it will be replaced.
* Buckets may be set in arbitrary order. However, for best performance and minimal allocations,
* buckets should be set in order of increasing index and all negative buckets should be set before positive buckets.
*
* @param index the index of the bucket
* @param count the count of the bucket, must be at least 1
* @return the builder
*/
public ExponentialHistogramBuilder setNegativeBucket(long index, long count) {
setBucket(index, count, false);
return this;
}

private void setBucket(long index, long count, boolean isPositive) {
if (count < 1) {
throw new IllegalArgumentException("Bucket count must be at least 1");
}
negativeBuckets.put(index, count);
return this;
if (negativeBuckets == null && positiveBuckets == null) {
// so far, all received buckets were in order, try to directly build the result
if (result == null) {
// Initialize the result buffer if required
reallocateResultWithCapacity(estimatedBucketCount, false);
}
if ((isPositive && result.wasLastAddedBucketPositive() == false)
|| (isPositive == result.wasLastAddedBucketPositive() && index > result.getLastAddedBucketIndex())) {
// the new bucket is in order too, we can directly add the bucket
addBucketToResult(index, count, isPositive);
return;
}
}
// fallback to TreeMap if a bucket is received out of order
initializeBucketTreeMapsIfNeeded();
if (isPositive) {
positiveBuckets.put(index, count);
} else {
negativeBuckets.put(index, count);
}
}

private void initializeBucketTreeMapsIfNeeded() {
if (negativeBuckets == null) {
negativeBuckets = new TreeMap<>();
positiveBuckets = new TreeMap<>();
// copy existing buckets to the maps
if (result != null) {
BucketIterator it = result.negativeBuckets().iterator();
while (it.hasNext()) {
negativeBuckets.put(it.peekIndex(), it.peekCount());
it.advance();
}
it = result.positiveBuckets().iterator();
while (it.hasNext()) {
positiveBuckets.put(it.peekIndex(), it.peekCount());
it.advance();
}
}
}
}

private void addBucketToResult(long index, long count, boolean isPositive) {
if (resultAlreadyReturned) {
// we cannot modify the result anymore, create a new one
reallocateResultWithCapacity(result.getCapacity(), true);
}
assert resultAlreadyReturned == false;
boolean sufficientCapacity = result.tryAddBucket(index, count, isPositive);
if (sufficientCapacity == false) {
int newCapacity = Math.max(result.getCapacity() * 2, DEFAULT_ESTIMATED_BUCKET_COUNT);
reallocateResultWithCapacity(newCapacity, true);
boolean bucketAdded = result.tryAddBucket(index, count, isPositive);
assert bucketAdded : "Output histogram should have enough capacity";
}
}

private void reallocateResultWithCapacity(int newCapacity, boolean copyBucketsFromPreviousResult) {
FixedCapacityExponentialHistogram newResult = FixedCapacityExponentialHistogram.create(newCapacity, breaker);
if (copyBucketsFromPreviousResult && result != null) {
BucketIterator it = result.negativeBuckets().iterator();
while (it.hasNext()) {
boolean added = newResult.tryAddBucket(it.peekIndex(), it.peekCount(), false);
assert added : "Output histogram should have enough capacity";
it.advance();
}
it = result.positiveBuckets().iterator();
while (it.hasNext()) {
boolean added = newResult.tryAddBucket(it.peekIndex(), it.peekCount(), true);
assert added : "Output histogram should have enough capacity";
it.advance();
}
}
if (result != null && resultAlreadyReturned == false) {
Releasables.close(result);
}
resultAlreadyReturned = false;
result = newResult;
}

public ReleasableExponentialHistogram build() {
FixedCapacityExponentialHistogram result = FixedCapacityExponentialHistogram.create(
negativeBuckets.size() + positiveBuckets.size(),
breaker
);
boolean success = false;
try {
if (resultAlreadyReturned) {
// result was already returned on a previous call, return a new instance
reallocateResultWithCapacity(result.getCapacity(), true);
}
assert resultAlreadyReturned == false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

if (negativeBuckets != null) {
// copy buckets from tree maps into result
reallocateResultWithCapacity(negativeBuckets.size() + positiveBuckets.size(), false);
result.resetBuckets(scale);
result.setZeroBucket(zeroBucket);
negativeBuckets.forEach((index, count) -> result.tryAddBucket(index, count, false));
positiveBuckets.forEach((index, count) -> result.tryAddBucket(index, count, true));

double sumVal = (sum != null)
? sum
: ExponentialHistogramUtils.estimateSum(result.negativeBuckets().iterator(), result.positiveBuckets().iterator());
double minVal = (min != null)
? min
: ExponentialHistogramUtils.estimateMin(zeroBucket, result.negativeBuckets(), result.positiveBuckets()).orElse(Double.NaN);
double maxVal = (max != null)
? max
: ExponentialHistogramUtils.estimateMax(zeroBucket, result.negativeBuckets(), result.positiveBuckets()).orElse(Double.NaN);

result.setMin(minVal);
result.setMax(maxVal);
result.setSum(sumVal);

success = true;
} finally {
if (success == false) {
Releasables.close(result);
} else {
if (result == null) {
// no buckets were added
reallocateResultWithCapacity(0, false);
}
result.setScale(scale);
}

result.setZeroBucket(zeroBucket);
double sumVal = (sum != null)
? sum
: ExponentialHistogramUtils.estimateSum(result.negativeBuckets().iterator(), result.positiveBuckets().iterator());
double minVal = (min != null)
? min
: ExponentialHistogramUtils.estimateMin(zeroBucket, result.negativeBuckets(), result.positiveBuckets()).orElse(Double.NaN);
double maxVal = (max != null)
? max
: ExponentialHistogramUtils.estimateMax(zeroBucket, result.negativeBuckets(), result.positiveBuckets()).orElse(Double.NaN);

result.setMin(minVal);
result.setMax(maxVal);
result.setSum(sumVal);

resultAlreadyReturned = true;
return result;
}

@Override
public void close() {
if (resultAlreadyReturned == false) {
Releasables.close(result);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ private FixedCapacityExponentialHistogram(int bucketCapacity, ExponentialHistogr
reset();
}

int getCapacity() {
return bucketIndices.length;
}

/**
* Resets this histogram to the same state as a newly constructed one with the same capacity.
*/
Expand All @@ -95,10 +99,9 @@ void reset() {
* @param scale the scale to set for this histogram
*/
void resetBuckets(int scale) {
assert scale >= MIN_SCALE && scale <= MAX_SCALE : "scale must be in range [" + MIN_SCALE + ".." + MAX_SCALE + "]";
setScale(scale);
negativeBuckets.reset();
positiveBuckets.reset();
bucketScale = scale;
}

@Override
Expand Down Expand Up @@ -180,6 +183,11 @@ public int scale() {
return bucketScale;
}

void setScale(int scale) {
assert scale >= MIN_SCALE && scale <= MAX_SCALE : "scale must be in range [" + MIN_SCALE + ".." + MAX_SCALE + "]";
bucketScale = scale;
}

@Override
public ExponentialHistogram.Buckets negativeBuckets() {
return negativeBuckets;
Expand All @@ -190,6 +198,25 @@ public ExponentialHistogram.Buckets positiveBuckets() {
return positiveBuckets;
}

/**
* @return the index of the last bucket added successfully via {@link #tryAddBucket(long, long, boolean)},
* or {@link Long#MIN_VALUE} if no buckets have been added yet.
*/
long getLastAddedBucketIndex() {
if (positiveBuckets.numBuckets + negativeBuckets.numBuckets > 0) {
return bucketIndices[negativeBuckets.numBuckets + positiveBuckets.numBuckets - 1];
} else {
return Long.MIN_VALUE;
}
}

/**
* @return true, if the last bucket added successfully via {@link #tryAddBucket(long, long, boolean)} was a positive one.
*/
boolean wasLastAddedBucketPositive() {
return positiveBuckets.numBuckets > 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How so, can't we mix adding positive and negative buckets?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use a single array for storage, where we have all buckets for negative values followed by the buckets for positive values. Therefore no, you can't add a negative bucket after a positive one in FixedCapacityExponentialHistogram, this invariant is already enforced by tryAddBucket.

}

@Override
public void close() {
if (closed) {
Expand Down Expand Up @@ -276,6 +303,11 @@ public long valueCount() {
}
return cachedValueSum;
}

@Override
public int bucketCount() {
return numBuckets;
}
}

}
Loading