Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
* <br>
* Consumers must ensure that if the histogram is mutated, all previously acquired {@link BucketIterator}
* instances are no longer used.
* <br>
* This implementation is thread-safe for all operations provided via {@link ReleasableExponentialHistogram} and its superclasses,
* as long as it is not mutated concurrently using any of the methods declared in addition in this class
* (e.g. {@link #tryAddBucket(long, long, boolean)}).
*/
final class FixedCapacityExponentialHistogram extends AbstractExponentialHistogram implements ReleasableExponentialHistogram {

Expand Down Expand Up @@ -243,8 +247,10 @@ private class Buckets implements ExponentialHistogram.Buckets {

private final boolean isPositive;
private int numBuckets;
private int cachedValueSumForNumBuckets;
private long cachedValueSum;

private record CachedCountsSum(int numBuckets, long countsSum) {}

private CachedCountsSum cachedCountsSum;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking out loud: this could be volatile or an AtomicReference so that other threads can see the cached value. But this doesn't affect correctness as the worst thing that could happen is that another thread needs to re-compute the value. That's probably not something we're trying to optimize for.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that was my though here too: AtomicReference would be the same as volatile here, as we don't do CAS.
However, we don't want to "pay" for volatile here, as it's okay if threads see the cached value too late, because then they will just compute it themselves.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy with this one.


/**
* @param isPositive true, if this object should represent the positive bucket range, false for the negative range
Expand All @@ -263,8 +269,7 @@ int startSlot() {

final void reset() {
numBuckets = 0;
cachedValueSumForNumBuckets = 0;
cachedValueSum = 0;
cachedCountsSum = null;
}

boolean tryAddBucket(long index, long count) {
Expand Down Expand Up @@ -297,12 +302,26 @@ public OptionalLong maxBucketIndex() {

@Override
public long valueCount() {
// copy a reference to the field to avoid problems with concurrent updates
CachedCountsSum cachedVal = cachedCountsSum;
if (cachedVal != null && cachedVal.numBuckets == numBuckets) {
return cachedVal.countsSum;
}

long countsSum = 0;
int position = 0;
if (cachedVal != null) {
countsSum = cachedVal.countsSum;
position = cachedVal.numBuckets;
}

int startSlot = startSlot();
while (cachedValueSumForNumBuckets < numBuckets) {
cachedValueSum += bucketCounts[startSlot + cachedValueSumForNumBuckets];
cachedValueSumForNumBuckets++;
while (position < numBuckets) {
countsSum += bucketCounts[startSlot + position];
position++;
}
return cachedValueSum;
this.cachedCountsSum = new CachedCountsSum(position, countsSum);
return countsSum;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,39 @@
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.unit.ByteSizeValue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

public class FixedCapacityExponentialHistogramTests extends ExponentialHistogramTestCase {

public void testConcurrentHashCode() throws ExecutionException, InterruptedException {
List<ExponentialHistogram> originalHistograms = IntStream.range(0, 1000)
.mapToObj(i -> ExponentialHistogramTestUtils.randomHistogram())
.toList();

List<? extends ExponentialHistogram> copies = originalHistograms.stream()
.map(histo -> ExponentialHistogram.builder(histo, ExponentialHistogramCircuitBreaker.noop()).build())
.toList();

// Compute potentially lazy data correctly on the originals
originalHistograms.forEach(Object::hashCode);
concurrentTest(() -> {
for (int i = 0; i < originalHistograms.size(); i++) {
ExponentialHistogram original = originalHistograms.get(i);
ExponentialHistogram copy = copies.get(i);
assertThat(copy.hashCode(), equalTo(original.hashCode()));
}
});
}

public void testValueCountUpdatedCorrectly() {

FixedCapacityExponentialHistogram histogram = FixedCapacityExponentialHistogram.create(100, breaker());
Expand Down Expand Up @@ -69,4 +97,21 @@ public void testMemoryAccounting() {
}
assertThat(esBreaker.getUsed(), equalTo(0L));
}

protected void concurrentTest(Runnable r) throws InterruptedException, ExecutionException {
int threads = 5;
int tasks = threads * 2;
ExecutorService exec = Executors.newFixedThreadPool(threads);
try {
List<Future<?>> results = new ArrayList<>();
for (int t = 0; t < tasks; t++) {
results.add(exec.submit(r));
}
for (Future<?> f : results) {
f.get();
}
} finally {
exec.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1060,13 +1060,6 @@ private static ExponentialHistogram randomExponentialHistogram() {
ExponentialHistogramCircuitBreaker.noop(),
rawValues
);
/*
* hashcode mutates the histogram by doing some lazy work.
* That can change the value of the hashCode if you call it concurrently....
* This works around that by running it once up front. Jonas will have a
* look at this one soon.
*/
histo.hashCode();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reverting this bit.

return histo;
}

Expand Down