Skip to content

Commit b4d6b8d

Browse files
committed
Concurrent lookup index resolution
This change attempts to resolve lookup indices concurrently.
1 parent 1bb9d2b commit b4d6b8d

File tree

1 file changed

+17
-13
lines changed
  • x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session

1 file changed

+17
-13
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
1515
import org.elasticsearch.action.search.ShardSearchFailure;
16+
import org.elasticsearch.action.support.RefCountingListener;
1617
import org.elasticsearch.action.support.SubscribableListener;
1718
import org.elasticsearch.common.collect.Iterators;
1819
import org.elasticsearch.common.lucene.BytesRefs;
@@ -85,6 +86,7 @@
8586
import java.util.List;
8687
import java.util.Map;
8788
import java.util.Set;
89+
import java.util.concurrent.ConcurrentHashMap;
8890
import java.util.stream.Collectors;
8991
import java.util.stream.Stream;
9092

@@ -402,11 +404,15 @@ public void analyzedPlan(
402404
preAnalysis.inferencePlans,
403405
l.map(preAnalysisResult::withInferenceResolution)
404406
)
405-
);
406-
// first resolve the lookup indices, then the main indices
407-
for (var index : preAnalysis.lookupIndices) {
408-
listener = listener.andThen((l, preAnalysisResult) -> preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l));
409-
}
407+
)
408+
// first resolve the lookup indices, then the main indices
409+
.<PreAnalysisResult>andThen((l, preAnalysisResult) -> {
410+
try (var refs = new RefCountingListener(l.map(ignored -> preAnalysisResult))) {
411+
for (var index : preAnalysis.lookupIndices) {
412+
preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, refs.acquire());
413+
}
414+
}
415+
});
410416
listener.<PreAnalysisResult>andThen((l, result) -> {
411417
// resolve the main indices
412418
preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l);
@@ -448,7 +454,7 @@ private void preAnalyzeLookupIndex(
448454
IndexPattern lookupIndexPattern,
449455
PreAnalysisResult result,
450456
EsqlExecutionInfo executionInfo,
451-
ActionListener<PreAnalysisResult> listener
457+
ActionListener<Void> listener
452458
) {
453459
String localPattern = lookupIndexPattern.indexPattern();
454460
assert RemoteClusterAware.isRemoteIndexName(localPattern) == false
@@ -469,12 +475,10 @@ private void preAnalyzeLookupIndex(
469475
return;
470476
}
471477
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
472-
indexResolver.resolveAsMergedMapping(
473-
patternWithRemotes,
474-
fieldNames,
475-
null,
476-
listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution))
477-
);
478+
indexResolver.resolveAsMergedMapping(patternWithRemotes, fieldNames, null, listener.map(indexResolution -> {
479+
receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution);
480+
return null;
481+
}));
478482
}
479483

480484
private void skipClusterOrError(String clusterAlias, EsqlExecutionInfo executionInfo, String message) {
@@ -812,7 +816,7 @@ public record PreAnalysisResult(
812816
) {
813817

814818
public PreAnalysisResult(EnrichResolution enrichResolution, Set<String> fieldNames, Set<String> wildcardJoinIndices) {
815-
this(null, new HashMap<>(), enrichResolution, fieldNames, wildcardJoinIndices, InferenceResolution.EMPTY);
819+
this(null, new ConcurrentHashMap<>(), enrichResolution, fieldNames, wildcardJoinIndices, InferenceResolution.EMPTY);
816820
}
817821

818822
PreAnalysisResult withInferenceResolution(InferenceResolution newInferenceResolution) {

0 commit comments

Comments
 (0)