Skip to content

Commit 782319c

Browse files
idegtiarenkopabloem
authored andcommitted
Do not run ES|QL planning and scheduling on transport thread (elastic#133313)
1 parent f41392e commit 782319c

File tree

6 files changed

+40
-17
lines changed

6 files changed

+40
-17
lines changed

docs/changelog/133313.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 133313
2+
summary: Do not run on transport thread
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.common.regex.Regex;
2626
import org.elasticsearch.common.settings.Settings;
2727
import org.elasticsearch.common.util.BigArrays;
28+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2829
import org.elasticsearch.common.xcontent.XContentHelper;
2930
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
3031
import org.elasticsearch.compute.data.BlockFactory;
@@ -43,6 +44,7 @@
4344
import org.elasticsearch.search.SearchService;
4445
import org.elasticsearch.tasks.TaskCancelledException;
4546
import org.elasticsearch.test.ESTestCase;
47+
import org.elasticsearch.threadpool.ThreadPool;
4648
import org.elasticsearch.transport.RemoteTransportException;
4749
import org.elasticsearch.transport.TransportService;
4850
import org.elasticsearch.xcontent.XContentType;
@@ -161,6 +163,8 @@
161163
import static org.hamcrest.Matchers.instanceOf;
162164
import static org.junit.Assert.assertNotNull;
163165
import static org.junit.Assert.assertNull;
166+
import static org.mockito.ArgumentMatchers.anyString;
167+
import static org.mockito.Mockito.doReturn;
164168
import static org.mockito.Mockito.mock;
165169

166170
public final class EsqlTestUtils {
@@ -413,7 +417,7 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
413417
public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L));
414418

415419
public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices(
416-
mock(TransportService.class),
420+
createMockTransportService(),
417421
mock(SearchService.class),
418422
null,
419423
mock(ClusterService.class),
@@ -423,6 +427,18 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
423427
new InferenceService(mock(Client.class))
424428
);
425429

430+
private static TransportService createMockTransportService() {
431+
var service = mock(TransportService.class);
432+
doReturn(createMockThreadPool()).when(service).getThreadPool();
433+
return service;
434+
}
435+
436+
private static ThreadPool createMockThreadPool() {
437+
var threadPool = mock(ThreadPool.class);
438+
doReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE).when(threadPool).executor(anyString());
439+
return threadPool;
440+
}
441+
426442
private EsqlTestUtils() {}
427443

428444
public static Configuration configuration(QueryPragmas pragmas, String query) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/premapper/PreMapper.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,26 @@
88
package org.elasticsearch.xpack.esql.planner.premapper;
99

1010
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.support.SubscribableListener;
12+
import org.elasticsearch.threadpool.ThreadPool;
1113
import org.elasticsearch.xpack.esql.expression.function.fulltext.QueryBuilderResolver;
1214
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1315
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
1416

