From d4dbff9a70bcaa5fc2220d78c8f35710d56891ba Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 7 Apr 2025 12:13:23 -0600 Subject: [PATCH 1/5] Refactor remote cluster handling in Analyzer - Initialize clusters earlier - Simplify cluster set calculation - No need to keep separate skipped list for enrich resolution --- .../xpack/esql/analysis/EnrichResolution.java | 9 --- .../esql/enrich/EnrichPolicyResolver.java | 22 ++++--- .../xpack/esql/session/EsqlSession.java | 62 +++++++------------ .../enrich/EnrichPolicyResolverTests.java | 4 +- .../telemetry/PlanExecutorMetricsTests.java | 2 +- 5 files changed, 39 insertions(+), 60 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java index 4f6886edc5fbc..7fb279f18b1dc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java @@ -23,7 +23,6 @@ public final class EnrichResolution { private final Map resolvedPolicies = ConcurrentCollections.newConcurrentMap(); private final Map errors = ConcurrentCollections.newConcurrentMap(); - private final Map unavailableClusters = ConcurrentCollections.newConcurrentMap(); public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) { return resolvedPolicies.get(new Key(policyName, mode)); @@ -52,14 +51,6 @@ public void addError(String policyName, Enrich.Mode mode, String reason) { errors.putIfAbsent(new Key(policyName, mode), reason); } - public void addUnavailableCluster(String clusterAlias, Exception e) { - unavailableClusters.put(clusterAlias, e); - } - - public Map getUnavailableClusters() { - return unavailableClusters; - } - private record Key(String policyName, Enrich.Mode mode) { } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index b4583cb676f98..a42187a3ed88a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -10,7 +10,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.RefCountingListener; @@ -37,6 +36,7 @@ import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.enrich.EnrichMetadata; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.analysis.EnrichResolution; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.EsField; @@ -49,7 +49,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -59,6 +58,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.markClusterWithFinalStateAndNoShards; + /** * Resolves enrich policies across clusters in several steps: * 1. Calculates the policies that need to be resolved for each cluster, see {@link #lookupPolicies}. @@ -105,6 +106,7 @@ public record UnresolvedPolicy(String name, Enrich.Mode mode) { public void resolvePolicies( Collection targetClusters, Collection unresolvedPolicies, + EsqlExecutionInfo executionInfo, ActionListener listener ) { if (unresolvedPolicies.isEmpty() || targetClusters.isEmpty()) { @@ -121,7 +123,14 @@ public void resolvePolicies( for (Map.Entry entry : lookupResponses.entrySet()) { String clusterAlias = entry.getKey(); if (entry.getValue().connectionError != null) { - enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError); + assert clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false + : "Should never have a connection error for the local cluster"; + markClusterWithFinalStateAndNoShards( + executionInfo, + clusterAlias, + EsqlExecutionInfo.Cluster.Status.SKIPPED, + entry.getValue().connectionError + ); // remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy remoteClusters.remove(clusterAlias); } else { @@ -445,11 +454,4 @@ protected void getRemoteConnection(String cluster, ActionListener> groupIndicesPerCluster(Set remoteClusterNames, String[] indices) { - return remoteClusterService.groupIndices(remoteClusterNames, SearchRequest.DEFAULT_INDICES_OPTIONS, indices) - .entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue().indices()))); - } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index b218435b03c36..06f59d3e43d70 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.Strings; @@ -19,7 +18,6 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.core.Releasables; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.indices.IndicesExpressionGrouper; @@ -362,16 +360,12 @@ public void analyzedPlan( final List indices = preAnalysis.indices; EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, configuredClusters, verifier.licenseState()); + initializeClusterData(indices, executionInfo); - final Set targetClusters = enrichPolicyResolver.groupIndicesPerCluster( - configuredClusters, - indices.stream() - .flatMap(index -> Arrays.stream(Strings.commaDelimitedListToStringArray(index.indexPattern()))) - .toArray(String[]::new) - ).keySet(); + final Set targetClusters = executionInfo.getClusters().keySet(); var listener = SubscribableListener.newForked( - l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l) + l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, executionInfo, l) ) .andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l)) .andThen((l, preAnalysisResult) -> resolveInferences(preAnalysis.inferencePlans, preAnalysisResult, l)); @@ -432,6 +426,26 @@ private void preAnalyzeLookupIndex(IndexPattern table, PreAnalysisResult result, // TODO: Verify that the resolved index actually has indexMode: "lookup" } + private void initializeClusterData(List indices, EsqlExecutionInfo executionInfo) { + if (indices.isEmpty()) { + return; + } + assert indices.size() == 1 : "Only single index pattern is supported"; + Map clusterIndices = indicesExpressionGrouper.groupIndices( + configuredClusters, + IndicesOptions.DEFAULT, + indices.getFirst().indexPattern() + ); + for (Map.Entry entry : clusterIndices.entrySet()) { + final String clusterAlias = entry.getKey(); + String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices()); + executionInfo.swapCluster(clusterAlias, (k, v) -> { + assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; + return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias)); + }); + } + } + private void preAnalyzeIndices( List indices, EsqlExecutionInfo executionInfo, @@ -444,38 +458,8 @@ private void preAnalyzeIndices( // Note: JOINs are not supported but we detect them when listener.onFailure(new MappingException("Queries with multiple indices are not supported")); } else if (indices.size() == 1) { - // known to be unavailable from the enrich policy API call - Map unavailableClusters = result.enrichResolution.getUnavailableClusters(); IndexPattern table = indices.getFirst(); - Map clusterIndices = indicesExpressionGrouper.groupIndices( - configuredClusters, - IndicesOptions.DEFAULT, - table.indexPattern() - ); - for (Map.Entry entry : clusterIndices.entrySet()) { - final String clusterAlias = entry.getKey(); - String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices()); - executionInfo.swapCluster(clusterAlias, (k, v) -> { - assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; - if (unavailableClusters.containsKey(k)) { - return new EsqlExecutionInfo.Cluster( - clusterAlias, - indexExpr, - executionInfo.isSkipUnavailable(clusterAlias), - EsqlExecutionInfo.Cluster.Status.SKIPPED, - 0, - 0, - 0, - 0, - List.of(new ShardSearchFailure(unavailableClusters.get(k))), - new TimeValue(0) - ); - } else { - return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias)); - } - }); - } // if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search // based only on available clusters (which could now be an empty list) String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index 455a02ca8f2e5..4e350fcd4bf9c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.EnrichMetadata; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.analysis.EnrichResolution; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.session.IndexResolver; @@ -429,6 +430,7 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver { EnrichResolution resolvePolicies(Collection clusters, Collection unresolvedPolicies) { PlainActionFuture future = new PlainActionFuture<>(); + EsqlExecutionInfo esqlExecutionInfo = new EsqlExecutionInfo(true); if (randomBoolean()) { unresolvedPolicies = new ArrayList<>(unresolvedPolicies); for (Enrich.Mode mode : Enrich.Mode.values()) { @@ -442,7 +444,7 @@ EnrichResolution resolvePolicies(Collection clusters, Collection listener = (ActionListener) arguments[arguments.length - 1]; listener.onResponse(new EnrichResolution()); return null; - }).when(enrichResolver).resolvePolicies(any(), any(), any()); + }).when(enrichResolver).resolvePolicies(any(), any(), any(), any()); return enrichResolver; } From d9528e38a4fbe71336a54e591eb7a0badae063f4 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 7 Apr 2025 18:22:54 +0000 Subject: [PATCH 2/5] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/xpack/esql/session/EsqlSession.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 06f59d3e43d70..780267f443641 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -80,7 +80,6 @@ import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; From 89c781c607b9ed449ab453d47c5242ddf6b700a8 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 7 Apr 2025 17:05:40 -0600 Subject: [PATCH 3/5] fix enrich targets --- .../xpack/esql/session/EsqlSession.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 780267f443641..d4fb2a060dd32 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -23,6 +23,7 @@ import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; @@ -81,6 +82,7 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -361,10 +363,8 @@ public void analyzedPlan( EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, configuredClusters, verifier.licenseState()); initializeClusterData(indices, executionInfo); - final Set targetClusters = executionInfo.getClusters().keySet(); - var listener = SubscribableListener.newForked( - l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, executionInfo, l) + l -> enrichPolicyResolver.resolvePolicies(getEnrichTargets(executionInfo), unresolvedPolicies, executionInfo, l) ) .andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l)) .andThen((l, preAnalysisResult) -> resolveInferences(preAnalysis.inferencePlans, preAnalysisResult, l)); @@ -413,6 +413,15 @@ public void analyzedPlan( }).addListener(logicalPlanListener); } + private static Set getEnrichTargets(EsqlExecutionInfo executionInfo) { + Set targetClusters = executionInfo.getClusters().keySet(); + if (targetClusters.isEmpty()) { + // Always include local cluster for enrich resolution + return Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + } + return targetClusters; + } + private void preAnalyzeLookupIndex(IndexPattern table, PreAnalysisResult result, ActionListener listener) { Set fieldNames = result.wildcardJoinIndices().contains(table.indexPattern()) ? IndexResolver.ALL_FIELDS : result.fieldNames; // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types From 6b795c1389f8b595bd8eff4930729ff35cfd5ed1 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 7 Apr 2025 23:18:23 +0000 Subject: [PATCH 4/5] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/xpack/esql/session/EsqlSession.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index d4fb2a060dd32..9f82e0405d828 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -82,7 +82,6 @@ import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; From 1755ea8a8d977caec0474acbd2f47ce301ad12ab Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 7 Apr 2025 19:51:16 -0600 Subject: [PATCH 5/5] More refactoring --- .../esql/enrich/EnrichPolicyResolver.java | 10 +++++----- .../xpack/esql/session/EsqlSession.java | 18 +----------------- .../esql/enrich/EnrichPolicyResolverTests.java | 5 ++++- .../telemetry/PlanExecutorMetricsTests.java | 2 +- 4 files changed, 11 insertions(+), 24 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index a42187a3ed88a..9449e6fe48228 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -99,22 +99,22 @@ public record UnresolvedPolicy(String name, Enrich.Mode mode) { /** * Resolves a set of enrich policies * - * @param targetClusters the target clusters * @param unresolvedPolicies the unresolved policies + * @param executionInfo the execution info * @param listener notified with the enrich resolution */ public void resolvePolicies( - Collection targetClusters, Collection unresolvedPolicies, EsqlExecutionInfo executionInfo, ActionListener listener ) { - if (unresolvedPolicies.isEmpty() || targetClusters.isEmpty()) { + if (unresolvedPolicies.isEmpty()) { listener.onResponse(new EnrichResolution()); return; } - final Set remoteClusters = new HashSet<>(targetClusters); - final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + + final Set remoteClusters = new HashSet<>(executionInfo.getClusters().keySet()); + final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> { final EnrichResolution enrichResolution = new EnrichResolution(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 9f82e0405d828..cee9f03e72e1f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -23,7 +23,6 @@ import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; -import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; @@ -363,7 +362,7 @@ public void analyzedPlan( initializeClusterData(indices, executionInfo); var listener = SubscribableListener.newForked( - l -> enrichPolicyResolver.resolvePolicies(getEnrichTargets(executionInfo), unresolvedPolicies, executionInfo, l) + l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l) ) .andThen((l, enrichResolution) -> resolveFieldNames(parsed, enrichResolution, l)) .andThen((l, preAnalysisResult) -> resolveInferences(preAnalysis.inferencePlans, preAnalysisResult, l)); @@ -389,12 +388,6 @@ public void analyzedPlan( }).andThen((l, result) -> { assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request"; - // "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices - // resolving one more time (the first attempt failed and the query had a filter) - for (String clusterAlias : executionInfo.clusterAliases()) { - executionInfo.swapCluster(clusterAlias, (k, v) -> null); - } - // here the requestFilter is set to null, performing the pre-analysis after the first step failed preAnalyzeIndices(preAnalysis.indices, executionInfo, result, null, l); }).andThen((l, result) -> { @@ -412,15 +405,6 @@ public void analyzedPlan( }).addListener(logicalPlanListener); } - private static Set getEnrichTargets(EsqlExecutionInfo executionInfo) { - Set targetClusters = executionInfo.getClusters().keySet(); - if (targetClusters.isEmpty()) { - // Always include local cluster for enrich resolution - return Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - } - return targetClusters; - } - private void preAnalyzeLookupIndex(IndexPattern table, PreAnalysisResult result, ActionListener listener) { Set fieldNames = result.wildcardJoinIndices().contains(table.indexPattern()) ? IndexResolver.ALL_FIELDS : result.fieldNames; // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index 4e350fcd4bf9c..b09195319c535 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -431,6 +431,9 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver { EnrichResolution resolvePolicies(Collection clusters, Collection unresolvedPolicies) { PlainActionFuture future = new PlainActionFuture<>(); EsqlExecutionInfo esqlExecutionInfo = new EsqlExecutionInfo(true); + for (String cluster : clusters) { + esqlExecutionInfo.swapCluster(cluster, (k, v) -> new EsqlExecutionInfo.Cluster(cluster, "*")); + } if (randomBoolean()) { unresolvedPolicies = new ArrayList<>(unresolvedPolicies); for (Enrich.Mode mode : Enrich.Mode.values()) { @@ -444,7 +447,7 @@ EnrichResolution resolvePolicies(Collection clusters, Collection listener = (ActionListener) arguments[arguments.length - 1]; listener.onResponse(new EnrichResolution()); return null; - }).when(enrichResolver).resolvePolicies(any(), any(), any(), any()); + }).when(enrichResolver).resolvePolicies(any(), any(), any()); return enrichResolver; }