Skip to content

Commit 243f55a

Browse files
committed
Revert "Refactor remote cluster handling in Analyzer (elastic#126426)"
This reverts commit b21e325.
1 parent 83ce15a commit 243f55a

File tree

4 files changed

+72
-45
lines changed

4 files changed

+72
-45
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public final class EnrichResolution {
2323

2424
private final Map<Key, ResolvedEnrichPolicy> resolvedPolicies = ConcurrentCollections.newConcurrentMap();
2525
private final Map<Key, String> errors = ConcurrentCollections.newConcurrentMap();
26+
private final Map<String, Exception> unavailableClusters = ConcurrentCollections.newConcurrentMap();
2627

2728
public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) {
2829
return resolvedPolicies.get(new Key(policyName, mode));
@@ -51,6 +52,14 @@ public void addError(String policyName, Enrich.Mode mode, String reason) {
5152
errors.putIfAbsent(new Key(policyName, mode), reason);
5253
}
5354

55+
public void addUnavailableCluster(String clusterAlias, Exception e) {
56+
unavailableClusters.put(clusterAlias, e);
57+
}
58+
59+
public Map<String, Exception> getUnavailableClusters() {
60+
return unavailableClusters;
61+
}
62+
5463
private record Key(String policyName, Enrich.Mode mode) {
5564

5665
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.ExceptionsHelper;
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.ActionListenerResponseHandler;
13+
import org.elasticsearch.action.search.SearchRequest;
1314
import org.elasticsearch.action.support.ChannelActionListener;
1415
import org.elasticsearch.action.support.ContextPreservingActionListener;
1516
import org.elasticsearch.action.support.RefCountingListener;
@@ -36,7 +37,6 @@
3637
import org.elasticsearch.xpack.core.ClientHelper;
3738
import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
3839
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
39-
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
4040
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
4141
import org.elasticsearch.xpack.esql.core.type.DataType;
4242
import org.elasticsearch.xpack.esql.core.type.EsField;
@@ -49,6 +49,7 @@
4949

5050
import java.io.IOException;
5151
import java.util.ArrayList;
52+
import java.util.Arrays;
5253
import java.util.Collection;
5354
import java.util.Collections;
5455
import java.util.HashMap;
@@ -58,8 +59,6 @@
5859
import java.util.Set;
5960
import java.util.stream.Collectors;
6061

61-
import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.markClusterWithFinalStateAndNoShards;
62-
6362
/**
6463
* Resolves enrich policies across clusters in several steps:
6564
* 1. Calculates the policies that need to be resolved for each cluster, see {@link #lookupPolicies}.
@@ -99,22 +98,21 @@ public record UnresolvedPolicy(String name, Enrich.Mode mode) {
9998
/**
10099
* Resolves a set of enrich policies
101100
*
101+
* @param targetClusters the target clusters
102102
* @param unresolvedPolicies the unresolved policies
103-
* @param executionInfo the execution info
104103
* @param listener notified with the enrich resolution
105104
*/
106105
public void resolvePolicies(
106+
Collection<String> targetClusters,
107107
Collection<UnresolvedPolicy> unresolvedPolicies,
108-
EsqlExecutionInfo executionInfo,
109108
ActionListener<EnrichResolution> listener
110109
) {
111-
if (unresolvedPolicies.isEmpty()) {
110+
if (unresolvedPolicies.isEmpty() || targetClusters.isEmpty()) {
112111
listener.onResponse(new EnrichResolution());
113112
return;
114113
}
115-
116-
final Set<String> remoteClusters = new HashSet<>(executionInfo.getClusters().keySet());
117-
final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
114+
final Set<String> remoteClusters = new HashSet<>(targetClusters);
115+
final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
118116
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
119117
final EnrichResolution enrichResolution = new EnrichResolution();
120118

@@ -123,14 +121,7 @@ public void resolvePolicies(
123121
for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
124122
String clusterAlias = entry.getKey();
125123
if (entry.getValue().connectionError != null) {
126-
assert clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false
127-
: "Should never have a connection error for the local cluster";
128-
markClusterWithFinalStateAndNoShards(
129-
executionInfo,
130-
clusterAlias,
131-
EsqlExecutionInfo.Cluster.Status.SKIPPED,
132-
entry.getValue().connectionError
133-
);
124+
enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError);
134125
// remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy
135126
remoteClusters.remove(clusterAlias);
136127
} else {
@@ -456,4 +447,11 @@ protected void getRemoteConnection(String cluster, ActionListener<Transport.Conn
456447
listener
457448
);
458449
}
450+
451+
public Map<String, List<String>> groupIndicesPerCluster(Set<String> remoteClusterNames, String[] indices) {
452+
return remoteClusterService.groupIndices(remoteClusterNames, SearchRequest.DEFAULT_INDICES_OPTIONS, indices)
453+
.entrySet()
454+
.stream()
455+
.collect(Collectors.toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue().indices())));
456+
}
459457
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.OriginalIndices;
12+
import org.elasticsearch.action.search.ShardSearchFailure;
1213
import org.elasticsearch.action.support.IndicesOptions;
1314
import org.elasticsearch.action.support.SubscribableListener;
1415
import org.elasticsearch.common.Strings;
@@ -18,6 +19,7 @@
1819
import org.elasticsearch.compute.data.Page;
1920
import org.elasticsearch.compute.operator.DriverProfile;
2021
import org.elasticsearch.core.Releasables;
22+
import org.elasticsearch.core.TimeValue;
2123
import org.elasticsearch.index.IndexMode;
2224
import org.elasticsearch.index.mapper.IndexModeFieldMapper;
2325
import org.elasticsearch.index.query.BoolQueryBuilder;
@@ -84,6 +86,7 @@
8486
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
8587

8688
import java.util.ArrayList;
89+
import java.util.Arrays;
8790
import java.util.HashMap;
8891
import java.util.Iterator;
8992
import java.util.List;
@@ -363,10 +366,16 @@ public void analyzedPlan(
363366
final List<IndexPattern> indices = preAnalysis.indices;
364367

365368
EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, configuredClusters, verifier.licenseState());
366-
initializeClusterData(indices, executionInfo);
369+
370+
final Set<String> targetClusters = enrichPolicyResolver.groupIndicesPerCluster(
371+
configuredClusters,
372+
indices.stream()
373+
.flatMap(index -> Arrays.stream(Strings.commaDelimitedListToStringArray(index.indexPattern())))
374+
.toArray(String[]::new)
375+
).keySet();
367376

368377
var listener = SubscribableListener.<EnrichResolution>newForked(
369-
l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l)
378+
l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l)
370379
)
371380
.<PreAnalysisResult>andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l))
372381
.<PreAnalysisResult>andThen((l, preAnalysisResult) -> resolveInferences(preAnalysis.inferencePlans, preAnalysisResult, l));
@@ -392,6 +401,12 @@ public void analyzedPlan(
392401
}).<PreAnalysisResult>andThen((l, result) -> {
393402
assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request";
394403

404+
// "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices
405+
// resolving one more time (the first attempt failed and the query had a filter)
406+
for (String clusterAlias : executionInfo.clusterAliases()) {
407+
executionInfo.swapCluster(clusterAlias, (k, v) -> null);
408+
}
409+
395410
// here the requestFilter is set to null, performing the pre-analysis after the first step failed
396411
preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, l);
397412
}).<LogicalPlan>andThen((l, result) -> {
@@ -421,26 +436,6 @@ private void preAnalyzeLookupIndex(IndexPattern table, PreAnalysisResult result,
421436
// TODO: Verify that the resolved index actually has indexMode: "lookup"
422437
}
423438

424-
private void initializeClusterData(List<IndexPattern> indices, EsqlExecutionInfo executionInfo) {
425-
if (indices.isEmpty()) {
426-
return;
427-
}
428-
assert indices.size() == 1 : "Only single index pattern is supported";
429-
Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
430-
configuredClusters,
431-
IndicesOptions.DEFAULT,
432-
indices.getFirst().indexPattern()
433-
);
434-
for (Map.Entry<String, OriginalIndices> entry : clusterIndices.entrySet()) {
435-
final String clusterAlias = entry.getKey();
436-
String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
437-
executionInfo.swapCluster(clusterAlias, (k, v) -> {
438-
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
439-
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
440-
});
441-
}
442-
}
443-
444439
private void preAnalyzeMainIndices(
445440
PreAnalyzer.PreAnalysis preAnalysis,
446441
EsqlExecutionInfo executionInfo,
@@ -454,8 +449,38 @@ private void preAnalyzeMainIndices(
454449
// Note: JOINs are not supported but we detect them when
455450
listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
456451
} else if (indices.size() == 1) {
452+
// known to be unavailable from the enrich policy API call
453+
Map<String, Exception> unavailableClusters = result.enrichResolution.getUnavailableClusters();
457454
IndexPattern table = indices.getFirst();
458455

456+
Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
457+
configuredClusters,
458+
IndicesOptions.DEFAULT,
459+
table.indexPattern()
460+
);
461+
for (Map.Entry<String, OriginalIndices> entry : clusterIndices.entrySet()) {
462+
final String clusterAlias = entry.getKey();
463+
String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
464+
executionInfo.swapCluster(clusterAlias, (k, v) -> {
465+
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
466+
if (unavailableClusters.containsKey(k)) {
467+
return new EsqlExecutionInfo.Cluster(
468+
clusterAlias,
469+
indexExpr,
470+
executionInfo.isSkipUnavailable(clusterAlias),
471+
EsqlExecutionInfo.Cluster.Status.SKIPPED,
472+
0,
473+
0,
474+
0,
475+
0,
476+
List.of(new ShardSearchFailure(unavailableClusters.get(k))),
477+
new TimeValue(0)
478+
);
479+
} else {
480+
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
481+
}
482+
});
483+
}
459484
// if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
460485
// based only on available clusters (which could now be an empty list)
461486
String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.elasticsearch.transport.TransportService;
4040
import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
4141
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
42-
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
4342
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
4443
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
4544
import org.elasticsearch.xpack.esql.session.IndexResolver;
@@ -430,10 +429,6 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver {
430429

431430
EnrichResolution resolvePolicies(Collection<String> clusters, Collection<UnresolvedPolicy> unresolvedPolicies) {
432431
PlainActionFuture<EnrichResolution> future = new PlainActionFuture<>();
433-
EsqlExecutionInfo esqlExecutionInfo = new EsqlExecutionInfo(true);
434-
for (String cluster : clusters) {
435-
esqlExecutionInfo.swapCluster(cluster, (k, v) -> new EsqlExecutionInfo.Cluster(cluster, "*"));
436-
}
437432
if (randomBoolean()) {
438433
unresolvedPolicies = new ArrayList<>(unresolvedPolicies);
439434
for (Enrich.Mode mode : Enrich.Mode.values()) {
@@ -447,7 +442,7 @@ EnrichResolution resolvePolicies(Collection<String> clusters, Collection<Unresol
447442
unresolvedPolicies.add(new UnresolvedPolicy("legacy-policy-1", randomFrom(Enrich.Mode.values())));
448443
}
449444
}
450-
super.resolvePolicies(unresolvedPolicies, esqlExecutionInfo, future);
445+
super.resolvePolicies(clusters, unresolvedPolicies, future);
451446
return future.actionGet(30, TimeUnit.SECONDS);
452447
}
453448

0 commit comments

Comments
 (0)