Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,25 @@ public ExponentialHistogram get() {
return (result == null) ? ExponentialHistogram.empty() : result;
}

/**
* Merges the given histogram into the current result, not upscaling it.
* This should be used when merging intermediate results to prevent accumulating errors.
*
* @param toAdd the histogram to merge
*/
public void addWithoutUpscaling(ExponentialHistogram toAdd) {
add(toAdd, false);
}

/**
* Merges the given histogram into the current result. The histogram might be upscaled if needed.
*
* @param toAdd the histogram to merge
*/
public void add(ExponentialHistogram toAdd) {
add(toAdd, true);
}

// This algorithm is very efficient if B has roughly as many buckets as A.
// However, if B is much smaller we still have to iterate over all buckets of A.
// This can be optimized by buffering the buckets of small histograms and only merging them when we have enough buckets.
Expand All @@ -165,12 +184,7 @@ public ExponentialHistogram get() {
// It would be possible to only enable the buffering for small histograms,
// but the optimization seems not worth the added complexity at this point.

/**
* Merges the given histogram into the current result.
*
* @param toAdd the histogram to merge
*/
public void add(ExponentialHistogram toAdd) {
private void add(ExponentialHistogram toAdd, boolean allowUpscaling) {
ExponentialHistogram a = result == null ? ExponentialHistogram.empty() : result;
ExponentialHistogram b = toAdd;

Expand All @@ -193,6 +207,10 @@ public void add(ExponentialHistogram toAdd) {
// This might involve increasing the scale for B, which would increase its indices.
// We need to ensure that we do not exceed MAX_INDEX / MIN_INDEX in this case.
int targetScale = Math.min(maxScale, a.scale());
if (allowUpscaling == false) {
targetScale = Math.min(targetScale, b.scale());
}

if (targetScale > b.scale()) {
if (negBucketsB.hasNext()) {
long smallestIndex = negBucketsB.peekIndex();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ public void testEmptyZeroBucketIgnored() {
assertThat(posBuckets.hasNext(), equalTo(false));
}

public void testMergeWithoutUpscaling() {
ExponentialHistogram histo = createAutoReleasedHistogram(b -> b.scale(0).setPositiveBucket(2, 42));
try (ExponentialHistogramMerger merger = ExponentialHistogramMerger.create(100, breaker())) {
merger.addWithoutUpscaling(histo);
assertThat(merger.get(), equalTo(histo));
}
}

public void testAggregatesCorrectness() {
double[] firstValues = randomDoubles(100).map(val -> val * 2 - 1).toArray();
double[] secondValues = randomDoubles(50).map(val -> val * 2 - 1).toArray();
Expand Down