Skip to content

Commit b101dcc

Browse files
Stop passing search query local node fanout through transport layer
We don't need to go through all the steps in the transport layer for executing query work on the local node. Doing so adds considerable overhead. E.g. with security enabled, the amount of work on transport threads goes up by about 4x for local only searches in the `geonames` track. The simpler a query is to rewrite and can-match and the more shards are queried on the local node, the more pronounced the overhead is in practice.
1 parent b619e14 commit b101dcc

File tree

4 files changed

+27
-5
lines changed

4 files changed

+27
-5
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.ClusterState;
1717
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1818
import org.elasticsearch.search.SearchPhaseResult;
19+
import org.elasticsearch.search.SearchService;
1920
import org.elasticsearch.search.SearchShardTarget;
2021
import org.elasticsearch.search.dfs.AggregatedDfs;
2122
import org.elasticsearch.search.internal.AliasFilter;
@@ -40,6 +41,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
4041
private final int trackTotalHitsUpTo;
4142
private volatile BottomSortValuesCollector bottomSortCollector;
4243
private final Client client;
44+
private final SearchService searchService;
4345

4446
SearchQueryThenFetchAsyncAction(
4547
Logger logger,
@@ -57,7 +59,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
5759
ClusterState clusterState,
5860
SearchTask task,
5961
SearchResponse.Clusters clusters,
60-
Client client
62+
Client client,
63+
SearchService searchService
6164
) {
6265
super(
6366
"query",
@@ -82,7 +85,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
8285
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
8386
this.progressListener = task.getProgressListener();
8487
this.client = client;
85-
88+
this.searchService = searchService;
8689
// don't build the SearchShard list (can be expensive) if the SearchProgressListener won't use it
8790
if (progressListener != SearchProgressListener.NOOP) {
8891
notifyListShards(progressListener, clusters, request.source());
@@ -95,7 +98,13 @@ protected void executePhaseOnShard(
9598
final SearchActionListener<SearchPhaseResult> listener
9699
) {
97100
ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt, listener.requestIndex));
98-
getSearchTransport().sendExecuteQuery(connection, request, getTask(), listener);
101+
var searchTransport = getSearchTransport();
102+
var task = getTask();
103+
if (searchTransport.transportService().getLocalNodeConnection() == connection) {
104+
searchService.executeQueryPhase(request, task, listener);
105+
return;
106+
}
107+
searchTransport.sendExecuteQuery(connection, request, task, listener);
99108
}
100109

101110
@Override

server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ public SearchTransportService(
122122
this.responseWrapper = responseWrapper;
123123
}
124124

125+
public TransportService transportService() {
126+
return transportService;
127+
}
128+
125129
public void sendFreeContext(
126130
Transport.Connection connection,
127131
ShardSearchContextId contextId,

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1565,7 +1565,8 @@ public void runNewSearchPhase(
15651565
clusterState,
15661566
task,
15671567
clusters,
1568-
client
1568+
client,
1569+
searchService
15691570
);
15701571
}
15711572
success = true;

server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.test.ESTestCase;
4040
import org.elasticsearch.test.InternalAggregationTestCase;
4141
import org.elasticsearch.transport.Transport;
42+
import org.elasticsearch.transport.TransportService;
4243

4344
import java.util.Collections;
4445
import java.util.List;
@@ -51,6 +52,8 @@
5152
import static org.hamcrest.Matchers.equalTo;
5253
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5354
import static org.hamcrest.Matchers.instanceOf;
55+
import static org.mockito.Mockito.mock;
56+
import static org.mockito.Mockito.when;
5457

5558
public class SearchQueryThenFetchAsyncActionTests extends ESTestCase {
5659
public void testBottomFieldSort() throws Exception {
@@ -83,7 +86,11 @@ private void testCase(boolean withScroll, boolean withCollapse) throws Exception
8386
AtomicInteger numWithTopDocs = new AtomicInteger();
8487
AtomicInteger successfulOps = new AtomicInteger();
8588
AtomicBoolean canReturnNullResponse = new AtomicBoolean(false);
86-
SearchTransportService searchTransportService = new SearchTransportService(null, null, null) {
89+
TransportService transportService = mock(TransportService.class);
90+
when(transportService.getLocalNodeConnection()).thenReturn(
91+
new SearchAsyncActionTests.MockConnection(DiscoveryNodeUtils.create("local"))
92+
);
93+
SearchTransportService searchTransportService = new SearchTransportService(transportService, null, null) {
8794
@Override
8895
public void sendExecuteQuery(
8996
Transport.Connection connection,
@@ -201,6 +208,7 @@ public void sendExecuteQuery(
201208
new ClusterState.Builder(new ClusterName("test")).build(),
202209
task,
203210
SearchResponse.Clusters.EMPTY,
211+
null,
204212
null
205213
) {
206214
@Override

0 commit comments

Comments
 (0)