Skip to content

Commit 1faa351

Browse files
authored
Add CircuitBreaker to TDigest, Step 2: Add CB to array wrappers (#113105)
Part of #99815

## Steps

1. Migrate TDigest classes to use a custom Array implementation. Temporarily use a simple array wrapper (#112810)
2. Implement circuit breaking in the `MemoryTrackingTDigestArrays` class. Add `Releasable` and ensure it's always closed within TDigest (this PR)
3. Pass the CircuitBreaker as a parameter to TDigestState from wherever it's being used
4. Account for the size of the remaining TDigest classes ("SHALLOW_SIZE")

Every step should be safely mergeable to main:

- The first and second steps should have no impact.
- The third and fourth steps will start partially increasing the CB count.

## Remarks

To simplify testing the CircuitBreaker, I added a helper method and an `@After` hook to ESTestCase. Right now, CBs are usually tested through MockBigArrays. E.g.: https://github.com/elastic/elasticsearch/blob/f7a0196b454b17f7928728a26084000238c4efaa/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractFunctionTestCase.java#L1263-L1265

So I guess there was no need for this yet. But I may have missed something somewhere.

Also, I'm separating this PR from "step 3", as integrating this (CB) into the current usages may require some refactoring of external code, which may be somewhat more _dangerous_.
1 parent 701ed61 commit 1faa351

34 files changed

+1147
-483
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/tdigest/SortBench.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121

2222
package org.elasticsearch.benchmark.tdigest;
2323

24+
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
25+
import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays;
2426
import org.elasticsearch.tdigest.Sort;
27+
import org.elasticsearch.tdigest.arrays.TDigestArrays;
2528
import org.elasticsearch.tdigest.arrays.TDigestDoubleArray;
2629
import org.elasticsearch.tdigest.arrays.TDigestIntArray;
27-
import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
2830
import org.openjdk.jmh.annotations.Benchmark;
2931
import org.openjdk.jmh.annotations.BenchmarkMode;
3032
import org.openjdk.jmh.annotations.Fork;
@@ -51,7 +53,8 @@
5153
@State(Scope.Thread)
5254
public class SortBench {
5355
private final int size = 100000;
54-
private final TDigestDoubleArray values = WrapperTDigestArrays.INSTANCE.newDoubleArray(size);
56+
private final TDigestArrays arrays = new MemoryTrackingTDigestArrays(new NoopCircuitBreaker("default-wrapper-tdigest-arrays"));
57+
private final TDigestDoubleArray values = arrays.newDoubleArray(size);
5558

5659
@Param({ "0", "1", "-1" })
5760
public int sortDirection;
@@ -72,7 +75,7 @@ public void setup() {
7275

7376
@Benchmark
7477
public void stableSort() {
75-
TDigestIntArray order = WrapperTDigestArrays.INSTANCE.newIntArray(size);
78+
TDigestIntArray order = arrays.newIntArray(size);
7679
for (int i = 0; i < size; i++) {
7780
order.set(i, i);
7881
}

benchmarks/src/main/java/org/elasticsearch/benchmark/tdigest/TDigestBench.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121

2222
package org.elasticsearch.benchmark.tdigest;
2323

24+
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
25+
import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays;
2426
import org.elasticsearch.tdigest.MergingDigest;
2527
import org.elasticsearch.tdigest.TDigest;
26-
import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
28+
import org.elasticsearch.tdigest.arrays.TDigestArrays;
2729
import org.openjdk.jmh.annotations.Benchmark;
2830
import org.openjdk.jmh.annotations.BenchmarkMode;
2931
import org.openjdk.jmh.annotations.Fork;
@@ -56,24 +58,25 @@
5658
@Threads(1)
5759
@State(Scope.Thread)
5860
public class TDigestBench {
61+
private static final TDigestArrays arrays = new MemoryTrackingTDigestArrays(new NoopCircuitBreaker("default-wrapper-tdigest-arrays"));
5962

6063
public enum TDigestFactory {
6164
MERGE {
6265
@Override
6366
TDigest create(double compression) {
64-
return new MergingDigest(WrapperTDigestArrays.INSTANCE, compression, (int) (10 * compression));
67+
return new MergingDigest(arrays, compression, (int) (10 * compression));
6568
}
6669
},
6770
AVL_TREE {
6871
@Override
6972
TDigest create(double compression) {
70-
return TDigest.createAvlTreeDigest(WrapperTDigestArrays.INSTANCE, compression);
73+
return TDigest.createAvlTreeDigest(arrays, compression);
7174
}
7275
},
7376
HYBRID {
7477
@Override
7578
TDigest create(double compression) {
76-
return TDigest.createHybridDigest(WrapperTDigestArrays.INSTANCE, compression);
79+
return TDigest.createHybridDigest(arrays, compression);
7780
}
7881
};
7982

libs/tdigest/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ apply plugin: 'elasticsearch.build'
2222
apply plugin: 'elasticsearch.publish'
2323

2424
dependencies {
25+
api project(':libs:elasticsearch-core')
26+
2527
testImplementation(project(":test:framework")) {
2628
exclude group: 'org.elasticsearch', module: 'elasticsearch-tdigest'
2729
}

libs/tdigest/src/main/java/module-info.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919

2020
module org.elasticsearch.tdigest {
21+
requires org.elasticsearch.base;
22+
2123
exports org.elasticsearch.tdigest;
2224
exports org.elasticsearch.tdigest.arrays;
2325
}

libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLGroupTree.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
package org.elasticsearch.tdigest;
2323

24+
import org.elasticsearch.core.Releasable;
25+
import org.elasticsearch.core.Releasables;
2426
import org.elasticsearch.tdigest.arrays.TDigestArrays;
2527
import org.elasticsearch.tdigest.arrays.TDigestDoubleArray;
2628
import org.elasticsearch.tdigest.arrays.TDigestLongArray;
@@ -31,7 +33,7 @@
3133
/**
3234
* A tree of t-digest centroids.
3335
*/
34-
final class AVLGroupTree extends AbstractCollection<Centroid> {
36+
final class AVLGroupTree extends AbstractCollection<Centroid> implements Releasable {
3537
/* For insertions into the tree */
3638
private double centroid;
3739
private long count;
@@ -267,4 +269,8 @@ private void checkAggregates(int node) {
267269
}
268270
}
269271

272+
@Override
273+
public void close() {
274+
Releasables.close(centroids, counts, aggregatedCounts, tree);
275+
}
270276
}

libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLTreeDigest.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
package org.elasticsearch.tdigest;
2323

24+
import org.elasticsearch.core.Releasables;
2425
import org.elasticsearch.tdigest.arrays.TDigestArrays;
2526

2627
import java.util.Collection;
@@ -153,26 +154,27 @@ public void compress() {
153154
}
154155
needsCompression = false;
155156

156-
AVLGroupTree centroids = summary;
157-
this.summary = new AVLGroupTree(arrays);
157+
try (AVLGroupTree centroids = summary) {
158+
this.summary = new AVLGroupTree(arrays);
158159

159-
final int[] nodes = new int[centroids.size()];
160-
nodes[0] = centroids.first();
161-
for (int i = 1; i < nodes.length; ++i) {
162-
nodes[i] = centroids.next(nodes[i - 1]);
163-
assert nodes[i] != IntAVLTree.NIL;
164-
}
165-
assert centroids.next(nodes[nodes.length - 1]) == IntAVLTree.NIL;
160+
final int[] nodes = new int[centroids.size()];
161+
nodes[0] = centroids.first();
162+
for (int i = 1; i < nodes.length; ++i) {
163+
nodes[i] = centroids.next(nodes[i - 1]);
164+
assert nodes[i] != IntAVLTree.NIL;
165+
}
166+
assert centroids.next(nodes[nodes.length - 1]) == IntAVLTree.NIL;
166167

167-
for (int i = centroids.size() - 1; i > 0; --i) {
168-
final int other = gen.nextInt(i + 1);
169-
final int tmp = nodes[other];
170-
nodes[other] = nodes[i];
171-
nodes[i] = tmp;
172-
}
168+
for (int i = centroids.size() - 1; i > 0; --i) {
169+
final int other = gen.nextInt(i + 1);
170+
final int tmp = nodes[other];
171+
nodes[other] = nodes[i];
172+
nodes[i] = tmp;
173+
}
173174

174-
for (int node : nodes) {
175-
add(centroids.mean(node), centroids.count(node));
175+
for (int node : nodes) {
176+
add(centroids.mean(node), centroids.count(node));
177+
}
176178
}
177179
}
178180

@@ -356,4 +358,9 @@ public int byteSize() {
356358
compress();
357359
return 64 + summary.size() * 13;
358360
}
361+
362+
@Override
363+
public void close() {
364+
Releasables.close(summary);
365+
}
359366
}

libs/tdigest/src/main/java/org/elasticsearch/tdigest/HybridDigest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.tdigest;
2121

22+
import org.elasticsearch.core.Releasables;
2223
import org.elasticsearch.tdigest.arrays.TDigestArrays;
2324

2425
import java.util.Collection;
@@ -110,6 +111,7 @@ public void reserve(long size) {
110111
}
111112
mergingDigest.reserve(size);
112113
// Release the allocated SortingDigest.
114+
sortingDigest.close();
113115
sortingDigest = null;
114116
} else {
115117
sortingDigest.reserve(size);
@@ -196,4 +198,9 @@ public int byteSize() {
196198
}
197199
return sortingDigest.byteSize();
198200
}
201+
202+
@Override
203+
public void close() {
204+
Releasables.close(sortingDigest, mergingDigest);
205+
}
199206
}

libs/tdigest/src/main/java/org/elasticsearch/tdigest/IntAVLTree.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
package org.elasticsearch.tdigest;
2323

24+
import org.elasticsearch.core.Releasable;
25+
import org.elasticsearch.core.Releasables;
2426
import org.elasticsearch.tdigest.arrays.TDigestArrays;
2527
import org.elasticsearch.tdigest.arrays.TDigestByteArray;
2628
import org.elasticsearch.tdigest.arrays.TDigestIntArray;
@@ -33,7 +35,7 @@
3335
* want to add data to the nodes, typically by using arrays and node
3436
* identifiers as indices.
3537
*/
36-
abstract class IntAVLTree {
38+
abstract class IntAVLTree implements Releasable {
3739
/**
3840
* We use <code>0</code> instead of <code>-1</code> so that left(NIL) works without
3941
* condition.
@@ -586,4 +588,8 @@ int size() {
586588

587589
}
588590

591+
@Override
592+
public void close() {
593+
Releasables.close(parent, left, right, depth);
594+
}
589595
}

libs/tdigest/src/main/java/org/elasticsearch/tdigest/MergingDigest.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
package org.elasticsearch.tdigest;
2323

24+
import org.elasticsearch.core.Releasables;
2425
import org.elasticsearch.tdigest.arrays.TDigestArrays;
2526
import org.elasticsearch.tdigest.arrays.TDigestDoubleArray;
2627
import org.elasticsearch.tdigest.arrays.TDigestIntArray;
@@ -66,8 +67,6 @@
6667
* what the AVLTreeDigest uses and no dynamic allocation is required at all.
6768
*/
6869
public class MergingDigest extends AbstractTDigest {
69-
private final TDigestArrays arrays;
70-
7170
private int mergeCount = 0;
7271

7372
private final double publicCompression;
@@ -138,8 +137,6 @@ public MergingDigest(TDigestArrays arrays, double compression, int bufferSize) {
138137
* @param size Size of main buffer
139138
*/
140139
public MergingDigest(TDigestArrays arrays, double compression, int bufferSize, int size) {
141-
this.arrays = arrays;
142-
143140
// ensure compression >= 10
144141
// default size = 2 * ceil(compression)
145142
// default bufferSize = 5 * size
@@ -274,9 +271,6 @@ private void merge(
274271
incomingWeight.set(incomingCount, weight, 0, lastUsedCell);
275272
incomingCount += lastUsedCell;
276273

277-
if (incomingOrder == null) {
278-
incomingOrder = arrays.newIntArray(incomingCount);
279-
}
280274
Sort.stableSort(incomingOrder, incomingMean, incomingCount);
281275

282276
totalWeight += unmergedWeight;
@@ -581,4 +575,9 @@ public String toString() {
581575
+ "-"
582576
+ (useTwoLevelCompression ? "twoLevel" : "oneLevel");
583577
}
578+
579+
@Override
580+
public void close() {
581+
Releasables.close(weight, mean, tempWeight, tempMean, order);
582+
}
584583
}

libs/tdigest/src/main/java/org/elasticsearch/tdigest/SortingDigest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.tdigest;
2121

22+
import org.elasticsearch.core.Releasables;
2223
import org.elasticsearch.tdigest.arrays.TDigestArrays;
2324
import org.elasticsearch.tdigest.arrays.TDigestDoubleArray;
2425

@@ -137,4 +138,9 @@ public void reserve(long size) {
137138
public int byteSize() {
138139
return values.size() * 8;
139140
}
141+
142+
@Override
143+
public void close() {
144+
Releasables.close(values);
145+
}
140146
}

0 commit comments

Comments
 (0)