Skip to content

Commit 5fcfa48

Browse files
authored
Fix TDigestState.read CB leaks (#114303)
Closes #114194 Fixes `TDigestState.read()` CB leak on error on `.reserve()`.
1 parent 1e2b200 commit 5fcfa48

File tree

4 files changed

+107
-32
lines changed

4 files changed

+107
-32
lines changed

docs/changelog/114303.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 114303
2+
summary: Fix TDigestState.read CB leaks
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 114194

muted-tests.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,8 +328,6 @@ tests:
328328
- class: org.elasticsearch.action.bulk.IncrementalBulkIT
329329
method: testIncrementalBulkLowWatermarkBackOff
330330
issue: https://github.com/elastic/elasticsearch/issues/114182
331-
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
332-
issue: https://github.com/elastic/elasticsearch/issues/114194
333331
- class: org.elasticsearch.xpack.ilm.ExplainLifecycleIT
334332
method: testStepInfoPreservedOnAutoRetry
335333
issue: https://github.com/elastic/elasticsearch/issues/114220

server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public static TDigestState create(CircuitBreaker breaker, double compression) {
8484
}
8585
}
8686

87-
static TDigestState create(CircuitBreaker breaker, Type type, double compression) {
87+
static TDigestState createOfType(CircuitBreaker breaker, Type type, double compression) {
8888
breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "tdigest-state-create-with-type");
8989
try {
9090
return new TDigestState(breaker, type, compression);
@@ -196,28 +196,38 @@ public static TDigestState read(StreamInput in) throws IOException {
196196

197197
public static TDigestState read(CircuitBreaker breaker, StreamInput in) throws IOException {
198198
double compression = in.readDouble();
199-
TDigestState state;
199+
TDigestState state = null;
200200
long size = 0;
201-
breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "tdigest-state-read");
201+
boolean success = false;
202202
try {
203-
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) {
204-
state = new TDigestState(breaker, Type.valueOf(in.readString()), compression);
205-
size = in.readVLong();
206-
} else {
207-
state = new TDigestState(breaker, Type.valueForHighAccuracy(), compression);
203+
breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "tdigest-state-read");
204+
try {
205+
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) {
206+
state = new TDigestState(breaker, Type.valueOf(in.readString()), compression);
207+
size = in.readVLong();
208+
} else {
209+
state = new TDigestState(breaker, Type.valueForHighAccuracy(), compression);
210+
}
211+
} finally {
212+
if (state == null) {
213+
breaker.addWithoutBreaking(-SHALLOW_SIZE);
214+
}
215+
}
216+
217+
int n = in.readVInt();
218+
if (size > 0) {
219+
state.tdigest.reserve(size);
220+
}
221+
for (int i = 0; i < n; i++) {
222+
state.add(in.readDouble(), in.readVLong());
223+
}
224+
success = true;
225+
return state;
226+
} finally {
227+
if (success == false) {
228+
Releasables.close(state);
208229
}
209-
} catch (Exception e) {
210-
breaker.addWithoutBreaking(-SHALLOW_SIZE);
211-
throw e;
212-
}
213-
int n = in.readVInt();
214-
if (size > 0) {
215-
state.tdigest.reserve(size);
216-
}
217-
for (int i = 0; i < n; i++) {
218-
state.add(in.readDouble(), in.readVLong());
219230
}
220-
return state;
221231
}
222232

223233
@Override

server/src/test/java/org/elasticsearch/search/aggregations/metrics/TDigestStateReleasingTests.java

Lines changed: 72 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,14 @@
1313

1414
import org.elasticsearch.common.breaker.CircuitBreaker;
1515
import org.elasticsearch.common.breaker.CircuitBreakingException;
16+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
17+
import org.elasticsearch.common.io.stream.StreamInput;
1618
import org.elasticsearch.common.unit.ByteSizeValue;
19+
import org.elasticsearch.core.CheckedFunction;
20+
import org.elasticsearch.indices.CrankyCircuitBreakerService;
1721
import org.elasticsearch.test.ESTestCase;
1822

23+
import java.io.IOException;
1924
import java.util.Arrays;
2025

2126
import static org.hamcrest.Matchers.equalTo;
@@ -32,28 +37,84 @@ public TDigestStateReleasingTests(TDigestState.Type digestType) {
3237
this.digestType = digestType;
3338
}
3439

