Skip to content

Commit 537baa7

Browse files
idegtiarenkoafoucret
authored andcommitted
ES|QL Assert current thread during query planning and execution (#131807)
1 parent a5483f5 commit 537baa7

File tree

4 files changed

+36
-3
lines changed

4 files changed

+36
-3
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.threadpool.ThreadPool;
4545
import org.elasticsearch.transport.AbstractTransportRequest;
4646
import org.elasticsearch.transport.RemoteClusterAware;
47+
import org.elasticsearch.transport.TcpTransport;
4748
import org.elasticsearch.transport.TransportException;
4849
import org.elasticsearch.transport.TransportService;
4950
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
@@ -190,6 +191,13 @@ public void execute(
190191
EsqlExecutionInfo execInfo,
191192
ActionListener<Result> listener
192193
) {
194+
assert ThreadPool.assertCurrentThreadPool(
195+
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
196+
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
197+
ThreadPool.Names.SYSTEM_READ,
198+
ThreadPool.Names.SEARCH,
199+
ThreadPool.Names.SEARCH_COORDINATION
200+
);
193201
Tuple<List<PhysicalPlan>, PhysicalPlan> subplansAndMainPlan = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(physicalPlan);
194202

195203
List<PhysicalPlan> subplans = subplansAndMainPlan.v1();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.elasticsearch.search.internal.AliasFilter;
3434
import org.elasticsearch.tasks.CancellableTask;
3535
import org.elasticsearch.tasks.TaskCancelledException;
36+
import org.elasticsearch.threadpool.ThreadPool;
37+
import org.elasticsearch.transport.TcpTransport;
3638
import org.elasticsearch.transport.TransportException;
3739
import org.elasticsearch.transport.TransportRequestOptions;
3840
import org.elasticsearch.transport.TransportService;
@@ -128,6 +130,13 @@ abstract class DataNodeRequestSender {
128130
}
129131

130132
final void startComputeOnDataNodes(Set<String> concreteIndices, Runnable runOnTaskFailure, ActionListener<ComputeResponse> listener) {
133+
assert ThreadPool.assertCurrentThreadPool(
134+
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
135+
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
136+
ThreadPool.Names.SYSTEM_READ,
137+
ThreadPool.Names.SEARCH,
138+
ThreadPool.Names.SEARCH_COORDINATION
139+
);
131140
final long startTimeInNanos = System.nanoTime();
132141
searchShards(concreteIndices, ActionListener.wrap(targetShards -> {
133142
try (
@@ -184,6 +193,7 @@ private static int nodeOrder(DiscoveryNode node) {
184193
}
185194

186195
private void trySendingRequestsForPendingShards(TargetShards targetShards, ComputeListener computeListener) {
196+
assert ThreadPool.assertCurrentThreadPool(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME, ThreadPool.Names.SEARCH);
187197
changed.set(true);
188198
final ActionListener<Void> listener = computeListener.acquireAvoid();
189199
try {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@
3333
import org.elasticsearch.logging.LogManager;
3434
import org.elasticsearch.logging.Logger;
3535
import org.elasticsearch.search.SearchShardTarget;
36+
import org.elasticsearch.threadpool.ThreadPool;
3637
import org.elasticsearch.transport.RemoteClusterAware;
3738
import org.elasticsearch.transport.RemoteClusterService;
39+
import org.elasticsearch.transport.TcpTransport;
3840
import org.elasticsearch.xpack.esql.VerificationException;
3941
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
4042
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
@@ -166,6 +168,7 @@ public String sessionId() {
166168
* Execute an ESQL request.
167169
*/
168170
public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, ActionListener<Result> listener) {
171+
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
169172
assert executionInfo != null : "Null EsqlExecutionInfo";
170173
LOGGER.debug("ESQL query:\n{}", request.query());
171174
LogicalPlan parsed = parse(request.query(), request.params());
@@ -196,6 +199,12 @@ public void executeOptimizedPlan(
196199
LogicalPlan optimizedPlan,
197200
ActionListener<Result> listener
198201
) {
202+
assert ThreadPool.assertCurrentThreadPool(
203+
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
204+
ThreadPool.Names.SYSTEM_READ,
205+
ThreadPool.Names.SEARCH,
206+
ThreadPool.Names.SEARCH_COORDINATION
207+
);
199208
if (explainMode) {// TODO: INLINESTATS come back to the explain mode branch and reevaluate
200209
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
201210
String physicalPlanString = physicalPlan.toString();

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
7777

7878
private TestThreadPool threadPool;
7979
private Executor executor = null;
80-
private static final String ESQL_TEST_EXECUTOR = "esql_test_executor";
8180

8281
private final DiscoveryNode node1 = DiscoveryNodeUtils.builder("node-1").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
8382
private final DiscoveryNode node2 = DiscoveryNodeUtils.builder("node-2").roles(Set.of(DATA_HOT_NODE_ROLE)).build();
@@ -95,9 +94,16 @@ public void setThreadPool() {
9594
int numThreads = randomBoolean() ? 1 : between(2, 16);
9695
threadPool = new TestThreadPool(
9796
"test",
98-
new FixedExecutorBuilder(Settings.EMPTY, ESQL_TEST_EXECUTOR, numThreads, 1024, "esql", EsExecutors.TaskTrackingConfig.DEFAULT)
97+
new FixedExecutorBuilder(
98+
Settings.EMPTY,
99+
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
100+
numThreads,
101+
1024,
102+
"esql",
103+
EsExecutors.TaskTrackingConfig.DEFAULT
104+
)
99105
);
100-
executor = threadPool.executor(ESQL_TEST_EXECUTOR);
106+
executor = threadPool.executor(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME);
101107
}
102108

103109
@After

0 commit comments

Comments
 (0)