Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132981.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132981
summary: For ES|QL execution after `preMapper`
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
Expand All @@ -65,7 +64,6 @@
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
import org.elasticsearch.xpack.esql.session.Result;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -192,14 +190,7 @@ public void execute(
EsqlExecutionInfo execInfo,
ActionListener<Result> listener
) {
assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
);
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
Tuple<List<PhysicalPlan>, PhysicalPlan> subplansAndMainPlan = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(physicalPlan);

List<PhysicalPlan> subplans = subplansAndMainPlan.v1();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,10 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -131,14 +129,7 @@ abstract class DataNodeRequestSender {
}

final void startComputeOnDataNodes(Set<String> concreteIndices, Runnable runOnTaskFailure, ActionListener<ComputeResponse> listener) {
assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
);
assert ThreadPool.assertCurrentThreadPool(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME, ThreadPool.Names.SEARCH);
final long startTimeInNanos = System.nanoTime();
searchShards(concreteIndices, ActionListener.wrap(targetShards -> {
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
Expand Down Expand Up @@ -75,14 +74,14 @@
import org.elasticsearch.xpack.esql.planner.premapper.PreMapper;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -114,13 +113,14 @@ public interface PlanRunner {
private final LogicalPlanPreOptimizer logicalPlanPreOptimizer;
private final LogicalPlanOptimizer logicalPlanOptimizer;
private final PreMapper preMapper;

private final Mapper mapper;
private final PhysicalPlanOptimizer physicalPlanOptimizer;

private final PlanTelemetry planTelemetry;
private final IndicesExpressionGrouper indicesExpressionGrouper;
private final InferenceService inferenceService;
private final RemoteClusterService remoteClusterService;
private final ExecutorService planExecutor;

private boolean explainMode;
private String parsedPlanString;
Expand Down Expand Up @@ -157,6 +157,7 @@ public EsqlSession(
this.inferenceService = services.inferenceService();
this.preMapper = new PreMapper(services);
this.remoteClusterService = services.transportService().getRemoteClusterService();
this.planExecutor = services.transportService().getThreadPool().executor(ThreadPool.Names.SEARCH);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this technically planExecutorExecutor? 🤔

}

public String sessionId() {
Expand All @@ -179,8 +180,10 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
@Override
public void onResponse(LogicalPlan analyzedPlan) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l))
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l))
.andThenApply(p -> optimizedPlan(p))
.<LogicalPlan>andThen(planExecutor, null, (l, p) -> preMapper.preMapper(p, l))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we specify the thread pool in the next step instead because preMapper.preMapper() can switch the executing thread during rewrite.

Rewriteable.rewriteAndFetch(
new FullTextFunctionsRewritable(plan),
queryRewriteContext(services, indexNames(plan)),
listener.delegateFailureAndWrap((l, r) -> l.onResponse(r.plan))
);

Copy link
Contributor Author

@idegtiarenko idegtiarenko Aug 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct, pre mapper could fork us to number of different threads.
andThen customizes the listener supplied to the given step to fork with executor after its completion.
This is also confirmed by the updated assertions (in executeOptimizedPlan right after this is completed)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also a place that allow to narrow down the list of thread everywhere else in this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java.lang.AssertionError: elasticsearch[test-cluster-0][transport_worker][T#8] not in [search, search_coordination] nor a test thread
	at org.elasticsearch.threadpool.ThreadPool.assertCurrentThreadPool(ThreadPool.java:1116) ~[elasticsearch-9.2.0-SNAPSHOT.jar:?]
	at org.elasticsearch.xpack.esql.session.EsqlSession.executeOptimizedPlan(EsqlSession.java:204) ~[?:?]

could still happen with

 ./gradlew ":x-pack:plugin:esql:qa:server:multi-node:javaRestTest" --tests "org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT" -Dtests.method="test {csv-spec:match-function.TestMatchWithSemanticTextWithEvalsAndOtherFunctionsAndStats}"

Looks like .<LogicalPlan>andThen((l, p) -> preMapper.preMapper(p, new ThreadedActionListener<>(planExecutor, l))) might be necessary instead of conditional forking logic in andThen

.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
.addListener(listener);
}
Expand All @@ -198,13 +201,7 @@ public void executeOptimizedPlan(
LogicalPlan optimizedPlan,
ActionListener<Result> listener
) {
assert ThreadPool.assertCurrentThreadPool(
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
);
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
if (explainMode) {// TODO: INLINESTATS come back to the explain mode branch and reevaluate
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
String physicalPlanString = physicalPlan.toString();
Expand Down Expand Up @@ -760,7 +757,7 @@ public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener<LogicalPlan
logicalPlanPreOptimizer.preOptimize(logicalPlan, listener);
}

public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
private PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
if (optimizedPlan.optimized() == false) {
throw new IllegalStateException("Expected optimized plan");
}
Expand Down