40+
public void testCreateOfType() {
41+
testCircuitBreakerTrip(circuitBreaker -> TDigestState.createOfType(circuitBreaker, digestType, 100));
42+
}
43+
44+
public void testCreateUsingParamsFrom() {
45+
testCircuitBreakerTrip(circuitBreaker -> {
46+
try (TDigestState example = TDigestState.createOfType(newLimitedBreaker(ByteSizeValue.ofMb(100)), digestType, 100)) {
47+
return TDigestState.createUsingParamsFrom(example);
48+
}
49+
});
50+
}
51+
52+
/**
53+
* This test doesn't use the {@code digestType} param.
54+
*/
55+
public void testCreate() {
56+
testCircuitBreakerTrip(circuitBreaker -> TDigestState.create(circuitBreaker, 100));
57+
}
58+
59+
/**
60+
* This test doesn't use the {@code digestType} param.
61+
*/
62+
public void testCreateOptimizedForAccuracy() {
63+
testCircuitBreakerTrip(circuitBreaker -> TDigestState.createOptimizedForAccuracy(circuitBreaker, 100));
64+
}
65+
66+
public void testRead() throws IOException {
67+
try (
68+
TDigestState state = TDigestState.createOfType(newLimitedBreaker(ByteSizeValue.ofMb(100)), digestType, 100);
69+
BytesStreamOutput output = new BytesStreamOutput()
70+
) {
71+
TDigestState.write(state, output);
72+
73+
testCircuitBreakerTrip(circuitBreaker -> {
74+
try (StreamInput input = output.bytes().streamInput()) {
75+
return TDigestState.read(circuitBreaker, input);
76+
}
77+
});
78+
}
79+
}
80+
81+
public void testReadWithData() throws IOException {
82+
try (
83+
TDigestState state = TDigestState.createOfType(newLimitedBreaker(ByteSizeValue.ofMb(100)), digestType, 100);
84+
BytesStreamOutput output = new BytesStreamOutput()
85+
) {
86+
for (int i = 0; i < 1000; i++) {
87+
state.add(randomDoubleBetween(-Double.MAX_VALUE, Double.MAX_VALUE, true));
88+
}
89+
90+
TDigestState.write(state, output);
91+
92+
testCircuitBreakerTrip(circuitBreaker -> {
93+
try (StreamInput input = output.bytes().streamInput()) {
94+
return TDigestState.read(circuitBreaker, input);
95+
}
96+
});
97+
}
98+
}
99+
35100
/**
36101
* Tests that a circuit breaker trip leaves no unreleased memory.
37102
*/
38-
public void testCircuitBreakerTrip() {
39-
for (int bytes = randomIntBetween(0, 16); bytes < 50_000; bytes += 17) {
40-
CircuitBreaker breaker = newLimitedBreaker(ByteSizeValue.ofBytes(bytes));
103+
public <E extends Exception> void testCircuitBreakerTrip(CheckedFunction<CircuitBreaker, TDigestState, E> tDigestStateFactory)
104+
throws E {
105+
try (CrankyCircuitBreakerService circuitBreakerService = new CrankyCircuitBreakerService()) {
106+
CircuitBreaker breaker = circuitBreakerService.getBreaker("test");
41107

42-
try (TDigestState state = TDigestState.create(breaker, digestType, 100)) {
108+
try (TDigestState state = tDigestStateFactory.apply(breaker)) {
43109
// Add some data to make it trip. It won't work in all digest types
44-
for (int i = 0; i < 100; i++) {
110+
for (int i = 0; i < 10; i++) {
45111
state.add(randomDoubleBetween(-Double.MAX_VALUE, Double.MAX_VALUE, true));
46112
}
47-
48-
// Testing with more memory shouldn't change anything, we finished the test
49-
return;
50113
} catch (CircuitBreakingException e) {
51114
// Expected
52115
} finally {
53-
assertThat("unreleased bytes with a " + bytes + " bytes limit", breaker.getUsed(), equalTo(0L));
116+
assertThat("unreleased bytes", breaker.getUsed(), equalTo(0L));
54117
}
55118
}
56-
57-
fail("Test case didn't reach a non-tripping breaker limit");
58119
}
59120
}

0 commit comments

Comments
 (0)