Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/133313.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 133313
summary: Do not run on transport thread
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.compute.data.BlockFactory;
Expand All @@ -42,6 +43,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentType;
Expand Down Expand Up @@ -160,7 +162,9 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

public final class EsqlTestUtils {
Expand Down Expand Up @@ -392,7 +396,7 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L));

public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices(
mock(TransportService.class),
createMockTransportService(),
mock(SearchService.class),
null,
mock(ClusterService.class),
Expand All @@ -401,6 +405,18 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
mockInferenceRunner()
);

private static TransportService createMockTransportService() {
var service = mock(TransportService.class);
doReturn(createMockThreadPool()).when(service).getThreadPool();
return service;
}

private static ThreadPool createMockThreadPool() {
var threadPool = mock(ThreadPool.class);
doReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE).when(threadPool).executor(anyString());
return threadPool;
}

@SuppressWarnings("unchecked")
private static InferenceRunner mockInferenceRunner() {
InferenceRunner inferenceRunner = mock(InferenceRunner.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,26 @@
package org.elasticsearch.xpack.esql.planner.premapper;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.esql.expression.function.fulltext.QueryBuilderResolver;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;

import java.util.concurrent.Executor;

/**
* The class is responsible for invoking any premapping steps that need to be applied to the logical plan,
* before this is being mapped to a physical one.
*/
public class PreMapper {

private final TransportActionServices services;
private final Executor searchExecutor;

public PreMapper(TransportActionServices services) {
this.services = services;
this.searchExecutor = services.transportService().getThreadPool().executor(ThreadPool.Names.SEARCH);
}

/**
Expand All @@ -35,6 +41,9 @@ public void preMapper(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
}

private void queryRewrite(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
QueryBuilderResolver.resolveQueryBuilders(plan, services, listener);
// see https://github.com/elastic/elasticsearch/issues/133312
// ThreadedActionListener might be removed if above issue is resolved
SubscribableListener.<LogicalPlan>newForked(l -> QueryBuilderResolver.resolveQueryBuilders(plan, services, l))
.addListener(listener, searchExecutor, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,12 @@ public void execute(
EsqlExecutionInfo execInfo,
ActionListener<Result> listener
) {

assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION
);
Tuple<PhysicalPlan, PhysicalPlan> coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(
physicalPlan,
configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -123,6 +124,12 @@ abstract class DataNodeRequestSender {
}

final void startComputeOnDataNodes(Set<String> concreteIndices, Runnable runOnTaskFailure, ActionListener<ComputeResponse> listener) {
assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION
);
final long startTimeInNanos = System.nanoTime();
searchShards(concreteIndices, ActionListener.wrap(targetShards -> {
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
Expand Down Expand Up @@ -186,6 +187,11 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
@Override
public void onResponse(LogicalPlan analyzedPlan) {
assert ThreadPool.assertCurrentThreadPool(
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
preMapper.preMapper(
analyzedPlan,
listener.delegateFailureAndWrap(
Expand All @@ -208,6 +214,11 @@ public void executeOptimizedPlan(
LogicalPlan optimizedPlan,
ActionListener<Result> listener
) {
assert ThreadPool.assertCurrentThreadPool(
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
// TODO: this could be snuck into the underlying listener
EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
Expand Down Expand Up @@ -360,6 +371,7 @@ public void analyzedPlan(
QueryBuilder requestFilter,
ActionListener<LogicalPlan> logicalPlanListener
) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
if (parsed.analyzed()) {
logicalPlanListener.onResponse(parsed);
return;
Expand Down