Skip to content

Commit 326248d

Browse files
committed
DO not run ES|QL query on transport thread
1 parent b3c95cd commit 326248d

File tree

3 files changed

+28
-11
lines changed

3 files changed

+28
-11
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.common.regex.Regex;
2626
import org.elasticsearch.common.settings.Settings;
2727
import org.elasticsearch.common.util.BigArrays;
28+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2829
import org.elasticsearch.common.xcontent.XContentHelper;
2930
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
3031
import org.elasticsearch.compute.data.BlockFactory;
@@ -43,6 +44,7 @@
4344
import org.elasticsearch.search.SearchService;
4445
import org.elasticsearch.tasks.TaskCancelledException;
4546
import org.elasticsearch.test.ESTestCase;
47+
import org.elasticsearch.threadpool.ThreadPool;
4648
import org.elasticsearch.transport.RemoteTransportException;
4749
import org.elasticsearch.transport.TransportService;
4850
import org.elasticsearch.xcontent.XContentType;
@@ -161,6 +163,8 @@
161163
import static org.hamcrest.Matchers.instanceOf;
162164
import static org.junit.Assert.assertNotNull;
163165
import static org.junit.Assert.assertNull;
166+
import static org.mockito.ArgumentMatchers.anyString;
167+
import static org.mockito.Mockito.doReturn;
164168
import static org.mockito.Mockito.mock;
165169

166170
public final class EsqlTestUtils {
@@ -413,7 +417,7 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
413417
public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L));
414418

415419
public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices(
416-
mock(TransportService.class),
420+
createMockTransportService(),
417421
mock(SearchService.class),
418422
null,
419423
mock(ClusterService.class),
@@ -423,6 +427,18 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
423427
new InferenceService(mock(Client.class))
424428
);
425429

430+
private static TransportService createMockTransportService() {
431+
var service = mock(TransportService.class);
432+
doReturn(createMockThreadPool()).when(service).getThreadPool();
433+
return service;
434+
}
435+
436+
private static ThreadPool createMockThreadPool() {
437+
var threadPool = mock(ThreadPool.class);
438+
doReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE).when(threadPool).executor(anyString());
439+
return threadPool;
440+
}
441+
426442
private EsqlTestUtils() {}
427443

428444
public static Configuration configuration(QueryPragmas pragmas, String query) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/premapper/PreMapper.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,26 @@
88
package org.elasticsearch.xpack.esql.planner.premapper;
99

1010
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.support.ThreadedActionListener;
12+
import org.elasticsearch.threadpool.ThreadPool;
1113
import org.elasticsearch.xpack.esql.expression.function.fulltext.QueryBuilderResolver;
1214
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1315
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
1416

17+
import java.util.concurrent.Executor;
18+
1519
/**
1620
* The class is responsible for invoking any premapping steps that need to be applied to the logical plan,
1721
* before this is being mapped to a physical one.
1822
*/
1923
public class PreMapper {
2024

2125
private final TransportActionServices services;
26+
private final Executor searchExecutor;
2227

2328
public PreMapper(TransportActionServices services) {
2429
this.services = services;
30+
this.searchExecutor = services.transportService().getThreadPool().executor(ThreadPool.Names.SEARCH);
2531
}
2632

2733
/**
@@ -35,6 +41,8 @@ public void preMapper(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
3541
}
3642

3743
private void queryRewrite(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
38-
QueryBuilderResolver.resolveQueryBuilders(plan, services, listener);
44+
// see https://github.com/elastic/elasticsearch/issues/133312
45+
// ThreadedActionListener might be removed if above issue is resolved
46+
QueryBuilderResolver.resolveQueryBuilders(plan, services, new ThreadedActionListener<>(searchExecutor, listener));
3947
}
4048
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.elasticsearch.threadpool.ThreadPool;
3535
import org.elasticsearch.transport.RemoteClusterAware;
3636
import org.elasticsearch.transport.RemoteClusterService;
37-
import org.elasticsearch.transport.TcpTransport;
3837
import org.elasticsearch.xpack.esql.VerificationException;
3938
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
4039
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
@@ -75,7 +74,6 @@
7574
import org.elasticsearch.xpack.esql.planner.premapper.PreMapper;
7675
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
7776
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
78-
import org.elasticsearch.xpack.ml.MachineLearning;
7977

8078
import java.util.ArrayList;
8179
import java.util.Collection;
@@ -179,6 +177,7 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
179177
analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
180178
@Override
181179
public void onResponse(LogicalPlan analyzedPlan) {
180+
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ThreadPool.Names.SEARCH_COORDINATION);
182181
SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l))
183182
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l))
184183
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
@@ -198,13 +197,7 @@ public void executeOptimizedPlan(
198197
LogicalPlan optimizedPlan,
199198
ActionListener<Result> listener
200199
) {
201-
assert ThreadPool.assertCurrentThreadPool(
202-
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
203-
ThreadPool.Names.SYSTEM_READ,
204-
ThreadPool.Names.SEARCH,
205-
ThreadPool.Names.SEARCH_COORDINATION,
206-
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
207-
);
200+
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ThreadPool.Names.SEARCH_COORDINATION);
208201
if (explainMode) {// TODO: INLINESTATS come back to the explain mode branch and reevaluate
209202
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
210203
String physicalPlanString = physicalPlan.toString();

0 commit comments

Comments
 (0)