@idegtiarenko
Contributor

@idegtiarenko idegtiarenko commented Aug 15, 2025

This change forks execution back to the search thread pool after preMapper has executed.
It greatly narrows the list of threads on which the remaining planning, optimization, and request sending to data nodes can happen, and eliminates the possibility of running on an ml or netty/transport thread.

Related to: #131799

@idegtiarenko idegtiarenko requested review from dnhatn and nik9000 August 15, 2025 13:05
@idegtiarenko idegtiarenko added the >enhancement, Team:Analytics, :Analytics/ES|QL, and v9.2.0 labels Aug 15, 2025
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine
Collaborator

Hi @idegtiarenko, I've created a changelog YAML for you.

this.inferenceService = services.inferenceService();
this.preMapper = new PreMapper(services);
this.remoteClusterService = services.transportService().getRemoteClusterService();
this.planExecutor = services.transportService().getThreadPool().executor(ThreadPool.Names.SEARCH);
Contributor Author

Is this technically planExecutorExecutor? 🤔

Member

@dnhatn dnhatn left a comment

I left a question, but this change was long overdue. Thank you!

  SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l))
-     .<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l))
+     .andThenApply(p -> optimizedPlan(p))
+     .<LogicalPlan>andThen(planExecutor, null, (l, p) -> preMapper.preMapper(p, l))
Member

Should we specify the thread pool in the next step instead? preMapper.preMapper() can switch the executing thread during rewrite:

Rewriteable.rewriteAndFetch(
new FullTextFunctionsRewritable(plan),
queryRewriteContext(services, indexNames(plan)),
listener.delegateFailureAndWrap((l, r) -> l.onResponse(r.plan))
);

Contributor Author

@idegtiarenko idegtiarenko Aug 16, 2025

That is correct, the pre-mapper could fork us to a number of different threads.
andThen customizes the listener supplied to the given step so that it forks to the executor after the step completes.
This is also confirmed by the updated assertions (in executeOptimizedPlan, right after this step completes).
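
The forking described in this comment can be illustrated outside Elasticsearch. The following is a minimal, self-contained sketch that uses only the JDK; it is not the SubscribableListener code, and `forkTo`, `asyncStep`, and the thread names `search-sim` and `transport_worker-sim` are hypothetical names introduced purely for illustration. It shows a step that completes its callback on an unrelated thread, and how wrapping that callback moves the continuation onto a chosen executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ForkAfterStepSketch {

    /** Wraps a callback so that it is always invoked on the given executor. */
    static <T> Consumer<T> forkTo(Executor executor, Consumer<T> delegate) {
        return value -> executor.execute(() -> delegate.accept(value));
    }

    /** Hypothetical async step: completes its callback on a thread the caller does not control. */
    static void asyncStep(String input, Consumer<String> callback) {
        new Thread(() -> callback.accept(input + "-rewritten"), "transport_worker-sim").start();
    }

    /** Runs the step and returns the name of the thread that handled its result. */
    static String threadHandlingResult(boolean fork) {
        ExecutorService search = Executors.newSingleThreadExecutor(r -> new Thread(r, "search-sim"));
        try {
            CompletableFuture<String> seen = new CompletableFuture<>();
            Consumer<String> next = result -> seen.complete(Thread.currentThread().getName());
            asyncStep("plan", fork ? forkTo(search, next) : next);
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            search.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without fork: " + threadHandlingResult(false)); // transport_worker-sim
        System.out.println("with fork:    " + threadHandlingResult(true));  // search-sim
    }
}
```

Without the wrapper, the continuation inherits whichever thread the step happened to finish on; with it, the continuation runs on the named executor regardless.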

Contributor Author

This is also the place that allows us to narrow down the list of threads everywhere else in this change.

Contributor Author

java.lang.AssertionError: elasticsearch[test-cluster-0][transport_worker][T#8] not in [search, search_coordination] nor a test thread
	at org.elasticsearch.threadpool.ThreadPool.assertCurrentThreadPool(ThreadPool.java:1116) ~[elasticsearch-9.2.0-SNAPSHOT.jar:?]
	at org.elasticsearch.xpack.esql.session.EsqlSession.executeOptimizedPlan(EsqlSession.java:204) ~[?:?]

could still happen with

 ./gradlew ":x-pack:plugin:esql:qa:server:multi-node:javaRestTest" --tests "org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT" -Dtests.method="test {csv-spec:match-function.TestMatchWithSemanticTextWithEvalsAndOtherFunctionsAndStats}"

Looks like .<LogicalPlan>andThen((l, p) -> preMapper.preMapper(p, new ThreadedActionListener<>(planExecutor, l))) might be necessary instead of the conditional forking logic in andThen.
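
The difference between conditional and unconditional forking that this comment alludes to can be sketched with the JDK's CompletableFuture. This rests on an assumption about what "conditional" means here (fork only when the result is not yet available, otherwise continue inline on the calling thread); it is not a description of how SubscribableListener is implemented. The helper names `subscribeConditionally`, `subscribeAlwaysForking`, and `observedThread`, and the thread name `search-sim`, are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ConditionalForkSketch {

    /** Assumed "conditional" behaviour: fork only if the result is not available yet. */
    static void subscribeConditionally(CompletableFuture<String> step, Executor executor, Consumer<String> next) {
        if (step.isDone()) {
            next.accept(step.join());             // already complete: runs inline on the calling thread
        } else {
            step.thenAcceptAsync(next, executor); // still pending: runs on the executor once complete
        }
    }

    /** Unconditional variant: the continuation always runs on the executor. */
    static void subscribeAlwaysForking(CompletableFuture<String> step, Executor executor, Consumer<String> next) {
        step.thenAcceptAsync(next, executor);
    }

    /** Subscribes to an already-completed step and returns the thread that ran the continuation. */
    static String observedThread(boolean alwaysFork) {
        ExecutorService search = Executors.newSingleThreadExecutor(r -> new Thread(r, "search-sim"));
        try {
            CompletableFuture<String> step = CompletableFuture.completedFuture("plan");
            CompletableFuture<String> seen = new CompletableFuture<>();
            Consumer<String> next = plan -> seen.complete(Thread.currentThread().getName());
            if (alwaysFork) {
                subscribeAlwaysForking(step, search, next);
            } else {
                subscribeConditionally(step, search, next);
            }
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            search.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("conditional:   " + observedThread(false)); // the calling thread
        System.out.println("unconditional: " + observedThread(true));  // search-sim
    }
}
```

Under that assumption, an assertion on the current thread pool can only hold in every case when the fork is unconditional, which is the effect of wrapping the listener as proposed in the comment.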

this.planExecutor = planExecutor;
this.clusterService = clusterService;
- this.requestExecutor = threadPool.executor(ThreadPool.Names.SEARCH);
+ this.requestExecutor = threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION);
Contributor Author

Field caps is executed on SEARCH_COORDINATION.
To me, this sounds like query metadata processing should go to SEARCH_COORDINATION and the actual searching/aggregation to SEARCH/SYSTEM_READ.
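
The split suggested here (metadata work on one pool, the actual search work on another) could look roughly like the sketch below. It is JDK-only and illustrative: `namedPool`, `isOnPool`, and `run` are hypothetical helpers, the thread naming merely imitates the bracketed format seen in the assertion message earlier in this thread, and none of this is Elasticsearch's ThreadPool API.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolSplitSketch {

    /** Hypothetical check: does the thread name mention one of the allowed pools in brackets? */
    static boolean isOnPool(String threadName, String... allowedPools) {
        return Arrays.stream(allowedPools).anyMatch(pool -> threadName.contains("[" + pool + "]"));
    }

    /** Single-threaded pool whose thread name embeds the pool name. */
    static ExecutorService namedPool(String pool) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch[" + pool + "][T#1]"));
    }

    /** Runs a metadata step on the coordination pool, then a compute step on the search pool. */
    static String run() {
        ExecutorService coordination = namedPool("search_coordination");
        ExecutorService search = namedPool("search");
        try {
            return CompletableFuture
                // metadata step (for example, resolving fields) on the coordination pool
                .supplyAsync(() -> Thread.currentThread().getName(), coordination)
                // compute step on the search pool; records both thread names
                .thenApplyAsync(meta -> meta + " -> " + Thread.currentThread().getName(), search)
                .join();
        } finally {
            coordination.shutdown();
            search.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // sketch[search_coordination][T#1] -> sketch[search][T#1]
    }
}
```

Embedding the pool name in the thread name is what makes a cheap name-based assertion such as `isOnPool` possible at the start of each stage.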

@idegtiarenko idegtiarenko changed the title from "For ES|QL execution after preMapper" to "Fork ES|QL execution after preMapper" Aug 20, 2025
@idegtiarenko idegtiarenko marked this pull request as draft August 21, 2025 12:09
@idegtiarenko
Contributor Author

Replaced by #133313

@idegtiarenko idegtiarenko deleted the fork_after_pre_mapper branch September 17, 2025 09:38