17+
import java.util.concurrent.Executor;
18+
1519
/**
1620
* The class is responsible for invoking any premapping steps that need to be applied to the logical plan,
1721
* before this is being mapped to a physical one.
1822
*/
1923
public class PreMapper {
2024

2125
private final TransportActionServices services;
26+
private final Executor searchExecutor;
2227

2328
public PreMapper(TransportActionServices services) {
2429
this.services = services;
30+
this.searchExecutor = services.transportService().getThreadPool().executor(ThreadPool.Names.SEARCH);
2531
}
2632

2733
/**
@@ -35,6 +41,9 @@ public void preMapper(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
3541
}
3642

3743
private void queryRewrite(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
38-
QueryBuilderResolver.resolveQueryBuilders(plan, services, listener);
44+
// see https://github.com/elastic/elasticsearch/issues/133312
45+
// ThreadedActionListener might be removed if above issue is resolved
46+
SubscribableListener.<LogicalPlan>newForked(l -> QueryBuilderResolver.resolveQueryBuilders(plan, services, l))
47+
.addListener(listener, searchExecutor, null);
3948
}
4049
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.elasticsearch.threadpool.ThreadPool;
4545
import org.elasticsearch.transport.AbstractTransportRequest;
4646
import org.elasticsearch.transport.RemoteClusterAware;
47-
import org.elasticsearch.transport.TcpTransport;
4847
import org.elasticsearch.transport.TransportException;
4948
import org.elasticsearch.transport.TransportService;
5049
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
@@ -65,7 +64,6 @@
6564
import org.elasticsearch.xpack.esql.session.Configuration;
6665
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
6766
import org.elasticsearch.xpack.esql.session.Result;
68-
import org.elasticsearch.xpack.ml.MachineLearning;
6967

7068
import java.util.ArrayList;
7169
import java.util.Collections;
@@ -194,11 +192,9 @@ public void execute(
194192
) {
195193
assert ThreadPool.assertCurrentThreadPool(
196194
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
197-
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
198195
ThreadPool.Names.SYSTEM_READ,
199196
ThreadPool.Names.SEARCH,
200-
ThreadPool.Names.SEARCH_COORDINATION,
201-
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
197+
ThreadPool.Names.SEARCH_COORDINATION
202198
);
203199
Tuple<List<PhysicalPlan>, PhysicalPlan> subplansAndMainPlan = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(physicalPlan);
204200

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,10 @@
3434
import org.elasticsearch.tasks.CancellableTask;
3535
import org.elasticsearch.tasks.TaskCancelledException;
3636
import org.elasticsearch.threadpool.ThreadPool;
37-
import org.elasticsearch.transport.TcpTransport;
3837
import org.elasticsearch.transport.TransportException;
3938
import org.elasticsearch.transport.TransportRequestOptions;
4039
import org.elasticsearch.transport.TransportService;
4140
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
42-
import org.elasticsearch.xpack.ml.MachineLearning;
4341

4442
import java.util.ArrayList;
4543
import java.util.Collections;
@@ -133,11 +131,9 @@ abstract class DataNodeRequestSender {
133131
final void startComputeOnDataNodes(Set<String> concreteIndices, Runnable runOnTaskFailure, ActionListener<ComputeResponse> listener) {
134132
assert ThreadPool.assertCurrentThreadPool(
135133
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
136-
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
137134
ThreadPool.Names.SYSTEM_READ,
138135
ThreadPool.Names.SEARCH,
139-
ThreadPool.Names.SEARCH_COORDINATION,
140-
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
136+
ThreadPool.Names.SEARCH_COORDINATION
141137
);
142138
final long startTimeInNanos = System.nanoTime();
143139
searchShards(concreteIndices, ActionListener.wrap(targetShards -> {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.elasticsearch.threadpool.ThreadPool;
3535
import org.elasticsearch.transport.RemoteClusterAware;
3636
import org.elasticsearch.transport.RemoteClusterService;
37-
import org.elasticsearch.transport.TcpTransport;
3837
import org.elasticsearch.xpack.esql.VerificationException;
3938
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
4039
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
@@ -75,7 +74,6 @@
7574
import org.elasticsearch.xpack.esql.planner.premapper.PreMapper;
7675
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
7776
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
78-
import org.elasticsearch.xpack.ml.MachineLearning;
7977

8078
import java.util.ArrayList;
8179
import java.util.Collection;
@@ -179,6 +177,11 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
179177
analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
180178
@Override
181179
public void onResponse(LogicalPlan analyzedPlan) {
180+
assert ThreadPool.assertCurrentThreadPool(
181+
ThreadPool.Names.SEARCH,
182+
ThreadPool.Names.SEARCH_COORDINATION,
183+
ThreadPool.Names.SYSTEM_READ
184+
);
182185
SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l))
183186
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l))
184187
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
@@ -199,11 +202,9 @@ public void executeOptimizedPlan(
199202
ActionListener<Result> listener
200203
) {
201204
assert ThreadPool.assertCurrentThreadPool(
202-
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
203-
ThreadPool.Names.SYSTEM_READ,
204205
ThreadPool.Names.SEARCH,
205206
ThreadPool.Names.SEARCH_COORDINATION,
206-
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
207+
ThreadPool.Names.SYSTEM_READ
207208
);
208209
if (explainMode) {// TODO: INLINESTATS come back to the explain mode branch and reevaluate
209210
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);

0 commit comments

Comments
 (0)