Skip to content

Commit d4dbff9

Browse files
committed
Refactor remote cluster handling in Analyzer
- Initialize clusters earlier - Simplify cluster set calculation - No need to keep separate skipped list for enrich resolution
1 parent 20eb590 commit d4dbff9

File tree

5 files changed

+39
-60
lines changed

5 files changed

+39
-60
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ public final class EnrichResolution {
2323

2424
private final Map<Key, ResolvedEnrichPolicy> resolvedPolicies = ConcurrentCollections.newConcurrentMap();
2525
private final Map<Key, String> errors = ConcurrentCollections.newConcurrentMap();
26-
private final Map<String, Exception> unavailableClusters = ConcurrentCollections.newConcurrentMap();
2726

2827
public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) {
2928
return resolvedPolicies.get(new Key(policyName, mode));
@@ -52,14 +51,6 @@ public void addError(String policyName, Enrich.Mode mode, String reason) {
5251
errors.putIfAbsent(new Key(policyName, mode), reason);
5352
}
5453

55-
public void addUnavailableCluster(String clusterAlias, Exception e) {
56-
unavailableClusters.put(clusterAlias, e);
57-
}
58-
59-
public Map<String, Exception> getUnavailableClusters() {
60-
return unavailableClusters;
61-
}
62-
6354
private record Key(String policyName, Enrich.Mode mode) {
6455

6556
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.elasticsearch.ExceptionsHelper;
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.ActionListenerResponseHandler;
13-
import org.elasticsearch.action.search.SearchRequest;
1413
import org.elasticsearch.action.support.ChannelActionListener;
1514
import org.elasticsearch.action.support.ContextPreservingActionListener;
1615
import org.elasticsearch.action.support.RefCountingListener;
@@ -37,6 +36,7 @@
3736
import org.elasticsearch.xpack.core.ClientHelper;
3837
import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
3938
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
39+
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
4040
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
4141
import org.elasticsearch.xpack.esql.core.type.DataType;
4242
import org.elasticsearch.xpack.esql.core.type.EsField;
@@ -49,7 +49,6 @@
4949

5050
import java.io.IOException;
5151
import java.util.ArrayList;
52-
import java.util.Arrays;
5352
import java.util.Collection;
5453
import java.util.Collections;
5554
import java.util.HashMap;
@@ -59,6 +58,8 @@
5958
import java.util.Set;
6059
import java.util.stream.Collectors;
6160

61+
import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.markClusterWithFinalStateAndNoShards;
62+
6263
/**
6364
* Resolves enrich policies across clusters in several steps:
6465
* 1. Calculates the policies that need to be resolved for each cluster, see {@link #lookupPolicies}.
@@ -105,6 +106,7 @@ public record UnresolvedPolicy(String name, Enrich.Mode mode) {
105106
public void resolvePolicies(
106107
Collection<String> targetClusters,
107108
Collection<UnresolvedPolicy> unresolvedPolicies,
109+
EsqlExecutionInfo executionInfo,
108110
ActionListener<EnrichResolution> listener
109111
) {
110112
if (unresolvedPolicies.isEmpty() || targetClusters.isEmpty()) {
@@ -121,7 +123,14 @@ public void resolvePolicies(
121123
for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
122124
String clusterAlias = entry.getKey();
123125
if (entry.getValue().connectionError != null) {
124-
enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError);
126+
assert clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false
127+
: "Should never have a connection error for the local cluster";
128+
markClusterWithFinalStateAndNoShards(
129+
executionInfo,
130+
clusterAlias,
131+
EsqlExecutionInfo.Cluster.Status.SKIPPED,
132+
entry.getValue().connectionError
133+
);
125134
// remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy
126135
remoteClusters.remove(clusterAlias);
127136
} else {
@@ -445,11 +454,4 @@ protected void getRemoteConnection(String cluster, ActionListener<Transport.Conn
445454
listener
446455
);
447456
}
448-
449-
public Map<String, List<String>> groupIndicesPerCluster(Set<String> remoteClusterNames, String[] indices) {
450-
return remoteClusterService.groupIndices(remoteClusterNames, SearchRequest.DEFAULT_INDICES_OPTIONS, indices)
451-
.entrySet()
452-
.stream()
453-
.collect(Collectors.toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue().indices())));
454-
}
455457
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 23 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.OriginalIndices;
12-
import org.elasticsearch.action.search.ShardSearchFailure;
1312
import org.elasticsearch.action.support.IndicesOptions;
1413
import org.elasticsearch.action.support.SubscribableListener;
1514
import org.elasticsearch.common.Strings;
@@ -19,7 +18,6 @@
1918
import org.elasticsearch.compute.data.Page;
2019
import org.elasticsearch.compute.operator.DriverProfile;
2120
import org.elasticsearch.core.Releasables;
22-
import org.elasticsearch.core.TimeValue;
2321
import org.elasticsearch.index.IndexMode;
2422
import org.elasticsearch.index.query.QueryBuilder;
2523
import org.elasticsearch.indices.IndicesExpressionGrouper;
@@ -362,16 +360,12 @@ public void analyzedPlan(
362360
final List<IndexPattern> indices = preAnalysis.indices;
363361

364362
EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, configuredClusters, verifier.licenseState());
363+
initializeClusterData(indices, executionInfo);
365364

366-
final Set<String> targetClusters = enrichPolicyResolver.groupIndicesPerCluster(
367-
configuredClusters,
368-
indices.stream()
369-
.flatMap(index -> Arrays.stream(Strings.commaDelimitedListToStringArray(index.indexPattern())))
370-
.toArray(String[]::new)
371-
).keySet();
365+
final Set<String> targetClusters = executionInfo.getClusters().keySet();
372366

373367
var listener = SubscribableListener.<EnrichResolution>newForked(
374-
l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l)
368+
l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, executionInfo, l)
375369
)
376370
.<PreAnalysisResult>andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l))
377371
.<PreAnalysisResult>andThen((l, preAnalysisResult) -> resolveInferences(preAnalysis.inferencePlans, preAnalysisResult, l));
@@ -432,6 +426,26 @@ private void preAnalyzeLookupIndex(IndexPattern table, PreAnalysisResult result,
432426
// TODO: Verify that the resolved index actually has indexMode: "lookup"
433427
}
434428

429+
private void initializeClusterData(List<IndexPattern> indices, EsqlExecutionInfo executionInfo) {
430+
if (indices.isEmpty()) {
431+
return;
432+
}
433+
assert indices.size() == 1 : "Only single index pattern is supported";
434+
Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
435+
configuredClusters,
436+
IndicesOptions.DEFAULT,
437+
indices.getFirst().indexPattern()
438+
);
439+
for (Map.Entry<String, OriginalIndices> entry : clusterIndices.entrySet()) {
440+
final String clusterAlias = entry.getKey();
441+
String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
442+
executionInfo.swapCluster(clusterAlias, (k, v) -> {
443+
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
444+
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
445+
});
446+
}
447+
}
448+
435449
private void preAnalyzeIndices(
436450
List<IndexPattern> indices,
437451
EsqlExecutionInfo executionInfo,
@@ -444,38 +458,8 @@ private void preAnalyzeIndices(
444458
// Note: JOINs are not supported but we detect them when
445459
listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
446460
} else if (indices.size() == 1) {
447-
// known to be unavailable from the enrich policy API call
448-
Map<String, Exception> unavailableClusters = result.enrichResolution.getUnavailableClusters();
449461
IndexPattern table = indices.getFirst();
450462

451-
Map<String, OriginalIndices> clusterIndices = indicesExpressionGrouper.groupIndices(
452-
configuredClusters,
453-
IndicesOptions.DEFAULT,
454-
table.indexPattern()
455-
);
456-
for (Map.Entry<String, OriginalIndices> entry : clusterIndices.entrySet()) {
457-
final String clusterAlias = entry.getKey();
458-
String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
459-
executionInfo.swapCluster(clusterAlias, (k, v) -> {
460-
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
461-
if (unavailableClusters.containsKey(k)) {
462-
return new EsqlExecutionInfo.Cluster(
463-
clusterAlias,
464-
indexExpr,
465-
executionInfo.isSkipUnavailable(clusterAlias),
466-
EsqlExecutionInfo.Cluster.Status.SKIPPED,
467-
0,
468-
0,
469-
0,
470-
0,
471-
List.of(new ShardSearchFailure(unavailableClusters.get(k))),
472-
new TimeValue(0)
473-
);
474-
} else {
475-
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
476-
}
477-
});
478-
}
479463
// if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
480464
// based only on available clusters (which could now be an empty list)
481465
String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.transport.TransportService;
4040
import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
4141
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
42+
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
4243
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
4344
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
4445
import org.elasticsearch.xpack.esql.session.IndexResolver;
@@ -429,6 +430,7 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver {
429430

430431
EnrichResolution resolvePolicies(Collection<String> clusters, Collection<UnresolvedPolicy> unresolvedPolicies) {
431432
PlainActionFuture<EnrichResolution> future = new PlainActionFuture<>();
433+
EsqlExecutionInfo esqlExecutionInfo = new EsqlExecutionInfo(true);
432434
if (randomBoolean()) {
433435
unresolvedPolicies = new ArrayList<>(unresolvedPolicies);
434436
for (Enrich.Mode mode : Enrich.Mode.values()) {
@@ -442,7 +444,7 @@ EnrichResolution resolvePolicies(Collection<String> clusters, Collection<Unresol
442444
unresolvedPolicies.add(new UnresolvedPolicy("legacy-policy-1", randomFrom(Enrich.Mode.values())));
443445
}
444446
}
445-
super.resolvePolicies(clusters, unresolvedPolicies, future);
447+
super.resolvePolicies(clusters, unresolvedPolicies, esqlExecutionInfo, future);
446448
return future.actionGet(30, TimeUnit.SECONDS);
447449
}
448450

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ EnrichPolicyResolver mockEnrichResolver() {
8383
ActionListener<EnrichResolution> listener = (ActionListener<EnrichResolution>) arguments[arguments.length - 1];
8484
listener.onResponse(new EnrichResolution());
8585
return null;
86-
}).when(enrichResolver).resolvePolicies(any(), any(), any());
86+
}).when(enrichResolver).resolvePolicies(any(), any(), any(), any());
8787
return enrichResolver;
8888
}
8989

0 commit comments

Comments
 (0)