Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ public void execute(
EsqlExecutionInfo execInfo,
ActionListener<Result> listener
) {
assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION
);
Tuple<List<PhysicalPlan>, PhysicalPlan> subplansAndMainPlan = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(physicalPlan);

List<PhysicalPlan> subplans = subplansAndMainPlan.v1();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -128,6 +129,12 @@ abstract class DataNodeRequestSender {
}

final void startComputeOnDataNodes(Set<String> concreteIndices, Runnable runOnTaskFailure, ActionListener<ComputeResponse> listener) {
assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION
);
final long startTimeInNanos = System.nanoTime();
searchShards(concreteIndices, ActionListener.wrap(targetShards -> {
try (
Expand Down Expand Up @@ -184,6 +191,7 @@ private static int nodeOrder(DiscoveryNode node) {
}

private void trySendingRequestsForPendingShards(TargetShards targetShards, ComputeListener computeListener) {
assert ThreadPool.assertCurrentThreadPool(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME, ThreadPool.Names.SEARCH);
changed.set(true);
final ActionListener<Void> listener = computeListener.acquireAvoid();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xpack.esql.VerificationException;
Expand Down Expand Up @@ -201,6 +202,7 @@ public String sessionId() {
* Execute an ESQL request.
*/
public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, ActionListener<Result> listener) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
assert executionInfo != null : "Null EsqlExecutionInfo";
LOGGER.debug("ESQL query:\n{}", request.query());
LogicalPlan parsed = parse(request.query(), request.params());
Expand Down Expand Up @@ -231,6 +233,11 @@ public void executeOptimizedPlan(
LogicalPlan optimizedPlan,
ActionListener<Result> listener
) {
assert ThreadPool.assertCurrentThreadPool(
ThreadPool.Names.SYSTEM_READ,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am surprised we have so many places where we could end up on SYSTEM_READ

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I am even more concerned about TRANSPORT_WORKER_THREAD_NAME_PREFIX. We should take a look what path lead to us executing on a transport thread.

ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION
);
if (explainMode) {// TODO: INLINESTATS come back to the explain mode branch and reevaluate
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
String physicalPlanString = physicalPlan.toString();
Expand Down