Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132981.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132981
summary: For ES|QL execution after `preMapper`
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.compute.data.BlockFactory;
Expand All @@ -43,6 +44,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentType;
Expand Down Expand Up @@ -161,6 +163,8 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

public final class EsqlTestUtils {
Expand All @@ -172,6 +176,8 @@ public final class EsqlTestUtils {
public static final Literal FIVE = new Literal(Source.EMPTY, 5, DataType.INTEGER);
public static final Literal SIX = new Literal(Source.EMPTY, 6, DataType.INTEGER);

private EsqlTestUtils() {}

public static Equals equalsOf(Expression left, Expression right) {
return new Equals(EMPTY, left, right, null);
}
Expand Down Expand Up @@ -413,7 +419,7 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L));

public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices(
mock(TransportService.class),
createMockTransportService(),
mock(SearchService.class),
null,
mock(ClusterService.class),
Expand All @@ -423,7 +429,17 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
new InferenceService(mock(Client.class))
);

private EsqlTestUtils() {}
private static TransportService createMockTransportService() {
var service = mock(TransportService.class);
doReturn(createMockThreadPool()).when(service).getThreadPool();
return service;
}

private static ThreadPool createMockThreadPool() {
var threadPool = mock(ThreadPool.class);
doReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE).when(threadPool).executor(anyString());
return threadPool;
}

public static Configuration configuration(QueryPragmas pragmas, String query) {
return new Configuration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
Expand All @@ -65,7 +64,6 @@
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
import org.elasticsearch.xpack.esql.session.Result;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -194,11 +192,8 @@ public void execute(
) {
assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
ThreadPool.Names.SEARCH_COORDINATION
);
Tuple<List<PhysicalPlan>, PhysicalPlan> subplansAndMainPlan = PlannerUtils.breakPlanIntoSubPlansAndMainPlan(physicalPlan);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,10 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -131,14 +129,7 @@ abstract class DataNodeRequestSender {
}

final void startComputeOnDataNodes(Set<String> concreteIndices, Runnable runOnTaskFailure, ActionListener<ComputeResponse> listener) {
assert ThreadPool.assertCurrentThreadPool(
EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME,
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
);
assert ThreadPool.assertCurrentThreadPool(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME, ThreadPool.Names.SEARCH_COORDINATION);
final long startTimeInNanos = System.nanoTime();
searchShards(concreteIndices, ActionListener.wrap(targetShards -> {
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public TransportEsqlQueryAction(
this.threadPool = threadPool;
this.planExecutor = planExecutor;
this.clusterService = clusterService;
this.requestExecutor = threadPool.executor(ThreadPool.Names.SEARCH);
this.requestExecutor = threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Field caps is executed on SEARCH_COORDINATION.
This sound like query metadata processing should go to SEARCH_COORDINATION and actual searching/aggregation to SEARCH/SYSTEM_READ to me.

exchangeService.registerTransportHandler(transportService);
this.exchangeService = exchangeService;
this.enrichPolicyResolver = new EnrichPolicyResolver(
Expand Down Expand Up @@ -196,7 +196,7 @@ protected void doExecute(Task task, EsqlQueryRequest request, ActionListener<Esq
}

private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION);
if (requestIsAsync(request)) {
asyncTaskManagementService.asyncExecute(
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockUtils;
Expand All @@ -34,7 +35,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
Expand Down Expand Up @@ -75,14 +75,14 @@
import org.elasticsearch.xpack.esql.planner.premapper.PreMapper;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -114,13 +114,14 @@ public interface PlanRunner {
private final LogicalPlanPreOptimizer logicalPlanPreOptimizer;
private final LogicalPlanOptimizer logicalPlanOptimizer;
private final PreMapper preMapper;

private final Mapper mapper;
private final PhysicalPlanOptimizer physicalPlanOptimizer;

private final PlanTelemetry planTelemetry;
private final IndicesExpressionGrouper indicesExpressionGrouper;
private final InferenceService inferenceService;
private final RemoteClusterService remoteClusterService;
private final ExecutorService planExecutor;

private boolean explainMode;
private String parsedPlanString;
Expand Down Expand Up @@ -157,6 +158,7 @@ public EsqlSession(
this.inferenceService = services.inferenceService();
this.preMapper = new PreMapper(services);
this.remoteClusterService = services.transportService().getRemoteClusterService();
this.planExecutor = services.transportService().getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION);
}

public String sessionId() {
Expand All @@ -167,7 +169,7 @@ public String sessionId() {
* Execute an ESQL request.
*/
public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, ActionListener<Result> listener) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION);
assert executionInfo != null : "Null EsqlExecutionInfo";
LOGGER.debug("ESQL query:\n{}", request.query());
LogicalPlan parsed = parse(request.query(), request.params());
Expand All @@ -179,8 +181,10 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
@Override
public void onResponse(LogicalPlan analyzedPlan) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ThreadPool.Names.SEARCH_COORDINATION);
SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l))
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l))
.andThenApply(p -> optimizedPlan(p))
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(p, new ThreadedActionListener<>(planExecutor, l)))
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
.addListener(listener);
}
Expand All @@ -198,13 +202,7 @@ public void executeOptimizedPlan(
LogicalPlan optimizedPlan,
ActionListener<Result> listener
) {
assert ThreadPool.assertCurrentThreadPool(
TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
MachineLearning.NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME
);
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ThreadPool.Names.SEARCH_COORDINATION);
if (explainMode) {// TODO: INLINESTATS come back to the explain mode branch and reevaluate
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
String physicalPlanString = physicalPlan.toString();
Expand Down Expand Up @@ -365,7 +363,7 @@ public void analyzedPlan(
QueryBuilder requestFilter,
ActionListener<LogicalPlan> logicalPlanListener
) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH);
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION);
if (parsed.analyzed()) {
logicalPlanListener.onResponse(parsed);
return;
Expand Down Expand Up @@ -395,14 +393,11 @@ private void preAnalyzeLookupIndex(
EsqlExecutionInfo executionInfo,
ActionListener<PreAnalysisResult> listener
) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ThreadPool.Names.SEARCH_COORDINATION);

String localPattern = lookupIndexPattern.indexPattern();
assert RemoteClusterAware.isRemoteIndexName(localPattern) == false
: "Lookup index name should not include remote, but got: " + localPattern;
assert ThreadPool.assertCurrentThreadPool(
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
Set<String> fieldNames = result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames;

String patternWithRemotes;
Expand Down Expand Up @@ -603,11 +598,7 @@ private void preAnalyzeMainIndices(
QueryBuilder requestFilter,
ActionListener<PreAnalysisResult> listener
) {
assert ThreadPool.assertCurrentThreadPool(
ThreadPool.Names.SEARCH,
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ThreadPool.Names.SEARCH_COORDINATION);
// TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one
List<IndexPattern> indices = preAnalysis.indices;
if (indices.size() > 1) {
Expand Down Expand Up @@ -760,7 +751,7 @@ public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener<LogicalPlan
logicalPlanPreOptimizer.preOptimize(logicalPlan, listener);
}

public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
private PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
if (optimizedPlan.optimized() == false) {
throw new IllegalStateException("Expected optimized plan");
}
Expand Down