Skip to content

Commit 6338f0c

Browse files
committed
For ES|QL execution after preMapper
1 parent 16fe7db commit 6338f0c

File tree

3 files changed

+11
-32
lines changed

3 files changed

+11
-32
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.elasticsearch.threadpool.ThreadPool;
4545
import org.elasticsearch.transport.AbstractTransportRequest;
4646
import org.elasticsearch.transport.RemoteClusterAware;
47-
import org.elasticsearch.transport.TcpTransport;
4847
import org.elasticsearch.transport.TransportException;
4948
import org.elasticsearch.transport.TransportService;
5049
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
@@ -65,7 +64,6 @@
6564
import org.elasticsearch.xpack.esql.session.Configuration;
6665
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
6766
import org.elasticsearch.xpack.esql.session.Result;
68-
import org.elasticsearch.xpack.ml.MachineLearning;
6967

7068
import java.util.ArrayList;
7169
import java.util.Collections;
@@ -192,14 +190,7 @@ public void execute(
192190
EsqlExecutionInfo execInfo,
193191
ActionListener<Result> listener
194192
) {
195-
assert ThreadPool.assertCurrentThreadPool(
196-
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
197-
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
198-
ThreadPool.Names.SYSTEM_READ,
199-
ThreadPool.Names.SEARCH,
200-
ThreadPool.Names.SEARCH_COORDINATION,
201-
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
202-
);
193+
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
203194
Tuple<List<PhysicalPlan>, PhysicalPlan> subplansAndMainPlan = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(physicalPlan);
204195

205196
List<PhysicalPlan> subplans = subplansAndMainPlan.v1();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,10 @@
3434
import org.elasticsearch.tasks.CancellableTask;
3535
import org.elasticsearch.tasks.TaskCancelledException;
3636
import org.elasticsearch.threadpool.ThreadPool;
37-
import org.elasticsearch.transport.TcpTransport;
3837
import org.elasticsearch.transport.TransportException;
3938
import org.elasticsearch.transport.TransportRequestOptions;
4039
import org.elasticsearch.transport.TransportService;
4140
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
42-
import org.elasticsearch.xpack.ml.MachineLearning;
4341

4442
import java.util.ArrayList;
4543
import java.util.Collections;
@@ -131,14 +129,7 @@ abstract class DataNodeRequestSender {
131129
}
132130

133131
final void startComputeOnDataNodes(Set<String> concreteIndices, Runnable runOnTaskFailure, ActionListener<ComputeResponse> listener) {
134-
assert ThreadPool.assertCurrentThreadPool(
135-
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
136-
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
137-
ThreadPool.Names.SYSTEM_READ,
138-
ThreadPool.Names.SEARCH,
139-
ThreadPool.Names.SEARCH_COORDINATION,
140-
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
141-
);
132+
assert ThreadPool.assertCurrentThreadPool(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME, ThreadPool.Names.SEARCH);
142133
final long startTimeInNanos = System.nanoTime();
143134
searchShards(concreteIndices, ActionListener.wrap(targetShards -> {
144135
try (

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
1515
import org.elasticsearch.action.search.ShardSearchFailure;
1616
import org.elasticsearch.action.support.SubscribableListener;
17+
import org.elasticsearch.action.support.ThreadedActionListener;
1718
import org.elasticsearch.common.collect.Iterators;
1819
import org.elasticsearch.compute.data.Block;
1920
import org.elasticsearch.compute.data.BlockUtils;
@@ -34,7 +35,6 @@
3435
import org.elasticsearch.threadpool.ThreadPool;
3536
import org.elasticsearch.transport.RemoteClusterAware;
3637
import org.elasticsearch.transport.RemoteClusterService;
37-
import org.elasticsearch.transport.TcpTransport;
3838
import org.elasticsearch.xpack.esql.VerificationException;
3939
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
4040
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
@@ -75,14 +75,14 @@
7575
import org.elasticsearch.xpack.esql.planner.premapper.PreMapper;
7676
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
7777
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
78-
import org.elasticsearch.xpack.ml.MachineLearning;
7978

8079
import java.util.ArrayList;
8180
import java.util.Collection;
8281
import java.util.HashMap;
8382
import java.util.List;
8483
import java.util.Map;
8584
import java.util.Set;
85+
import java.util.concurrent.ExecutorService;
8686
import java.util.stream.Collectors;
8787
import java.util.stream.Stream;
8888

@@ -114,13 +114,14 @@ public interface PlanRunner {
114114
private final LogicalPlanPreOptimizer logicalPlanPreOptimizer;
115115
private final LogicalPlanOptimizer logicalPlanOptimizer;
116116
private final PreMapper preMapper;
117-
118117
private final Mapper mapper;
119118
private final PhysicalPlanOptimizer physicalPlanOptimizer;
119+
120120
private final PlanTelemetry planTelemetry;
121121
private final IndicesExpressionGrouper indicesExpressionGrouper;
122122
private final InferenceService inferenceService;
123123
private final RemoteClusterService remoteClusterService;
124+
private final ExecutorService optimizerExecutor;
124125

125126
private boolean explainMode;
126127
private String parsedPlanString;
@@ -157,6 +158,7 @@ public EsqlSession(
157158
this.inferenceService = services.inferenceService();
158159
this.preMapper = new PreMapper(services);
159160
this.remoteClusterService = services.transportService().getRemoteClusterService();
161+
this.optimizerExecutor = services.transportService().getThreadPool().executor(ThreadPool.Names.SEARCH);
160162
}
161163

162164
public String sessionId() {
@@ -180,7 +182,8 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
180182
@Override
181183
public void onResponse(LogicalPlan analyzedPlan) {
182184
SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l))
183-
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l))
185+
.andThenApply(p -> optimizedPlan(p))
186+
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(p, new ThreadedActionListener<>(optimizerExecutor, l)))
184187
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
185188
.addListener(listener);
186189
}
@@ -198,13 +201,7 @@ public void executeOptimizedPlan(
198201
LogicalPlan optimizedPlan,
199202
ActionListener<Result> listener
200203
) {
201-
assert ThreadPool.assertCurrentThreadPool(
202-
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
203-
ThreadPool.Names.SYSTEM_READ,
204-
ThreadPool.Names.SEARCH,
205-
ThreadPool.Names.SEARCH_COORDINATION,
206-
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
207-
);
204+
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
208205
if (explainMode) {// TODO: INLINESTATS come back to the explain mode branch and reevaluate
209206
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
210207
String physicalPlanString = physicalPlan.toString();
@@ -760,7 +757,7 @@ public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener<LogicalPlan
760757
logicalPlanPreOptimizer.preOptimize(logicalPlan, listener);
761758
}
762759

763-
public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
760+
private PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
764761
if (optimizedPlan.optimized() == false) {
765762
throw new IllegalStateException("Expected optimized plan");
766763
}

0 commit comments

Comments
 (0)