-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Introduce BytesTransportResponse to enable lower heap overhead network responses #127112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,7 +23,9 @@ | |
| import org.elasticsearch.action.support.IndicesOptions; | ||
| import org.elasticsearch.client.internal.Client; | ||
| import org.elasticsearch.cluster.ClusterState; | ||
| import org.elasticsearch.common.bytes.ReleasableBytesReference; | ||
| import org.elasticsearch.common.io.stream.NamedWriteableRegistry; | ||
| import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; | ||
| import org.elasticsearch.common.io.stream.StreamInput; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
| import org.elasticsearch.common.io.stream.Writeable; | ||
|
|
@@ -50,6 +52,7 @@ | |
| import org.elasticsearch.tasks.TaskId; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
| import org.elasticsearch.transport.AbstractTransportRequest; | ||
| import org.elasticsearch.transport.BytesTransportResponse; | ||
| import org.elasticsearch.transport.LeakTracker; | ||
| import org.elasticsearch.transport.SendRequestTransportException; | ||
| import org.elasticsearch.transport.Transport; | ||
|
|
@@ -58,6 +61,7 @@ | |
| import org.elasticsearch.transport.TransportException; | ||
| import org.elasticsearch.transport.TransportResponse; | ||
| import org.elasticsearch.transport.TransportResponseHandler; | ||
| import org.elasticsearch.transport.TransportService; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
|
|
@@ -215,22 +219,6 @@ public static final class NodeQueryResponse extends TransportResponse { | |
| this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); | ||
| } | ||
|
|
||
| NodeQueryResponse( | ||
| QueryPhaseResultConsumer.MergeResult mergeResult, | ||
| Object[] results, | ||
| SearchPhaseController.TopDocsStats topDocsStats | ||
| ) { | ||
| this.results = results; | ||
| for (Object result : results) { | ||
| if (result instanceof QuerySearchResult r) { | ||
| r.incRef(); | ||
| } | ||
| } | ||
| this.mergeResult = mergeResult; | ||
| this.topDocsStats = topDocsStats; | ||
| assert Arrays.stream(results).noneMatch(Objects::isNull) : Arrays.toString(results); | ||
| } | ||
|
|
||
| // public for tests | ||
| public Object[] getResults() { | ||
| return results; | ||
|
|
@@ -552,7 +540,7 @@ static void registerNodeSearchAction( | |
| ) { | ||
| var transportService = searchTransportService.transportService(); | ||
| var threadPool = transportService.getThreadPool(); | ||
| final Dependencies dependencies = new Dependencies(searchService, threadPool.executor(ThreadPool.Names.SEARCH)); | ||
| final Dependencies dependencies = new Dependencies(searchService, transportService, threadPool.executor(ThreadPool.Names.SEARCH)); | ||
| // Even though not all searches run on the search pool, we use the search pool size as the upper limit of shards to execute in | ||
| // parallel to keep the implementation simple instead of working out the exact pool(s) a query will use up-front. | ||
| final int searchPoolMax = threadPool.info(ThreadPool.Names.SEARCH).getMax(); | ||
|
|
@@ -715,7 +703,7 @@ public void onFailure(Exception e) { | |
| } | ||
| } | ||
|
|
||
| private record Dependencies(SearchService searchService, Executor executor) {} | ||
| private record Dependencies(SearchService searchService, TransportService transportService, Executor executor) {} | ||
|
|
||
| private static final class QueryPerNodeState { | ||
|
|
||
|
|
@@ -760,6 +748,8 @@ void onShardDone() { | |
| if (countDown.countDown() == false) { | ||
| return; | ||
| } | ||
| RecyclerBytesStreamOutput out = null; | ||
| boolean success = false; | ||
| var channelListener = new ChannelActionListener<>(channel); | ||
| try (queryPhaseResultConsumer) { | ||
| var failure = queryPhaseResultConsumer.failure.get(); | ||
|
|
@@ -788,34 +778,46 @@ void onShardDone() { | |
| relevantShardIndices.set(localIndex); | ||
| } | ||
| } | ||
| final Object[] results = new Object[queryPhaseResultConsumer.getNumShards()]; | ||
| for (int i = 0; i < results.length; i++) { | ||
| var result = queryPhaseResultConsumer.results.get(i); | ||
| if (result == null) { | ||
| results[i] = failures.get(i); | ||
| } else { | ||
| // free context id and remove it from the result right away in case we don't need it anymore | ||
| if (result instanceof QuerySearchResult q | ||
| && q.getContextId() != null | ||
| && relevantShardIndices.get(q.getShardIndex()) == false | ||
| && q.hasSuggestHits() == false | ||
| && q.getRankShardResult() == null | ||
| && searchRequest.searchRequest.scroll() == null | ||
| && isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) { | ||
| if (dependencies.searchService.freeReaderContext(q.getContextId())) { | ||
| q.clearContextId(); | ||
| final int resultCount = queryPhaseResultConsumer.getNumShards(); | ||
| out = dependencies.transportService.newNetworkBytesStream(); | ||
| out.setTransportVersion(channel.getVersion()); | ||
| try { | ||
| out.writeVInt(resultCount); | ||
| for (int i = 0; i < resultCount; i++) { | ||
| var result = queryPhaseResultConsumer.results.get(i); | ||
| if (result == null) { | ||
| out.writeBoolean(false); | ||
| out.writeException(failures.remove(i)); | ||
|
||
| } else { | ||
| // free context id and remove it from the result right away in case we don't need it anymore | ||
| if (result instanceof QuerySearchResult q | ||
| && q.getContextId() != null | ||
| && relevantShardIndices.get(q.getShardIndex()) == false | ||
| && q.hasSuggestHits() == false | ||
| && q.getRankShardResult() == null | ||
| && searchRequest.searchRequest.scroll() == null | ||
| && isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) { | ||
| if (dependencies.searchService.freeReaderContext(q.getContextId())) { | ||
| q.clearContextId(); | ||
| } | ||
| } | ||
| out.writeBoolean(true); | ||
| result.writeTo(out); | ||
| } | ||
| results[i] = result; | ||
| } | ||
| assert results[i] != null; | ||
| mergeResult.writeTo(out); | ||
| queryPhaseResultConsumer.topDocsStats.writeTo(out); | ||
| success = true; | ||
| } catch (IOException e) { | ||
| handleMergeFailure(e, channelListener); | ||
| return; | ||
| } | ||
| } finally { | ||
| if (success == false && out != null) { | ||
| out.close(); | ||
| } | ||
|
|
||
| ActionListener.respondAndRelease( | ||
| channelListener, | ||
| new NodeQueryResponse(mergeResult, results, queryPhaseResultConsumer.topDocsStats) | ||
| ); | ||
| } | ||
| ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(new ReleasableBytesReference(out.bytes(), out))); | ||
| } | ||
|
|
||
| private void handleMergeFailure(Exception e, ChannelActionListener<TransportResponse> channelListener) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| package org.elasticsearch.transport; | ||
|
|
||
| import org.elasticsearch.common.bytes.ReleasableBytesReference; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
|
|
||
| import java.io.IOException; | ||
|
|
||
| public interface BytesTransportMessage { | ||
|
|
||
| ReleasableBytesReference bytes(); | ||
|
|
||
| /** | ||
| * Writes the data in a "thin" manner, without the actual bytes, assumes | ||
| * the actual bytes will be appended right after this content. | ||
| */ | ||
| void writeThin(StreamOutput out) throws IOException; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| package org.elasticsearch.transport; | ||
|
|
||
| import org.elasticsearch.common.bytes.ReleasableBytesReference; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
|
|
||
| import java.io.IOException; | ||
|
|
||
| /** | ||
| * A specialized, bytes only response, that can potentially be optimized on the network layer. | ||
| */ | ||
| public class BytesTransportResponse extends TransportResponse implements BytesTransportMessage { | ||
|
|
||
| private final ReleasableBytesReference bytes; | ||
|
|
||
| public BytesTransportResponse(ReleasableBytesReference bytes) { | ||
| this.bytes = bytes; | ||
| } | ||
|
|
||
| @Override | ||
| public ReleasableBytesReference bytes() { | ||
| return this.bytes; | ||
| } | ||
|
|
||
| @Override | ||
| public void writeThin(StreamOutput out) throws IOException {} | ||
|
|
||
| @Override | ||
| public void writeTo(StreamOutput out) throws IOException { | ||
| bytes.writeTo(out); | ||
| } | ||
|
|
||
| @Override | ||
| public void incRef() { | ||
| bytes.incRef(); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean tryIncRef() { | ||
| return bytes.tryIncRef(); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean decRef() { | ||
| return bytes.decRef(); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean hasReferences() { | ||
| return bytes.hasReferences(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately we need to duplicate the `writeTo()` on the message for the time being because of CCS (cross-cluster search) proxy-request backwards-compatibility concerns, urgh :) — but I think it's not too bad and we can DRY this up in a follow-up. This change is about networking, not search really, so I didn't want to mess with that here.