diff --git a/docs/changelog/132981.yaml b/docs/changelog/132981.yaml new file mode 100644 index 0000000000000..ee02576731131 --- /dev/null +++ b/docs/changelog/132981.yaml @@ -0,0 +1,5 @@ +pr: 132981 +summary: For ES|QL execution after `preMapper` +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 7f9f77509310e..f0bc458ead468 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.BlockFactory; @@ -43,6 +44,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentType; @@ -161,6 +163,8 @@ import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; public final class EsqlTestUtils { @@ -172,6 +176,8 @@ public final class EsqlTestUtils { public static final Literal FIVE = new Literal(Source.EMPTY, 5, DataType.INTEGER); public static final Literal SIX = new Literal(Source.EMPTY, 6, DataType.INTEGER); + private EsqlTestUtils() {} + public static Equals equalsOf(Expression left, Expression right) { return new Equals(EMPTY, left, right, null); } @@ -413,7 +419,7 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() { public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L)); public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices( - mock(TransportService.class), + createMockTransportService(), mock(SearchService.class), null, mock(ClusterService.class), @@ -423,7 +429,17 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() { new InferenceService(mock(Client.class)) ); - private EsqlTestUtils() {} + private static TransportService createMockTransportService() { + var service = mock(TransportService.class); + doReturn(createMockThreadPool()).when(service).getThreadPool(); + return service; + } + + private static ThreadPool createMockThreadPool() { + var threadPool = mock(ThreadPool.class); + doReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE).when(threadPool).executor(anyString()); + return threadPool; + } public static Configuration configuration(QueryPragmas pragmas, String query) { return new Configuration( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index d72fe46169809..aa3c97846ef71 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -44,7 +44,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractTransportRequest; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; @@ -65,7 +64,6 @@ import org.elasticsearch.xpack.esql.session.Configuration; import org.elasticsearch.xpack.esql.session.EsqlCCSUtils; import org.elasticsearch.xpack.esql.session.Result; -import org.elasticsearch.xpack.ml.MachineLearning; import java.util.ArrayList; import java.util.Collections; @@ -194,11 +192,8 @@ public void execute( ) { assert ThreadPool.assertCurrentThreadPool( EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME, - TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX, - ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SEARCH, - ThreadPool.Names.SEARCH_COORDINATION, - MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME + ThreadPool.Names.SEARCH_COORDINATION ); Tuple, PhysicalPlan> subplansAndMainPlan = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(physicalPlan); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 388594f21a215..4f0f24e321654 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -34,12 +34,10 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction; -import org.elasticsearch.xpack.ml.MachineLearning; import java.util.ArrayList; import java.util.Collections; @@ -131,14 +129,7 @@ abstract class DataNodeRequestSender { } final void startComputeOnDataNodes(Set concreteIndices, Runnable runOnTaskFailure, ActionListener listener) { - assert ThreadPool.assertCurrentThreadPool( - EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME, - TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX, - ThreadPool.Names.SYSTEM_READ, - ThreadPool.Names.SEARCH, - ThreadPool.Names.SEARCH_COORDINATION, - MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME - ); + assert ThreadPool.assertCurrentThreadPool(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME, ThreadPool.Names.SEARCH_COORDINATION); final long startTimeInNanos = System.nanoTime(); searchShards(concreteIndices, ActionListener.wrap(targetShards -> { try ( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index a23154c218a61..52ab46e53ec14 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -110,7 +110,7 @@ public TransportEsqlQueryAction( this.threadPool = threadPool; this.planExecutor = planExecutor; this.clusterService = clusterService; - this.requestExecutor = threadPool.executor(ThreadPool.Names.SEARCH); + this.requestExecutor = threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION); exchangeService.registerTransportHandler(transportService); this.exchangeService = exchangeService; this.enrichPolicyResolver = new EnrichPolicyResolver( @@ -196,7 +196,7 @@ protected void doExecute(Task task, EsqlQueryRequest request, ActionListener listener) { - assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH); + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION); if (requestIsAsync(request)) { asyncTaskManagementService.asyncExecute( request, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index af9b8e0e4dac1..8c4b71ea79af3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockUtils; @@ -34,7 +35,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; -import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; @@ -75,7 +75,6 @@ import org.elasticsearch.xpack.esql.planner.premapper.PreMapper; import org.elasticsearch.xpack.esql.plugin.TransportActionServices; import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry; -import org.elasticsearch.xpack.ml.MachineLearning; import java.util.ArrayList; import java.util.Collection; @@ -83,6 +82,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -114,13 +114,14 @@ public interface PlanRunner { private final LogicalPlanPreOptimizer logicalPlanPreOptimizer; private final LogicalPlanOptimizer logicalPlanOptimizer; private final PreMapper preMapper; - private final Mapper mapper; private final PhysicalPlanOptimizer physicalPlanOptimizer; + private final PlanTelemetry planTelemetry; private final IndicesExpressionGrouper indicesExpressionGrouper; private final InferenceService inferenceService; private final RemoteClusterService remoteClusterService; + private final ExecutorService planExecutor; private boolean explainMode; private String parsedPlanString; @@ -157,6 +158,7 @@ public EsqlSession( this.inferenceService = services.inferenceService(); this.preMapper = new PreMapper(services); this.remoteClusterService = services.transportService().getRemoteClusterService(); + this.planExecutor = services.transportService().getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION); } public String sessionId() { @@ -167,7 +169,7 @@ public String sessionId() { * Execute an ESQL request. */ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, ActionListener listener) { - assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH); + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION); assert executionInfo != null : "Null EsqlExecutionInfo"; LOGGER.debug("ESQL query:\n{}", request.query()); LogicalPlan parsed = parse(request.query(), request.params()); @@ -179,8 +181,10 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { @Override public void onResponse(LogicalPlan analyzedPlan) { + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ThreadPool.Names.SEARCH_COORDINATION); SubscribableListener.newForked(l -> preOptimizedPlan(analyzedPlan, l)) - .andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l)) + .andThenApply(p -> optimizedPlan(p)) + .andThen((l, p) -> preMapper.preMapper(p, new ThreadedActionListener<>(planExecutor, l))) .andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l)) .addListener(listener); } @@ -198,13 +202,7 @@ public void executeOptimizedPlan( LogicalPlan optimizedPlan, ActionListener listener ) { - assert ThreadPool.assertCurrentThreadPool( - TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX, - ThreadPool.Names.SYSTEM_READ, - ThreadPool.Names.SEARCH, - ThreadPool.Names.SEARCH_COORDINATION, - MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME - ); + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ThreadPool.Names.SEARCH_COORDINATION); if (explainMode) {// TODO: INLINESTATS come back to the explain mode branch and reevaluate PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request); String physicalPlanString = physicalPlan.toString(); @@ -365,7 +363,7 @@ public void analyzedPlan( QueryBuilder requestFilter, ActionListener logicalPlanListener ) { - assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH); + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION); if (parsed.analyzed()) { logicalPlanListener.onResponse(parsed); return; @@ -395,14 +393,11 @@ private void preAnalyzeLookupIndex( EsqlExecutionInfo executionInfo, ActionListener listener ) { + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ThreadPool.Names.SEARCH_COORDINATION); + String localPattern = lookupIndexPattern.indexPattern(); assert RemoteClusterAware.isRemoteIndexName(localPattern) == false : "Lookup index name should not include remote, but got: " + localPattern; - assert ThreadPool.assertCurrentThreadPool( - ThreadPool.Names.SEARCH, - ThreadPool.Names.SEARCH_COORDINATION, - ThreadPool.Names.SYSTEM_READ - ); Set fieldNames = result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames; String patternWithRemotes; @@ -603,11 +598,7 @@ private void preAnalyzeMainIndices( QueryBuilder requestFilter, ActionListener listener ) { - assert ThreadPool.assertCurrentThreadPool( - ThreadPool.Names.SEARCH, - ThreadPool.Names.SEARCH_COORDINATION, - ThreadPool.Names.SYSTEM_READ - ); + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ThreadPool.Names.SEARCH_COORDINATION); // TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one List indices = preAnalysis.indices; if (indices.size() > 1) { @@ -760,7 +751,7 @@ public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener