Skip to content

Commit f17d3d0

Browse files
authored
[8.19] Do not run ES|QL planning and scheduling on transport thread (#133376)
1 parent f60af12 commit f17d3d0

File tree

6 files changed

+57
-3
lines changed

6 files changed

+57
-3
lines changed

docs/changelog/133313.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 133313
2+
summary: Do not run on transport thread
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.regex.Regex;
2525
import org.elasticsearch.common.settings.Settings;
2626
import org.elasticsearch.common.util.BigArrays;
27+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2728
import org.elasticsearch.common.xcontent.XContentHelper;
2829
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
2930
import org.elasticsearch.compute.data.BlockFactory;
@@ -42,6 +43,7 @@
4243
import org.elasticsearch.search.SearchService;
4344
import org.elasticsearch.tasks.TaskCancelledException;
4445
import org.elasticsearch.test.ESTestCase;
46+
import org.elasticsearch.threadpool.ThreadPool;
4547
import org.elasticsearch.transport.RemoteTransportException;
4648
import org.elasticsearch.transport.TransportService;
4749
import org.elasticsearch.xcontent.XContentType;
@@ -160,7 +162,9 @@
160162
import static org.junit.Assert.assertNotNull;
161163
import static org.junit.Assert.assertNull;
162164
import static org.mockito.ArgumentMatchers.any;
165+
import static org.mockito.ArgumentMatchers.anyString;
163166
import static org.mockito.Mockito.doAnswer;
167+
import static org.mockito.Mockito.doReturn;
164168
import static org.mockito.Mockito.mock;
165169

166170
public final class EsqlTestUtils {
@@ -392,7 +396,7 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
392396
public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L));
393397

394398
public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices(
395-
mock(TransportService.class),
399+
createMockTransportService(),
396400
mock(SearchService.class),
397401
null,
398402
mock(ClusterService.class),
@@ -401,6 +405,18 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
401405
mockInferenceRunner()
402406
);
403407

408+
private static TransportService createMockTransportService() {
409+
var service = mock(TransportService.class);
410+
doReturn(createMockThreadPool()).when(service).getThreadPool();
411+
return service;
412+
}
413+
414+
private static ThreadPool createMockThreadPool() {
415+
var threadPool = mock(ThreadPool.class);
416+
doReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE).when(threadPool).executor(anyString());
417+
return threadPool;
418+
}
419+
404420
@SuppressWarnings("unchecked")
405421
private static InferenceRunner mockInferenceRunner() {
406422
InferenceRunner inferenceRunner = mock(InferenceRunner.class);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/premapper/PreMapper.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,26 @@
88
package org.elasticsearch.xpack.esql.planner.premapper;
99

1010
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.support.SubscribableListener;
12+
import org.elasticsearch.threadpool.ThreadPool;
1113
import org.elasticsearch.xpack.esql.expression.function.fulltext.QueryBuilderResolver;
1214
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1315
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
1416

17+
import java.util.concurrent.Executor;
18+
1519
/**
1620
* The class is responsible for invoking any premapping steps that need to be applied to the logical plan,
1721
* before this is being mapped to a physical one.
1822
*/
1923
public class PreMapper {
2024

2125
private final TransportActionServices services;
26+
private final Executor searchExecutor;
2227

2328
public PreMapper(TransportActionServices services) {
2429
this.services = services;
30+
this.searchExecutor = services.transportService().getThreadPool().executor(ThreadPool.Names.SEARCH);
2531
}
2632

2733
/**
@@ -35,6 +41,9 @@ public void preMapper(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
3541
}
3642

3743
private void queryRewrite(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
38-
QueryBuilderResolver.resolveQueryBuilders(plan, services, listener);
44+
// see https://github.com/elastic/elasticsearch/issues/133312
45+
// ThreadedActionListener might be removed if above issue is resolved
46+
SubscribableListener.<LogicalPlan>newForked(l -> QueryBuilderResolver.resolveQueryBuilders(plan, services, l))
47+
.addListener(listener, searchExecutor, null);
3948
}
4049
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,12 @@ public void execute(
145145
EsqlExecutionInfo execInfo,
146146
ActionListener<Result> listener
147147
) {
148-
148+
assert ThreadPool.assertCurrentThreadPool(
149+
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
150+
ThreadPool.Names.SYSTEM_READ,
151+
ThreadPool.Names.SEARCH,
152+
ThreadPool.Names.SEARCH_COORDINATION
153+
);
149154
Tuple<PhysicalPlan, PhysicalPlan> coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(
150155
physicalPlan,
151156
configuration

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.search.internal.AliasFilter;
3333
import org.elasticsearch.tasks.CancellableTask;
3434
import org.elasticsearch.tasks.TaskCancelledException;
35+
import org.elasticsearch.threadpool.ThreadPool;
3536
import org.elasticsearch.transport.TransportException;
3637
import org.elasticsearch.transport.TransportRequestOptions;
3738
import org.elasticsearch.transport.TransportService;
@@ -123,6 +124,12 @@ abstract class DataNodeRequestSender {
123124
}
124125

125126
final void startComputeOnDataNodes(Set<String> concreteIndices, Runnable runOnTaskFailure, ActionListener<ComputeResponse> listener) {
127+
assert ThreadPool.assertCurrentThreadPool(
128+
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
129+
ThreadPool.Names.SYSTEM_READ,
130+
ThreadPool.Names.SEARCH,
131+
ThreadPool.Names.SEARCH_COORDINATION
132+
);
126133
final long startTimeInNanos = System.nanoTime();
127134
searchShards(concreteIndices, ActionListener.wrap(targetShards -> {
128135
try (

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.logging.LogManager;
3333
import org.elasticsearch.logging.Logger;
3434
import org.elasticsearch.search.SearchShardTarget;
35+
import org.elasticsearch.threadpool.ThreadPool;
3536
import org.elasticsearch.xpack.esql.VerificationException;
3637
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
3738
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
@@ -186,6 +187,11 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
186187
new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
187188
@Override
188189
public void onResponse(LogicalPlan analyzedPlan) {
190+
assert ThreadPool.assertCurrentThreadPool(
191+
ThreadPool.Names.SEARCH,
192+
ThreadPool.Names.SEARCH_COORDINATION,
193+
ThreadPool.Names.SYSTEM_READ
194+
);
189195
preMapper.preMapper(
190196
analyzedPlan,
191197
listener.delegateFailureAndWrap(
@@ -208,6 +214,11 @@ public void executeOptimizedPlan(
208214
LogicalPlan optimizedPlan,
209215
ActionListener<Result> listener
210216
) {
217+
assert ThreadPool.assertCurrentThreadPool(
218+
ThreadPool.Names.SEARCH,
219+
ThreadPool.Names.SEARCH_COORDINATION,
220+
ThreadPool.Names.SYSTEM_READ
221+
);
211222
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
212223
// TODO: this could be snuck into the underlying listener
213224
EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
@@ -360,6 +371,7 @@ public void analyzedPlan(
360371
QueryBuilder requestFilter,
361372
ActionListener<LogicalPlan> logicalPlanListener
362373
) {
374+
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
363375
if (parsed.analyzed()) {
364376
logicalPlanListener.onResponse(parsed);
365377
return;

0 commit comments

Comments
 (0)