Skip to content

Commit 0dc0965

Browse files
authored
Add CircuitBreaker to TDigest, Step 3: Connect with CB (elastic#113387) (elastic#113885)
Part of elastic#99815 ## Steps 1. Migrate TDigest classes to use a custom Array implementation. Temporarily use a simple array wrapper (elastic#112810) 2. Implement CircuitBreaking in the `WrapperTDigestArrays` class. Add Releasable/AutoCloseable and ensure everything is closed (elastic#113105) 3. Pass the CircuitBreaker as a parameter to TDigestState from wherever it's being used (This PR) - ESQL: Pass a real CB - Other aggs: Use the deprecated methods on `TDigestState`, that will use a No-op CB instead 4. Account remaining TDigest classes size ("SHALLOW_SIZE") Every step should be safely mergeable to main: - The first and second steps should have no impact. - The third and fourth ones will start increasing the CB count partially. ## Remarks TDigestStates are Releasable, and should be closed now. However, old aggregations don't close them, as it's not trivial, and as they are using the NoopCircuitBreaker, there's no need to close them
1 parent 95193da commit 0dc0965

File tree

59 files changed

+297
-242
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+297
-242
lines changed

docs/changelog/113387.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 113387
2+
summary: "Add `CircuitBreaker` to TDigest, Step 3: Connect with ESQL CB"
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractTDigestPercentilesAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ private TDigestState getExistingOrNewHistogram(final BigArrays bigArrays, long b
8787
states = bigArrays.grow(states, bucket + 1);
8888
TDigestState state = states.get(bucket);
8989
if (state == null) {
90-
state = TDigestState.create(compression, executionHint);
90+
state = TDigestState.createWithoutCircuitBreaking(compression, executionHint);
9191
states.set(bucket, state);
9292
}
9393
return state;

server/src/main/java/org/elasticsearch/search/aggregations/metrics/EmptyTDigestState.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@
99

1010
package org.elasticsearch.search.aggregations.metrics;
1111

12+
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
13+
1214
public final class EmptyTDigestState extends TDigestState {
1315
public EmptyTDigestState() {
1416
// Use the sorting implementation to minimize memory allocation.
15-
super(MemoryTrackingTDigestArrays.INSTANCE, Type.SORTING, 1.0D);
17+
super(new NoopCircuitBreaker("empty-tdigest-state-noop-breaker"), Type.SORTING, 1.0D);
1618
}
1719

1820
@Override

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMedianAbsoluteDeviation.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,14 @@ public static double computeMedianAbsoluteDeviation(TDigestState valuesSketch) {
3030
return Double.NaN;
3131
} else {
3232
final double approximateMedian = valuesSketch.quantile(0.5);
33-
final TDigestState approximatedDeviationsSketch = TDigestState.createUsingParamsFrom(valuesSketch);
34-
valuesSketch.centroids().forEach(centroid -> {
35-
final double deviation = Math.abs(approximateMedian - centroid.mean());
36-
approximatedDeviationsSketch.add(deviation, centroid.count());
37-
});
33+
try (TDigestState approximatedDeviationsSketch = TDigestState.createUsingParamsFrom(valuesSketch)) {
34+
valuesSketch.centroids().forEach(centroid -> {
35+
final double deviation = Math.abs(approximateMedian - centroid.mean());
36+
approximatedDeviationsSketch.add(deviation, centroid.count());
37+
});
3838

39-
return approximatedDeviationsSketch.quantile(0.5);
39+
return approximatedDeviationsSketch.quantile(0.5);
40+
}
4041
}
4142
}
4243

@@ -69,7 +70,12 @@ static InternalMedianAbsoluteDeviation empty(
6970
double compression,
7071
TDigestExecutionHint executionHint
7172
) {
72-
return new InternalMedianAbsoluteDeviation(name, metadata, format, TDigestState.create(compression, executionHint));
73+
return new InternalMedianAbsoluteDeviation(
74+
name,
75+
metadata,
76+
format,
77+
TDigestState.createWithoutCircuitBreaking(compression, executionHint)
78+
);
7379
}
7480

7581
@Override

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTDigestPercentileRanks.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public static InternalTDigestPercentileRanks empty(
5151
DocValueFormat format,
5252
Map<String, Object> metadata
5353
) {
54-
TDigestState state = TDigestState.create(compression, executionHint);
54+
TDigestState state = TDigestState.createWithoutCircuitBreaking(compression, executionHint);
5555
return new InternalTDigestPercentileRanks(name, keys, state, keyed, format, metadata);
5656
}
5757

server/src/main/java/org/elasticsearch/search/aggregations/metrics/MedianAbsoluteDeviationAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private TDigestState getExistingOrNewHistogram(final BigArrays bigArrays, long b
101101
valueSketches = bigArrays.grow(valueSketches, bucket + 1);
102102
TDigestState state = valueSketches.get(bucket);
103103
if (state == null) {
104-
state = TDigestState.create(compression, executionHint);
104+
state = TDigestState.createWithoutCircuitBreaking(compression, executionHint);
105105
valueSketches.set(bucket, state);
106106
}
107107
return state;

server/src/main/java/org/elasticsearch/search/aggregations/metrics/MemoryTrackingTDigestArrays.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.apache.lucene.util.ArrayUtil;
1414
import org.apache.lucene.util.RamUsageEstimator;
1515
import org.elasticsearch.common.breaker.CircuitBreaker;
16-
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
1716
import org.elasticsearch.core.Releasable;
1817
import org.elasticsearch.tdigest.arrays.TDigestArrays;
1918
import org.elasticsearch.tdigest.arrays.TDigestByteArray;
@@ -29,16 +28,6 @@
2928
*/
3029
public class MemoryTrackingTDigestArrays implements TDigestArrays {
3130

32-
/**
33-
* Default no-op CB instance of the wrapper.
34-
*
35-
* @deprecated This instance shouldn't be used, and will be removed after all usages are replaced.
36-
*/
37-
@Deprecated
38-
public static final MemoryTrackingTDigestArrays INSTANCE = new MemoryTrackingTDigestArrays(
39-
new NoopCircuitBreaker("default-wrapper-tdigest-arrays")
40-
);
41-
4231
private final CircuitBreaker breaker;
4332

4433
public MemoryTrackingTDigestArrays(CircuitBreaker breaker) {

server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@
99
package org.elasticsearch.search.aggregations.metrics;
1010

1111
import org.elasticsearch.TransportVersions;
12+
import org.elasticsearch.common.breaker.CircuitBreaker;
13+
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
1214
import org.elasticsearch.common.io.stream.StreamInput;
1315
import org.elasticsearch.common.io.stream.StreamOutput;
1416
import org.elasticsearch.core.Releasable;
1517
import org.elasticsearch.core.Releasables;
1618
import org.elasticsearch.tdigest.Centroid;
1719
import org.elasticsearch.tdigest.TDigest;
18-
import org.elasticsearch.tdigest.arrays.TDigestArrays;
1920

2021
import java.io.IOException;
2122
import java.util.Collection;
@@ -28,6 +29,10 @@
2829
*/
2930
public class TDigestState implements Releasable {
3031

32+
protected static final CircuitBreaker DEFAULT_NOOP_BREAKER = new NoopCircuitBreaker("default-tdigest-state-noop-breaker");
33+
34+
private final CircuitBreaker breaker;
35+
3136
private final double compression;
3237

3338
private final TDigest tdigest;
@@ -51,11 +56,12 @@ static Type valueForHighAccuracy() {
5156
private final Type type;
5257

5358
/**
54-
* @deprecated this method will be removed after all usages are replaced
59+
* @deprecated No-op circuit-breaked factory for TDigestState. Used in _search aggregations.
60+
* Please use the {@link #create(CircuitBreaker, double)} method instead on new usages.
5561
*/
5662
@Deprecated
57-
public static TDigestState create(double compression) {
58-
return create(MemoryTrackingTDigestArrays.INSTANCE, compression);
63+
public static TDigestState createWithoutCircuitBreaking(double compression) {
64+
return create(DEFAULT_NOOP_BREAKER, compression);
5965
}
6066

6167
/**
@@ -64,25 +70,26 @@ public static TDigestState create(double compression) {
6470
* @param compression the compression factor for the underlying {@link org.elasticsearch.tdigest.TDigest} object
6571
* @return a TDigestState object that's optimized for performance
6672
*/
67-
public static TDigestState create(TDigestArrays arrays, double compression) {
68-
return new TDigestState(arrays, Type.defaultValue(), compression);
73+
public static TDigestState create(CircuitBreaker breaker, double compression) {
74+
return new TDigestState(breaker, Type.defaultValue(), compression);
6975
}
7076

7177
/**
7278
* Factory for TDigestState that's optimized for high accuracy. It's substantially slower than the default implementation.
7379
* @param compression the compression factor for the underlying {@link org.elasticsearch.tdigest.TDigest} object
7480
* @return a TDigestState object that's optimized for performance
7581
*/
76-
public static TDigestState createOptimizedForAccuracy(TDigestArrays arrays, double compression) {
77-
return new TDigestState(arrays, Type.valueForHighAccuracy(), compression);
82+
static TDigestState createOptimizedForAccuracy(CircuitBreaker breaker, double compression) {
83+
return new TDigestState(breaker, Type.valueForHighAccuracy(), compression);
7884
}
7985

8086
/**
81-
* @deprecated this method will be removed after all usages are replaced
87+
* @deprecated No-op circuit-breaked factory for TDigestState. Used in _search aggregations.
88+
* Please use the {@link #create(CircuitBreaker, double, TDigestExecutionHint)} method instead on new usages.
8289
*/
8390
@Deprecated
84-
public static TDigestState create(double compression, TDigestExecutionHint executionHint) {
85-
return create(MemoryTrackingTDigestArrays.INSTANCE, compression, executionHint);
91+
public static TDigestState createWithoutCircuitBreaking(double compression, TDigestExecutionHint executionHint) {
92+
return create(DEFAULT_NOOP_BREAKER, compression, executionHint);
8693
}
8794

8895
/**
@@ -93,10 +100,10 @@ public static TDigestState create(double compression, TDigestExecutionHint execu
93100
* @param executionHint controls which implementation is used; accepted values are 'high_accuracy' and '' (default)
94101
* @return a TDigestState object
95102
*/
96-
public static TDigestState create(TDigestArrays arrays, double compression, TDigestExecutionHint executionHint) {
103+
public static TDigestState create(CircuitBreaker breaker, double compression, TDigestExecutionHint executionHint) {
97104
return switch (executionHint) {
98-
case HIGH_ACCURACY -> createOptimizedForAccuracy(arrays, compression);
99-
case DEFAULT -> create(arrays, compression);
105+
case HIGH_ACCURACY -> createOptimizedForAccuracy(breaker, compression);
106+
case DEFAULT -> create(breaker, compression);
100107
};
101108
}
102109

@@ -107,10 +114,12 @@ public static TDigestState create(TDigestArrays arrays, double compression, TDig
107114
* @return a TDigestState object
108115
*/
109116
public static TDigestState createUsingParamsFrom(TDigestState state) {
110-
return new TDigestState(MemoryTrackingTDigestArrays.INSTANCE, state.type, state.compression);
117+
return new TDigestState(state.breaker, state.type, state.compression);
111118
}
112119

113-
protected TDigestState(TDigestArrays arrays, Type type, double compression) {
120+
protected TDigestState(CircuitBreaker breaker, Type type, double compression) {
121+
this.breaker = breaker;
122+
var arrays = new MemoryTrackingTDigestArrays(breaker);
114123
tdigest = switch (type) {
115124
case HYBRID -> TDigest.createHybridDigest(arrays, compression);
116125
case AVL_TREE -> TDigest.createAvlTreeDigest(arrays, compression);
@@ -140,22 +149,23 @@ public static void write(TDigestState state, StreamOutput out) throws IOExceptio
140149
}
141150

142151
/**
143-
* @deprecated this method will be removed after all usages are replaced
152+
* @deprecated No-op circuit-breaked factory for TDigestState. Used in _search aggregations.
153+
* Please use the {@link #read(CircuitBreaker, StreamInput)} method instead on new usages.
144154
*/
145155
@Deprecated
146156
public static TDigestState read(StreamInput in) throws IOException {
147-
return read(MemoryTrackingTDigestArrays.INSTANCE, in);
157+
return read(DEFAULT_NOOP_BREAKER, in);
148158
}
149159

150-
public static TDigestState read(TDigestArrays arrays, StreamInput in) throws IOException {
160+
public static TDigestState read(CircuitBreaker breaker, StreamInput in) throws IOException {
151161
double compression = in.readDouble();
152162
TDigestState state;
153163
long size = 0;
154164
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) {
155-
state = new TDigestState(arrays, Type.valueOf(in.readString()), compression);
165+
state = new TDigestState(breaker, Type.valueOf(in.readString()), compression);
156166
size = in.readVLong();
157167
} else {
158-
state = new TDigestState(arrays, Type.valueForHighAccuracy(), compression);
168+
state = new TDigestState(breaker, Type.valueForHighAccuracy(), compression);
159169
}
160170
int n = in.readVInt();
161171
if (size > 0) {

server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalMedianAbsoluteDeviationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class InternalMedianAbsoluteDeviationTests extends InternalAggregationTes
2323

2424
@Override
2525
protected InternalMedianAbsoluteDeviation createTestInstance(String name, Map<String, Object> metadata) {
26-
final TDigestState valuesSketch = TDigestState.create(randomFrom(50.0, 100.0, 200.0, 500.0, 1000.0));
26+
final TDigestState valuesSketch = TDigestState.createWithoutCircuitBreaking(randomFrom(50.0, 100.0, 200.0, 500.0, 1000.0));
2727
final int numberOfValues = frequently() ? randomIntBetween(0, 1000) : 0;
2828
for (int i = 0; i < numberOfValues; i++) {
2929
valuesSketch.add(randomDouble());

server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalTDigestPercentilesRanksTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ protected InternalTDigestPercentileRanks createTestInstance(
3636
if (empty) {
3737
return new InternalTDigestPercentileRanks(name, percents, null, keyed, format, metadata);
3838
}
39-
final TDigestState state = TDigestState.create(100);
39+
final TDigestState state = TDigestState.createWithoutCircuitBreaking(100);
4040
Arrays.stream(values).forEach(state::add);
4141

4242
return new InternalTDigestPercentileRanks(name, percents, state, keyed, format, metadata);

0 commit comments

Comments
 (0)