Skip to content

Commit 70ba2af

Browse files
authored
Fix thread-safety of FixedCapacityExponentialHistogram.valueCount (elastic#137357)
1 parent 1caa6e3 commit 70ba2af

File tree

3 files changed

+72
-15
lines changed

3 files changed

+72
-15
lines changed

libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/FixedCapacityExponentialHistogram.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
* <br>
3131
* Consumers must ensure that if the histogram is mutated, all previously acquired {@link BucketIterator}
3232
* instances are no longer used.
33+
* <br>
34+
* This implementation is thread-safe for all operations provided via {@link ReleasableExponentialHistogram} and its superclasses,
35+
* as long as it is not mutated concurrently using any of the methods declared in addition in this class
36+
* (e.g. {@link #tryAddBucket(long, long, boolean)}).
3337
*/
3438
final class FixedCapacityExponentialHistogram extends AbstractExponentialHistogram implements ReleasableExponentialHistogram {
3539

@@ -243,8 +247,10 @@ private class Buckets implements ExponentialHistogram.Buckets {
243247

244248
private final boolean isPositive;
245249
private int numBuckets;
246-
private int cachedValueSumForNumBuckets;
247-
private long cachedValueSum;
250+
251+
private record CachedCountsSum(int numBuckets, long countsSum) {}
252+
253+
private CachedCountsSum cachedCountsSum;
248254

249255
/**
250256
* @param isPositive true, if this object should represent the positive bucket range, false for the negative range
@@ -263,8 +269,7 @@ int startSlot() {
263269

264270
final void reset() {
265271
numBuckets = 0;
266-
cachedValueSumForNumBuckets = 0;
267-
cachedValueSum = 0;
272+
cachedCountsSum = null;
268273
}
269274

270275
boolean tryAddBucket(long index, long count) {
@@ -297,12 +302,26 @@ public OptionalLong maxBucketIndex() {
297302

298303
@Override
299304
public long valueCount() {
305+
// copy a reference to the field to avoid problems with concurrent updates
306+
CachedCountsSum cachedVal = cachedCountsSum;
307+
if (cachedVal != null && cachedVal.numBuckets == numBuckets) {
308+
return cachedVal.countsSum;
309+
}
310+
311+
long countsSum = 0;
312+
int position = 0;
313+
if (cachedVal != null) {
314+
countsSum = cachedVal.countsSum;
315+
position = cachedVal.numBuckets;
316+
}
317+
300318
int startSlot = startSlot();
301-
while (cachedValueSumForNumBuckets < numBuckets) {
302-
cachedValueSum += bucketCounts[startSlot + cachedValueSumForNumBuckets];
303-
cachedValueSumForNumBuckets++;
319+
while (position < numBuckets) {
320+
countsSum += bucketCounts[startSlot + position];
321+
position++;
304322
}
305-
return cachedValueSum;
323+
this.cachedCountsSum = new CachedCountsSum(position, countsSum);
324+
return countsSum;
306325
}
307326

308327
@Override

libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/FixedCapacityExponentialHistogramTests.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,39 @@
2424
import org.elasticsearch.common.breaker.CircuitBreaker;
2525
import org.elasticsearch.common.unit.ByteSizeValue;
2626

27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.concurrent.ExecutionException;
30+
import java.util.concurrent.ExecutorService;
31+
import java.util.concurrent.Executors;
32+
import java.util.concurrent.Future;
33+
import java.util.stream.IntStream;
34+
2735
import static org.hamcrest.Matchers.equalTo;
2836
import static org.hamcrest.Matchers.greaterThan;
2937

3038
public class FixedCapacityExponentialHistogramTests extends ExponentialHistogramTestCase {
3139

40+
public void testConcurrentHashCode() throws ExecutionException, InterruptedException {
41+
List<ExponentialHistogram> originalHistograms = IntStream.range(0, 1000)
42+
.mapToObj(i -> ExponentialHistogramTestUtils.randomHistogram())
43+
.toList();
44+
45+
List<? extends ExponentialHistogram> copies = originalHistograms.stream()
46+
.map(histo -> ExponentialHistogram.builder(histo, ExponentialHistogramCircuitBreaker.noop()).build())
47+
.toList();
48+
49+
// Compute potentially lazy data correctly on the originals
50+
originalHistograms.forEach(Object::hashCode);
51+
concurrentTest(() -> {
52+
for (int i = 0; i < originalHistograms.size(); i++) {
53+
ExponentialHistogram original = originalHistograms.get(i);
54+
ExponentialHistogram copy = copies.get(i);
55+
assertThat(copy.hashCode(), equalTo(original.hashCode()));
56+
}
57+
});
58+
}
59+
3260
public void testValueCountUpdatedCorrectly() {
3361

3462
FixedCapacityExponentialHistogram histogram = FixedCapacityExponentialHistogram.create(100, breaker());
@@ -69,4 +97,21 @@ public void testMemoryAccounting() {
6997
}
7098
assertThat(esBreaker.getUsed(), equalTo(0L));
7199
}
100+
101+
protected void concurrentTest(Runnable r) throws InterruptedException, ExecutionException {
102+
int threads = 5;
103+
int tasks = threads * 2;
104+
ExecutorService exec = Executors.newFixedThreadPool(threads);
105+
try {
106+
List<Future<?>> results = new ArrayList<>();
107+
for (int t = 0; t < tasks; t++) {
108+
results.add(exec.submit(r));
109+
}
110+
for (Future<?> f : results) {
111+
f.get();
112+
}
113+
} finally {
114+
exec.shutdown();
115+
}
116+
}
72117
}

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,13 +1060,6 @@ private static ExponentialHistogram randomExponentialHistogram() {
10601060
ExponentialHistogramCircuitBreaker.noop(),
10611061
rawValues
10621062
);
1063-
/*
1064-
* hashcode mutates the histogram by doing some lazy work.
1065-
* That can change the value of the hashCode if you call it concurrently....
1066-
* This works around that by running it once up front. Jonas will have a
1067-
* look at this one soon.
1068-
*/
1069-
histo.hashCode();
10701063
return histo;
10711064
}
10721065

0 commit comments

Comments
 (0)