Skip to content

Commit 7cc2316

Browse files
not-napoleonhenningandersenDaveCTurner
authored
Possible source of leaked delayable writables (#80166) (#80292)
In a couple of error paths, it's possible that QueryPhaseResultsConsumer#PendingMerges might not have consumed all the aggregations it's trying to merge. It is never the less important to release those aggregations so that we don't leak memory or hold references to them. This PR achieves that by using the Releasables.close() mechanism, which will execute each close action, even if earlier actions had exceptions. This ensures that all of the aggregations get released and that the circuit breaker gets cleaned up. Co-authored-by: Henning Andersen <[email protected]> Co-authored-by: David Turner <[email protected]> Co-authored-by: Henning Andersen <[email protected]> Co-authored-by: David Turner <[email protected]>
1 parent ca9403f commit 7cc2316

File tree

3 files changed

+41
-6
lines changed

3 files changed

+41
-6
lines changed

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.atomic.AtomicReference;
3737
import java.util.function.Consumer;
3838
import java.util.function.Supplier;
39+
import java.util.stream.Collectors;
3940

4041
import static org.elasticsearch.action.search.SearchPhaseController.getTopDocsSize;
4142
import static org.elasticsearch.action.search.SearchPhaseController.mergeTopDocs;
@@ -249,14 +250,24 @@ private class PendingMerges implements Releasable {
249250

250251
@Override
251252
public synchronized void close() {
252-
assert hasPendingMerges() == false : "cannot close with partial reduce in-flight";
253253
if (hasFailure()) {
254254
assert circuitBreakerBytes == 0;
255-
return;
255+
} else {
256+
assert circuitBreakerBytes >= 0;
257+
}
258+
259+
List<Releasable> toRelease = new ArrayList<>(buffer.stream().<Releasable>map(b -> b::releaseAggs).collect(Collectors.toList()));
260+
toRelease.add(() -> {
261+
circuitBreaker.addWithoutBreaking(-circuitBreakerBytes);
262+
circuitBreakerBytes = 0;
263+
});
264+
265+
Releasables.close(toRelease);
266+
267+
if (hasPendingMerges()) {
268+
// This is a theoretically unreachable exception.
269+
throw new IllegalStateException("Attempted to close with partial reduce in-flight");
256270
}
257-
assert circuitBreakerBytes >= 0;
258-
circuitBreaker.addWithoutBreaking(-circuitBreakerBytes);
259-
circuitBreakerBytes = 0;
260271
}
261272

262273
synchronized Exception getFailure() {

server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,9 @@ private void readFromWithId(ShardSearchContextId id, StreamInput in, boolean del
377377
@Override
378378
public void writeTo(StreamOutput out) throws IOException {
379379
// we do not know that it is being sent over transport, but this at least protects all writes from happening, including sending.
380-
assert aggregations == null || aggregations.isSerialized() == false : "cannot send serialized version since it will leak";
380+
if (aggregations != null && aggregations.isSerialized()) {
381+
throw new IllegalStateException("cannot send serialized version since it will leak");
382+
}
381383
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
382384
out.writeBoolean(isNull);
383385
}

x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.atomic.AtomicInteger;
3838

3939
import static org.elasticsearch.search.SearchService.MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING;
40+
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
4041
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4142
import static org.hamcrest.Matchers.containsString;
4243
import static org.hamcrest.Matchers.equalTo;
@@ -454,6 +455,27 @@ public void testSearchPhaseFailure() throws Exception {
454455
ensureTaskNotRunning(response.getId());
455456
}
456457

458+
public void testSearchPhaseFailureLeak() throws Exception {
459+
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName);
460+
request.setKeepOnCompletion(true);
461+
request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10));
462+
request.getSearchRequest().allowPartialSearchResults(false);
463+
request.getSearchRequest()
464+
.source(
465+
new SearchSourceBuilder().query(
466+
new ThrowingQueryBuilder(randomLong(), new AlreadyClosedException("boom"), between(0, numShards - 1))
467+
)
468+
);
469+
request.getSearchRequest().source().aggregation(terms("f").field("f").size(between(1, 10)));
470+
471+
AsyncSearchResponse response = submitAsyncSearch(request);
472+
assertFalse(response.isRunning());
473+
assertTrue(response.isPartial());
474+
assertThat(response.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
475+
assertNotNull(response.getFailure());
476+
ensureTaskNotRunning(response.getId());
477+
}
478+
457479
public void testMaxResponseSize() {
458480
SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder())
459481
.aggregation(AggregationBuilders.terms("terms").field("terms.keyword").size(numKeywords));

0 commit comments

Comments
 (0)