From b4d6b8d8b3ac335aa5e7816411f570292d758831 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Thu, 31 Jul 2025 09:55:01 +0200 Subject: [PATCH] Concurrent lookup index resolution This change attempts to resolve lookup indices concurrently. --- .../xpack/esql/session/EsqlSession.java | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 63b7173f54432..ed94880dfc85b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.lucene.BytesRefs; @@ -85,6 +86,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -402,11 +404,15 @@ public void analyzedPlan( preAnalysis.inferencePlans, l.map(preAnalysisResult::withInferenceResolution) ) - ); - // first resolve the lookup indices, then the main indices - for (var index : preAnalysis.lookupIndices) { - listener = listener.andThen((l, preAnalysisResult) -> preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l)); - } + ) + // first resolve the lookup indices, then the main indices + .andThen((l, preAnalysisResult) -> { + try (var refs = new RefCountingListener(l.map(ignored -> preAnalysisResult))) { + for (var index : preAnalysis.lookupIndices) { + preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, refs.acquire()); + } + } + }); listener.andThen((l, result) -> { // resolve the main indices preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l); @@ -448,7 +454,7 @@ private void preAnalyzeLookupIndex( IndexPattern lookupIndexPattern, PreAnalysisResult result, EsqlExecutionInfo executionInfo, - ActionListener listener + ActionListener listener ) { String localPattern = lookupIndexPattern.indexPattern(); assert RemoteClusterAware.isRemoteIndexName(localPattern) == false @@ -469,12 +475,10 @@ private void preAnalyzeLookupIndex( return; } // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types - indexResolver.resolveAsMergedMapping( - patternWithRemotes, - fieldNames, - null, - listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution)) - ); + indexResolver.resolveAsMergedMapping(patternWithRemotes, fieldNames, null, listener.map(indexResolution -> { + receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution); + return null; + })); } private void skipClusterOrError(String clusterAlias, EsqlExecutionInfo executionInfo, String message) { @@ -812,7 +816,7 @@ public record PreAnalysisResult( ) { public PreAnalysisResult(EnrichResolution enrichResolution, Set fieldNames, Set wildcardJoinIndices) { - this(null, new HashMap<>(), enrichResolution, fieldNames, wildcardJoinIndices, InferenceResolution.EMPTY); + this(null, new ConcurrentHashMap<>(), enrichResolution, fieldNames, wildcardJoinIndices, InferenceResolution.EMPTY); } PreAnalysisResult withInferenceResolution(InferenceResolution newInferenceResolution) {