Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/133313.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 133313
summary: Do not run on transport thread
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.compute.data.BlockFactory;
Expand All @@ -43,6 +44,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentType;
Expand Down Expand Up @@ -161,6 +163,8 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

public final class EsqlTestUtils {
Expand Down Expand Up @@ -413,7 +417,7 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L));

public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices(
mock(TransportService.class),
createMockTransportService(),
mock(SearchService.class),
null,
mock(ClusterService.class),
Expand All @@ -423,6 +427,18 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
new InferenceService(mock(Client.class))
);

private static TransportService createMockTransportService() {
var service = mock(TransportService.class);
doReturn(createMockThreadPool()).when(service).getThreadPool();
return service;
}

private static ThreadPool createMockThreadPool() {
var threadPool = mock(ThreadPool.class);
doReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE).when(threadPool).executor(anyString());
return threadPool;
}

private EsqlTestUtils() {}

public static Configuration configuration(QueryPragmas pragmas, String query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,26 @@
package org.elasticsearch.xpack.esql.planner.premapper;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.esql.expression.function.fulltext.QueryBuilderResolver;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;

import java.util.concurrent.Executor;

/**
* The class is responsible for invoking any premapping steps that need to be applied to the logical plan,
* before this is being mapped to a physical one.
*/
public class PreMapper {

private final TransportActionServices services;
private final Executor searchExecutor;

public PreMapper(TransportActionServices services) {
this.services = services;
this.searchExecutor = services.transportService().getThreadPool().executor(ThreadPool.Names.SEARCH);
}

/**
Expand All @@ -35,6 +41,8 @@ public void preMapper(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
}

private void queryRewrite(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
QueryBuilderResolver.resolveQueryBuilders(plan, services, listener);
// see https://github.com/elastic/elasticsearch/issues/133312
// ThreadedActionListener might be removed if above issue is resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use SubscribableListener instead of ThreadedActionListener so that we only fork after going async?

        SubscribableListener<LogicalPlan> sub = new SubscribableListener<>();
        QueryBuilderResolver.resolveQueryBuilders(plan, services, sub);
        sub.addListener(listener, searchExecutor, null);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me try this

QueryBuilderResolver.resolveQueryBuilders(plan, services, new ThreadedActionListener<>(searchExecutor, listener));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
Expand All @@ -65,7 +64,6 @@
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
import org.elasticsearch.xpack.esql.session.Result;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -194,11 +192,9 @@ public void execute(
) {
assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
ThreadPool.Names.SEARCH_COORDINATION
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

);
Tuple<List<PhysicalPlan>, PhysicalPlan> subplansAndMainPlan = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(physicalPlan);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,10 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -133,11 +131,9 @@ abstract class DataNodeRequestSender {
final void startComputeOnDataNodes(Set<String> concreteIndices, Runnable runOnTaskFailure, ActionListener<ComputeResponse> listener) {
assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
ThreadPool.Names.SEARCH_COORDINATION
);
final long startTimeInNanos = System.nanoTime();
searchShards(concreteIndices, ActionListener.wrap(targetShards -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
Expand Down Expand Up @@ -75,7 +74,6 @@
import org.elasticsearch.xpack.esql.planner.premapper.PreMapper;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -179,6 +177,11 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
@Override
public void onResponse(LogicalPlan analyzedPlan) {
assert ThreadPool.assertCurrentThreadPool(
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l))
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l))
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
Expand All @@ -199,11 +202,9 @@ public void executeOptimizedPlan(
ActionListener<Result> listener
) {
assert ThreadPool.assertCurrentThreadPool(
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
ThreadPool.Names.SYSTEM_READ
);
if (explainMode) {// TODO: INLINESTATS come back to the explain mode branch and reevaluate
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
Expand Down