Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132981.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132981
summary: For ES|QL execution after `preMapper`
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
Expand All @@ -65,7 +64,6 @@
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
import org.elasticsearch.xpack.esql.session.Result;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -192,14 +190,7 @@ public void execute(
EsqlExecutionInfo execInfo,
ActionListener<Result> listener
) {
assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
);
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
Tuple<List<PhysicalPlan>, PhysicalPlan> subplansAndMainPlan = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(physicalPlan);

List<PhysicalPlan> subplans = subplansAndMainPlan.v1();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,10 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -131,14 +129,7 @@ abstract class DataNodeRequestSender {
}

final void startComputeOnDataNodes(Set<String> concreteIndices, Runnable runOnTaskFailure, ActionListener<ComputeResponse> listener) {
assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
);
assert ThreadPool.assertCurrentThreadPool(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME, ThreadPool.Names.SEARCH);
final long startTimeInNanos = System.nanoTime();
searchShards(concreteIndices, ActionListener.wrap(targetShards -> {
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockUtils;
Expand All @@ -34,7 +35,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
Expand Down Expand Up @@ -75,14 +75,14 @@
import org.elasticsearch.xpack.esql.planner.premapper.PreMapper;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -114,13 +114,14 @@ public interface PlanRunner {
private final LogicalPlanPreOptimizer logicalPlanPreOptimizer;
private final LogicalPlanOptimizer logicalPlanOptimizer;
private final PreMapper preMapper;

private final Mapper mapper;
private final PhysicalPlanOptimizer physicalPlanOptimizer;

private final PlanTelemetry planTelemetry;
private final IndicesExpressionGrouper indicesExpressionGrouper;
private final InferenceService inferenceService;
private final RemoteClusterService remoteClusterService;
private final ExecutorService planExecutor;

private boolean explainMode;
private String parsedPlanString;
Expand Down Expand Up @@ -157,6 +158,7 @@ public EsqlSession(
this.inferenceService = services.inferenceService();
this.preMapper = new PreMapper(services);
this.remoteClusterService = services.transportService().getRemoteClusterService();
this.planExecutor = services.transportService().getThreadPool().executor(ThreadPool.Names.SEARCH);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this technically planExecutorExecutor? 🤔

}

public String sessionId() {
Expand All @@ -180,7 +182,8 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
@Override
public void onResponse(LogicalPlan analyzedPlan) {
SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l))
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l))
.andThenApply(p -> optimizedPlan(p))
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(p, new ThreadedActionListener<>(planExecutor, l)))
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
.addListener(listener);
}
Expand All @@ -198,13 +201,7 @@ public void executeOptimizedPlan(
LogicalPlan optimizedPlan,
ActionListener<Result> listener
) {
assert ThreadPool.assertCurrentThreadPool(
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
);
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
if (explainMode) {// TODO: INLINESTATS come back to the explain mode branch and reevaluate
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
String physicalPlanString = physicalPlan.toString();
Expand Down Expand Up @@ -760,7 +757,7 @@ public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener<LogicalPlan
logicalPlanPreOptimizer.preOptimize(logicalPlan, listener);
}

public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
private PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
if (optimizedPlan.optimized() == false) {
throw new IllegalStateException("Expected optimized plan");
}
Expand Down