Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
Expand Down Expand Up @@ -190,6 +191,13 @@ public void execute(
EsqlExecutionInfo execInfo,
ActionListener<Result> listener
) {
assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION
);
Tuple<List<PhysicalPlan>, PhysicalPlan> subplansAndMainPlan = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(physicalPlan);

List<PhysicalPlan> subplans = subplansAndMainPlan.v1();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -128,6 +130,13 @@ abstract class DataNodeRequestSender {
}

final void startComputeOnDataNodes(Set<String> concreteIndices, Runnable runOnTaskFailure, ActionListener<ComputeResponse> listener) {
assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION
);
final long startTimeInNanos = System.nanoTime();
searchShards(concreteIndices, ActionListener.wrap(targetShards -> {
try (
Expand Down Expand Up @@ -184,6 +193,7 @@ private static int nodeOrder(DiscoveryNode node) {
}

private void trySendingRequestsForPendingShards(TargetShards targetShards, ComputeListener computeListener) {
assert ThreadPool.assertCurrentThreadPool(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME, ThreadPool.Names.SEARCH);
changed.set(true);
final ActionListener<Void> listener = computeListener.acquireAvoid();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
Expand Down Expand Up @@ -167,6 +169,7 @@ public String sessionId() {
* Execute an ESQL request.
*/
public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, ActionListener<Result> listener) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
assert executionInfo != null : "Null EsqlExecutionInfo";
LOGGER.debug("ESQL query:\n{}", request.query());
LogicalPlan parsed = parse(request.query(), request.params());
Expand Down Expand Up @@ -197,6 +200,12 @@ public void executeOptimizedPlan(
LogicalPlan optimizedPlan,
ActionListener<Result> listener
) {
assert ThreadPool.assertCurrentThreadPool(
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am surprised we have so many places where we could end up on SYSTEM_READ

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I am even more concerned about TRANSPORT_WORKER_THREAD_NAME_PREFIX. We should take a look what path lead to us executing on a transport thread.

ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION
);
if (explainMode) {// TODO: INLINESTATS come back to the explain mode branch and reevaluate
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
String physicalPlanString = physicalPlan.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {

private TestThreadPool threadPool;
private Executor executor = null;
private static final String ESQL_TEST_EXECUTOR = "esql_test_executor";

private final DiscoveryNode node1 = DiscoveryNodeUtils.builder("node-1").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
private final DiscoveryNode node2 = DiscoveryNodeUtils.builder("node-2").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
Expand All @@ -95,9 +94,16 @@ public void setThreadPool() {
int numThreads = randomBoolean() ? 1 : between(2, 16);
threadPool = new TestThreadPool(
"test",
new FixedExecutorBuilder(Settings.EMPTY, ESQL_TEST_EXECUTOR, numThreads, 1024, "esql", EsExecutors.TaskTrackingConfig.DEFAULT)
new FixedExecutorBuilder(
Settings.EMPTY,
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
numThreads,
1024,
"esql",
EsExecutors.TaskTrackingConfig.DEFAULT
)
);
executor = threadPool.executor(ESQL_TEST_EXECUTOR);
executor = threadPool.executor(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME);
}

@After
Expand Down