Skip to content

Commit 68bcaf9

Browse files
Incrementally release SearchResponse during REST serialization
This should save a lot of heap for large responses that take a while to get flushed to the wire.
1 parent 789eb2f commit 68bcaf9

File tree

5 files changed

+111
-17
lines changed

5 files changed

+111
-17
lines changed

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,7 @@ private static InternalAggregations aggregate(
302302
int resultSetSize,
303303
AggregationReduceContext reduceContext
304304
) {
305-
interface ReleasableIterator extends Iterator<InternalAggregations>, Releasable {}
306-
try (var aggsIter = new ReleasableIterator() {
305+
try (var aggsIter = new ReleasableIterator<InternalAggregations>() {
307306

308307
private Releasable toRelease;
309308

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.search;
11+
12+
import org.elasticsearch.core.Releasable;
13+
14+
import java.util.Iterator;
15+
16+
/**
17+
* Iterator that must be released after the last element consumed from it is not needed any longer.
18+
*/
19+
public interface ReleasableIterator<T> extends Iterator<T>, Releasable {}

server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.action.OriginalIndices;
1515
import org.elasticsearch.common.Strings;
1616
import org.elasticsearch.common.bytes.BytesReference;
17-
import org.elasticsearch.common.collect.Iterators;
1817
import org.elasticsearch.common.io.stream.DelayableWriteable;
1918
import org.elasticsearch.common.io.stream.StreamInput;
2019
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -25,6 +24,8 @@
2524
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
2625
import org.elasticsearch.core.Nullable;
2726
import org.elasticsearch.core.RefCounted;
27+
import org.elasticsearch.core.Releasable;
28+
import org.elasticsearch.core.Releasables;
2829
import org.elasticsearch.core.SimpleRefCounted;
2930
import org.elasticsearch.core.TimeValue;
3031
import org.elasticsearch.rest.RestStatus;
@@ -42,6 +43,7 @@
4243
import org.elasticsearch.xcontent.XContentBuilder;
4344

4445
import java.io.IOException;
46+
import java.util.ArrayDeque;
4547
import java.util.Base64;
4648
import java.util.Collections;
4749
import java.util.Iterator;
@@ -389,7 +391,7 @@ public Clusters getClusters() {
389391
}
390392

391393
@Override
392-
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
394+
public ReleasableIterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
393395
assert hasReferences();
394396
return getToXContentIterator(true, params);
395397
}
@@ -398,17 +400,68 @@ public Iterator<? extends ToXContent> innerToXContentChunked(ToXContent.Params p
398400
return getToXContentIterator(false, params);
399401
}
400402

401-
private Iterator<ToXContent> getToXContentIterator(boolean wrapInObject, ToXContent.Params params) {
402-
return Iterators.concat(
403-
wrapInObject ? ChunkedToXContentHelper.startObject() : Collections.emptyIterator(),
404-
ChunkedToXContentHelper.chunk(SearchResponse.this::headerToXContent),
405-
Iterators.single(clusters),
406-
hits.toXContentChunked(params),
407-
aggregations == null ? Collections.emptyIterator() : ChunkedToXContentHelper.chunk(aggregations),
408-
suggest == null ? Collections.emptyIterator() : ChunkedToXContentHelper.chunk(suggest),
409-
profileResults == null ? Collections.emptyIterator() : ChunkedToXContentHelper.chunk(profileResults),
410-
wrapInObject ? ChunkedToXContentHelper.endObject() : Collections.emptyIterator()
411-
);
403+
private ReleasableIterator<ToXContent> getToXContentIterator(boolean wrapInObject, ToXContent.Params params) {
404+
final ArrayDeque<Iterator<? extends ToXContent>> iters = new ArrayDeque<>(7);
405+
if (wrapInObject) {
406+
iters.addLast(ChunkedToXContentHelper.startObject());
407+
}
408+
iters.addLast(ChunkedToXContentHelper.chunk(SearchResponse.this::headerToXContent));
409+
iters.addLast(ChunkedToXContentHelper.chunk(clusters));
410+
var hits = this.hits;
411+
hits.incRef();
412+
iters.addLast(hits.toXContentChunked(params));
413+
final Releasable releaseHits = Releasables.releaseOnce(hits::decRef);
414+
iters.addLast(ChunkedToXContentHelper.chunk((b, p) -> {
415+
releaseHits.close();
416+
return b;
417+
}));
418+
var aggregations = this.aggregations;
419+
if (aggregations != null) {
420+
iters.addLast(ChunkedToXContentHelper.chunk(aggregations));
421+
}
422+
var suggest = this.suggest;
423+
if (suggest != null) {
424+
iters.addLast(ChunkedToXContentHelper.chunk(suggest));
425+
}
426+
var profileResults = this.profileResults;
427+
if (profileResults != null) {
428+
iters.addLast(ChunkedToXContentHelper.chunk(profileResults));
429+
}
430+
if (wrapInObject) {
431+
iters.addLast(ChunkedToXContentHelper.endObject());
432+
}
433+
return new ReleasableIterator<>() {
434+
435+
@Override
436+
public void close() {
437+
releaseHits.close();
438+
}
439+
440+
Iterator<? extends ToXContent> current = iters.pollFirst();
441+
442+
@Override
443+
public boolean hasNext() {
444+
while (true) {
445+
if (current.hasNext()) {
446+
return true;
447+
}
448+
var c = current = iters.pollFirst();
449+
if (c == null) {
450+
current = Collections.emptyIterator();
451+
return false;
452+
}
453+
}
454+
}
455+
456+
@Override
457+
public ToXContent next() {
458+
while (current.hasNext() == false) {
459+
current = iters.pollFirst();
460+
}
461+
var res = current.next();
462+
return res;
463+
}
464+
};
412465
}
413466

414467
public XContentBuilder headerToXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {

server/src/main/java/org/elasticsearch/rest/action/RestChunkedToXContentListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
*/
2626
public class RestChunkedToXContentListener<Response extends ChunkedToXContent> extends RestActionListener<Response> {
2727

28-
private final ToXContent.Params params;
28+
protected final ToXContent.Params params;
2929

3030
public RestChunkedToXContentListener(RestChannel channel) {
3131
this(channel, channel.request());

server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,22 @@
1111

1212
import org.elasticsearch.ExceptionsHelper;
1313
import org.elasticsearch.action.ActionRequestValidationException;
14+
import org.elasticsearch.action.search.ReleasableIterator;
1415
import org.elasticsearch.action.search.SearchRequest;
16+
import org.elasticsearch.action.search.SearchResponse;
1517
import org.elasticsearch.action.search.TransportSearchAction;
1618
import org.elasticsearch.action.support.IndicesOptions;
1719
import org.elasticsearch.client.internal.node.NodeClient;
1820
import org.elasticsearch.common.Strings;
1921
import org.elasticsearch.core.Booleans;
2022
import org.elasticsearch.core.Nullable;
23+
import org.elasticsearch.core.Releasable;
2124
import org.elasticsearch.features.NodeFeature;
2225
import org.elasticsearch.index.query.QueryBuilder;
2326
import org.elasticsearch.rest.BaseRestHandler;
27+
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
2428
import org.elasticsearch.rest.RestRequest;
29+
import org.elasticsearch.rest.RestResponse;
2530
import org.elasticsearch.rest.Scope;
2631
import org.elasticsearch.rest.ServerlessScope;
2732
import org.elasticsearch.rest.action.RestActions;
@@ -36,6 +41,7 @@
3641
import org.elasticsearch.search.suggest.SuggestBuilder;
3742
import org.elasticsearch.search.suggest.term.TermSuggestionBuilder;
3843
import org.elasticsearch.usage.SearchUsageHolder;
44+
import org.elasticsearch.xcontent.ToXContent;
3945
import org.elasticsearch.xcontent.XContentParser;
4046

4147
import java.io.IOException;
@@ -120,7 +126,24 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
120126

121127
return channel -> {
122128
RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
123-
cancelClient.execute(TransportSearchAction.TYPE, searchRequest, new RestRefCountedChunkedToXContentListener<>(channel));
129+
cancelClient.execute(TransportSearchAction.TYPE, searchRequest, new RestRefCountedChunkedToXContentListener<>(channel) {
130+
@Override
131+
protected Releasable releasableFromResponse(SearchResponse searchResponse) {
132+
return super.releasableFromResponse(searchResponse);
133+
}
134+
135+
@Override
136+
protected void processResponse(SearchResponse searchResponse) throws IOException {
137+
final ReleasableIterator<? extends ToXContent> iter = searchResponse.toXContentChunked(params);
138+
channel.sendResponse(
139+
RestResponse.chunked(
140+
getRestStatus(searchResponse),
141+
ChunkedRestResponseBodyPart.fromXContent(p -> iter, params, channel),
142+
iter
143+
)
144+
);
145+
}
146+
});
124147
};
125148
}
126149

0 commit comments

Comments
 (0)