Skip to content

Commit b21e325

Browse files
authored
Refactor remote cluster handling in Analyzer (#126426)
Refactor remote cluster handling in Analyzer:
* Initialize clusters earlier
* Simplify cluster set calculation
* No need to keep a separate skipped list for enrich resolution
1 parent ecf9adf commit b21e325

File tree

4 files changed

+45
-72
lines changed

4 files changed

+45
-72
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ public final class EnrichResolution {
2323

2424
private final Map<Key, ResolvedEnrichPolicy> resolvedPolicies = ConcurrentCollections.newConcurrentMap();
2525
private final Map<Key, String> errors = ConcurrentCollections.newConcurrentMap();
26-
private final Map<String, Exception> unavailableClusters = ConcurrentCollections.newConcurrentMap();
2726

2827
public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) {
2928
return resolvedPolicies.get(new Key(policyName, mode));
@@ -52,14 +51,6 @@ public void addError(String policyName, Enrich.Mode mode, String reason) {
5251
errors.putIfAbsent(new Key(policyName, mode), reason);
5352
}
5453

55-
public void addUnavailableCluster(String clusterAlias, Exception e) {
56-
unavailableClusters.put(clusterAlias, e);
57-
}
58-
59-
public Map<String, Exception> getUnavailableClusters() {
60-
return unavailableClusters;
61-
}
62-
6354
private record Key(String policyName, Enrich.Mode mode) {
6455

6556
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.elasticsearch.ExceptionsHelper;
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.ActionListenerResponseHandler;
13-
import org.elasticsearch.action.search.SearchRequest;
1413
import org.elasticsearch.action.support.ChannelActionListener;
1514
import org.elasticsearch.action.support.ContextPreservingActionListener;
1615
import org.elasticsearch.action.support.RefCountingListener;
@@ -37,6 +36,7 @@
3736
import org.elasticsearch.xpack.core.ClientHelper;
3837
import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
3938
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
39+
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
4040
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
4141
import org.elasticsearch.xpack.esql.core.type.DataType;
4242
import org.elasticsearch.xpack.esql.core.type.EsField;
@@ -49,7 +49,6 @@
4949

5050
import java.io.IOException;
5151
import java.util.ArrayList;
52-
import java.util.Arrays;
5352
import java.util.Collection;
5453
import java.util.Collections;
5554
import java.util.HashMap;
@@ -59,6 +58,8 @@
5958
import java.util.Set;
6059
import java.util.stream.Collectors;
6160

61+
import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.markClusterWithFinalStateAndNoShards;
62+
6263
/**
6364
* Resolves enrich policies across clusters in several steps:
6465
* 1. Calculates the policies that need to be resolved for each cluster, see {@link #lookupPolicies}.
@@ -98,21 +99,22 @@ public record UnresolvedPolicy(String name, Enrich.Mode mode) {
9899
/**
99100
* Resolves a set of enrich policies
100101
*
101-
* @param targetClusters the target clusters
102102
* @param unresolvedPolicies the unresolved policies
103+
* @param executionInfo the execution info
103104
* @param listener notified with the enrich resolution
104105
*/
105106
public void resolvePolicies(
106-
Collection<String> targetClusters,
107107
Collection<UnresolvedPolicy> unresolvedPolicies,
108+
EsqlExecutionInfo executionInfo,
108109
ActionListener<EnrichResolution> listener
109110
) {
110-
if (unresolvedPolicies.isEmpty() || targetClusters.isEmpty()) {
111+
if (unresolvedPolicies.isEmpty()) {
111112
listener.onResponse(new EnrichResolution());
112113
return;
113114
}
114-
final Set<String> remoteClusters = new HashSet<>(targetClusters);
115-
final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
115+
116+
final Set<String> remoteClusters = new HashSet<>(executionInfo.getClusters().keySet());
117+
final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
116118
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
117119
final EnrichResolution enrichResolution = new EnrichResolution();
118120

@@ -121,7 +123,14 @@ public void resolvePolicies(
121123
for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
122124
String clusterAlias = entry.getKey();
123125
if (entry.getValue().connectionError != null) {
124-
enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError);
126+
assert clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false
127+
: "Should never have a connection error for the local cluster";
128+
markClusterWithFinalStateAndNoShards(
129+
executionInfo,
130+
clusterAlias,
131+
EsqlExecutionInfo.Cluster.Status.SKIPPED,
132+
entry.getValue().connectionError
133+
);
125134
// remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy
126135
remoteClusters.remove(clusterAlias);
127136
} else {
@@ -445,11 +454,4 @@ protected void getRemoteConnection(String cluster, ActionListener<Transport.Conn
445454
listener
446455
);
447456
}
448-
449-
public Map<String, List<String>> groupIndicesPerCluster(Set<String> remoteClusterNames, String[] indices) {
450-
return remoteClusterService.groupIndices(remoteClusterNames, SearchRequest.DEFAULT_INDICES_OPTIONS, indices)
451-
.entrySet()
452-
.stream()
453-
.collect(Collectors.toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue().indices())));
454-
}
455457
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 22 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.OriginalIndices;
12-
import org.elasticsearch.action.search.ShardSearchFailure;
1312
import org.elasticsearch.action.support.IndicesOptions;
1413
import org.elasticsearch.action.support.SubscribableListener;
1514
import org.elasticsearch.common.Strings;
@@ -19,7 +18,6 @@
1918
import org.elasticsearch.compute.data.Page;
2019
import org.elasticsearch.compute.operator.DriverProfile;
2120
import org.elasticsearch.core.Releasables;
22-
import org.elasticsearch.core.TimeValue;
2321
import org.elasticsearch.index.IndexMode;
2422
import org.elasticsearch.index.mapper.IndexModeFieldMapper;
2523
import org.elasticsearch.index.query.BoolQueryBuilder;
@@ -85,7 +83,6 @@
8583
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
8684

8785
import java.util.ArrayList;
88-
import java.util.Arrays;
8986
import java.util.HashMap;
9087
import java.util.Iterator;
9188
import java.util.List;
@@ -365,16 +362,10 @@ public void analyzedPlan(
365362
final List<IndexPattern> indices = preAnalysis.indices;
366363

367364
EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, configuredClusters, verifier.licenseState());
368-
369-
final Set<String> targetClusters = enrichPolicyResolver.groupIndicesPerCluster(
370-
configuredClusters,
371-
indices.stream()
372-
.flatMap(index -> Arrays.stream(Strings.commaDelimitedListToStringArray(index.indexPattern())))
373-
.toArray(String[]::new)
374-
).keySet();
365+
initializeClusterData(indices, executionInfo);
375366

376367
var listener = SubscribableListener.<EnrichResolution>newForked(
377-
l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l)
368+
l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l)
378369
)
379370
.<PreAnalysisResult>andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l))
380371
.<PreAnalysisResult>andThen((l, preAnalysisResult) -> resolveInferences(preAnalysis.inferencePlans, preAnalysisResult, l));
@@ -400,12 +391,6 @@ public void analyzedPlan(
400391
}).<PreAnalysisResult>andThen((l, result) -> {
401392
assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request";
402393

403-
// "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices
404-
// resolving one more time (the first attempt failed and the query had a filter)
405-
for (String clusterAlias : executionInfo.clusterAliases()) {
406-
executionInfo.swapCluster(clusterAlias, (k, v) -> null);
407-
}
408-
409394
// here the requestFilter is set to null, performing the pre-analysis after the first step failed
410395
preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, l);
411396
}).<LogicalPlan>andThen((l, result) -> {
@@ -435,6 +420,26 @@ private void preAnalyzeLookupIndex(IndexPattern table, PreAnalysisResult result,
435420
// TODO: Verify that the resolved index actually has indexMode: "lookup"
436421
}
437422

423+
private void initializeClusterData(List<IndexPattern> indices, EsqlExecutionInfo executionInfo) {
424+
if (indices.isEmpty()) {
425+
return;
426+
}
427+
assert indices.size() == 1 : "Only single index pattern is supported";
428+
Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
429+
configuredClusters,
430+
IndicesOptions.DEFAULT,
431+
indices.getFirst().indexPattern()
432+
);
433+
for (Map.Entry<String, OriginalIndices> entry : clusterIndices.entrySet()) {
434+
final String clusterAlias = entry.getKey();
435+
String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
436+
executionInfo.swapCluster(clusterAlias, (k, v) -> {
437+
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
438+
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
439+
});
440+
}
441+
}
442+
438443
private void preAnalyzeMainIndices(
439444
PreAnalyzer.PreAnalysis preAnalysis,
440445
EsqlExecutionInfo executionInfo,
@@ -448,38 +453,8 @@ private void preAnalyzeMainIndices(
448453
// Note: JOINs are not supported but we detect them when
449454
listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
450455
} else if (indices.size() == 1) {
451-
// known to be unavailable from the enrich policy API call
452-
Map<String, Exception> unavailableClusters = result.enrichResolution.getUnavailableClusters();
453456
IndexPattern table = indices.getFirst();
454457

455-
Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
456-
configuredClusters,
457-
IndicesOptions.DEFAULT,
458-
table.indexPattern()
459-
);
460-
for (Map.Entry<String, OriginalIndices> entry : clusterIndices.entrySet()) {
461-
final String clusterAlias = entry.getKey();
462-
String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
463-
executionInfo.swapCluster(clusterAlias, (k, v) -> {
464-
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
465-
if (unavailableClusters.containsKey(k)) {
466-
return new EsqlExecutionInfo.Cluster(
467-
clusterAlias,
468-
indexExpr,
469-
executionInfo.isSkipUnavailable(clusterAlias),
470-
EsqlExecutionInfo.Cluster.Status.SKIPPED,
471-
0,
472-
0,
473-
0,
474-
0,
475-
List.of(new ShardSearchFailure(unavailableClusters.get(k))),
476-
new TimeValue(0)
477-
);
478-
} else {
479-
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
480-
}
481-
});
482-
}
483458
// if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
484459
// based only on available clusters (which could now be an empty list)
485460
String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.transport.TransportService;
4040
import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
4141
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
42+
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
4243
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
4344
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
4445
import org.elasticsearch.xpack.esql.session.IndexResolver;
@@ -429,6 +430,10 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver {
429430

430431
EnrichResolution resolvePolicies(Collection<String> clusters, Collection<UnresolvedPolicy> unresolvedPolicies) {
431432
PlainActionFuture<EnrichResolution> future = new PlainActionFuture<>();
433+
EsqlExecutionInfo esqlExecutionInfo = new EsqlExecutionInfo(true);
434+
for (String cluster : clusters) {
435+
esqlExecutionInfo.swapCluster(cluster, (k, v) -> new EsqlExecutionInfo.Cluster(cluster, "*"));
436+
}
432437
if (randomBoolean()) {
433438
unresolvedPolicies = new ArrayList<>(unresolvedPolicies);
434439
for (Enrich.Mode mode : Enrich.Mode.values()) {
@@ -442,7 +447,7 @@ EnrichResolution resolvePolicies(Collection<String> clusters, Collection<Unresol
442447
unresolvedPolicies.add(new UnresolvedPolicy("legacy-policy-1", randomFrom(Enrich.Mode.values())));
443448
}
444449
}
445-
super.resolvePolicies(clusters, unresolvedPolicies, future);
450+
super.resolvePolicies(unresolvedPolicies, esqlExecutionInfo, future);
446451
return future.actionGet(30, TimeUnit.SECONDS);
447452
}
448453

0 commit comments

Comments (0)