Skip to content

Commit 4a6c52a

Browse files
Reduce fetch phase heap consumption
This cuts the peak heap consumption for handling large fetch results by up to 50% by avoiding having all results as well as their serialized for on heap at the same time. A follow up can take this to its logical conclusion and pass the network buffer straight down into the actual fetch execution, avoiding the need to even allocate `SearchHits` in the first place or if taken yet another step further, it could even go as far as avoiding the need to allocate the individual hits in full.
1 parent c72d00f commit 4a6c52a

File tree

5 files changed

+81
-22
lines changed

5 files changed

+81
-22
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
487487
(request, channel, task) -> searchService.executeFetchPhase(
488488
request,
489489
(SearchShardTask) task,
490+
TransportService.DIRECT_RESPONSE_PROFILE.equals(channel.getProfileName()) ? null : transportService.newNetworkBytesStream(),
490491
new ChannelActionListener<>(channel)
491492
)
492493
);
@@ -503,7 +504,12 @@ public static void registerRequestHandler(TransportService transportService, Sea
503504
TransportActionProxy.registerProxyAction(transportService, RANK_FEATURE_SHARD_ACTION_NAME, true, RankFeatureResult::new);
504505

505506
final TransportRequestHandler<ShardFetchRequest> shardFetchRequestHandler = (request, channel, task) -> searchService
506-
.executeFetchPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel));
507+
.executeFetchPhase(
508+
request,
509+
(SearchShardTask) task,
510+
TransportService.DIRECT_RESPONSE_PROFILE.equals(channel.getProfileName()) ? null : transportService.newNetworkBytesStream(),
511+
new ChannelActionListener<>(channel)
512+
);
507513
transportService.registerRequestHandler(
508514
FETCH_ID_SCROLL_ACTION_NAME,
509515
EsExecutors.DIRECT_EXECUTOR_SERVICE,

server/src/main/java/org/elasticsearch/search/SearchHits.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -157,16 +157,9 @@ public boolean isPooled() {
157157
@Override
158158
public void writeTo(StreamOutput out) throws IOException {
159159
assert hasReferences();
160-
final boolean hasTotalHits = totalHits != null;
161-
out.writeBoolean(hasTotalHits);
162-
if (hasTotalHits) {
163-
Lucene.writeTotalHits(out, totalHits);
164-
}
165-
out.writeFloat(maxScore);
160+
writeHeader(out);
166161
out.writeArray(hits);
167-
out.writeOptional(Lucene::writeSortFieldArray, sortFields);
168-
out.writeOptionalString(collapseField);
169-
out.writeOptionalArray(Lucene::writeSortValue, collapseValues);
162+
writeFooter(out);
170163
}
171164

172165
/**
@@ -260,6 +253,37 @@ private void deallocate() {
260253
}
261254
}
262255

256+
void writeAndRelease(StreamOutput out) throws IOException {
257+
boolean released = refCounted.decRef();
258+
assert released;
259+
writeHeader(out);
260+
var hits = this.hits;
261+
out.writeVInt(hits.length);
262+
for (int i = 0; i < hits.length; i++) {
263+
var h = hits[i];
264+
assert h != null;
265+
h.writeTo(out);
266+
h.decRef();
267+
hits[i] = null;
268+
}
269+
writeFooter(out);
270+
}
271+
272+
private void writeFooter(StreamOutput out) throws IOException {
273+
out.writeOptional(Lucene::writeSortFieldArray, sortFields);
274+
out.writeOptionalString(collapseField);
275+
out.writeOptionalArray(Lucene::writeSortValue, collapseValues);
276+
}
277+
278+
private void writeHeader(StreamOutput out) throws IOException {
279+
final boolean hasTotalHits = totalHits != null;
280+
out.writeBoolean(hasTotalHits);
281+
if (hasTotalHits) {
282+
Lucene.writeTotalHits(out, totalHits);
283+
}
284+
out.writeFloat(maxScore);
285+
}
286+
263287
@Override
264288
public boolean hasReferences() {
265289
return refCounted.hasReferences();

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
import org.elasticsearch.common.CheckedSupplier;
3737
import org.elasticsearch.common.UUIDs;
3838
import org.elasticsearch.common.breaker.CircuitBreaker;
39+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
3940
import org.elasticsearch.common.component.AbstractLifecycleComponent;
41+
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
4042
import org.elasticsearch.common.logging.LoggerMessageFormat;
4143
import org.elasticsearch.common.lucene.Lucene;
4244
import org.elasticsearch.common.settings.Setting;
@@ -95,7 +97,6 @@
9597
import org.elasticsearch.search.dfs.DfsPhase;
9698
import org.elasticsearch.search.dfs.DfsSearchResult;
9799
import org.elasticsearch.search.fetch.FetchPhase;
98-
import org.elasticsearch.search.fetch.FetchSearchResult;
99100
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
100101
import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
101102
import org.elasticsearch.search.fetch.ShardFetchRequest;
@@ -136,7 +137,9 @@
136137
import org.elasticsearch.threadpool.Scheduler.Cancellable;
137138
import org.elasticsearch.threadpool.ThreadPool;
138139
import org.elasticsearch.threadpool.ThreadPool.Names;
140+
import org.elasticsearch.transport.BytesTransportResponse;
139141
import org.elasticsearch.transport.TransportRequest;
142+
import org.elasticsearch.transport.TransportResponse;
140143
import org.elasticsearch.transport.Transports;
141144

142145
import java.io.IOException;
@@ -1103,7 +1106,8 @@ private Executor getExecutor(IndexShard indexShard) {
11031106
public void executeFetchPhase(
11041107
InternalScrollSearchRequest request,
11051108
SearchShardTask task,
1106-
ActionListener<ScrollQueryFetchSearchResult> listener
1109+
RecyclerBytesStreamOutput networkBuffer,
1110+
ActionListener<TransportResponse> listener
11071111
) {
11081112
final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
11091113
final Releasable markAsUsed;
@@ -1135,8 +1139,14 @@ public void executeFetchPhase(
11351139
opsListener.onFailedQueryPhase(searchContext);
11361140
}
11371141
}
1138-
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(readerContext, searchContext, afterQueryTime);
1139-
return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget());
1142+
var resp = executeFetchPhase(readerContext, searchContext, afterQueryTime);
1143+
if (networkBuffer == null) {
1144+
return new ScrollQueryFetchSearchResult(resp, searchContext.shardTarget());
1145+
}
1146+
searchContext.shardTarget().writeTo(networkBuffer);
1147+
resp.writeTo(networkBuffer);
1148+
resp.decRef();
1149+
return new BytesTransportResponse(new ReleasableBytesReference(networkBuffer.bytes(), networkBuffer));
11401150
} catch (Exception e) {
11411151
assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
11421152
logger.trace("Fetch phase failed", e);
@@ -1146,7 +1156,12 @@ public void executeFetchPhase(
11461156
}, wrapFailureListener(listener, readerContext, markAsUsed));
11471157
}
11481158

1149-
public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, ActionListener<FetchSearchResult> listener) {
1159+
public void executeFetchPhase(
1160+
ShardFetchRequest request,
1161+
CancellableTask task,
1162+
RecyclerBytesStreamOutput networkBuffer,
1163+
ActionListener<TransportResponse> listener
1164+
) {
11501165
final ReaderContext readerContext = findReaderContext(request.contextId(), request);
11511166
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
11521167
final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
@@ -1175,8 +1190,14 @@ public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, A
11751190
}
11761191
var fetchResult = searchContext.fetchResult();
11771192
// inc-ref fetch result because we close the SearchContext that references it in this try-with-resources block
1178-
fetchResult.incRef();
1179-
return fetchResult;
1193+
if (networkBuffer == null) {
1194+
fetchResult.incRef();
1195+
return fetchResult;
1196+
}
1197+
fetchResult.contextId.writeTo(networkBuffer);
1198+
fetchResult.consumeHits().writeAndRelease(networkBuffer);
1199+
networkBuffer.writeOptionalWriteable(fetchResult.profileResult());
1200+
return new BytesTransportResponse(new ReleasableBytesReference(networkBuffer.bytes(), networkBuffer));
11801201
} catch (Exception e) {
11811202
assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
11821203
// we handle the failure in the failure listener below

server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ public SearchHits hits() {
8383
return hits;
8484
}
8585

86+
public SearchHits consumeHits() {
87+
var hits = this.hits;
88+
this.hits = null;
89+
return hits;
90+
}
91+
8692
public FetchSearchResult initCounter() {
8793
counter = 0;
8894
return this;

server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@
129129
import org.elasticsearch.test.ESSingleNodeTestCase;
130130
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
131131
import org.elasticsearch.threadpool.ThreadPool;
132+
import org.elasticsearch.transport.TransportResponse;
132133
import org.elasticsearch.xcontent.XContentBuilder;
133134
import org.elasticsearch.xcontent.json.JsonXContent;
134135
import org.junit.Before;
@@ -412,8 +413,8 @@ public void testSearchWhileIndexDeleted() throws InterruptedException {
412413
intCursors,
413414
null/* not a scroll */
414415
);
415-
PlainActionFuture<FetchSearchResult> listener = new PlainActionFuture<>();
416-
service.executeFetchPhase(req, new SearchShardTask(123L, "", "", "", null, emptyMap()), listener);
416+
PlainActionFuture<TransportResponse> listener = new PlainActionFuture<>();
417+
service.executeFetchPhase(req, new SearchShardTask(123L, "", "", "", null, emptyMap()), null, listener);
417418
listener.get();
418419
if (useScroll) {
419420
// have to free context since this test does not remove the index from IndicesService.
@@ -601,9 +602,10 @@ public RankShardResult buildRankFeatureShardResult(SearchHits hits, int shardId)
601602
// execute fetch phase and perform any validations once we retrieve the response
602603
// the difference in how we do assertions here is needed because once the transport service sends back the response
603604
// it decrements the reference to the FetchSearchResult (through the ActionListener#respondAndRelease) and sets hits to null
604-
PlainActionFuture<FetchSearchResult> fetchListener = new PlainActionFuture<>() {
605+
PlainActionFuture<TransportResponse> fetchListener = new PlainActionFuture<>() {
605606
@Override
606-
public void onResponse(FetchSearchResult fetchSearchResult) {
607+
public void onResponse(TransportResponse response) {
608+
FetchSearchResult fetchSearchResult = (FetchSearchResult) response;
607609
assertNotNull(fetchSearchResult);
608610
assertNotNull(fetchSearchResult.hits());
609611

@@ -624,7 +626,7 @@ public void onFailure(Exception e) {
624626
throw new AssertionError("No failure should have been raised", e);
625627
}
626628
};
627-
service.executeFetchPhase(fetchRequest, searchTask, fetchListener);
629+
service.executeFetchPhase(fetchRequest, searchTask, null, fetchListener);
628630
fetchListener.get();
629631
} catch (Exception ex) {
630632
if (queryResult != null) {

0 commit comments

Comments
 (0)