diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesExpressionGrouper.java b/server/src/main/java/org/elasticsearch/indices/IndicesExpressionGrouper.java index 096ac0912d531..9660c57260529 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesExpressionGrouper.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesExpressionGrouper.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.Strings; import java.util.Map; +import java.util.Set; /** * Interface for grouping index expressions, along with IndicesOptions by cluster alias. @@ -29,23 +30,36 @@ public interface IndicesExpressionGrouper { /** + * @param remoteClusterNames Set of configured remote cluster names. * @param indicesOptions IndicesOptions to clarify how the index expression should be parsed/applied * @param indexExpressionCsv Multiple index expressions as CSV string (with no spaces), e.g., "logs1,logs2,cluster-a:logs1". * A single index expression is also supported. * @return Map where the key is the cluster alias (for "local" cluster, it is RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) * and the value for that cluster from the index expression is an OriginalIndices object. */ - default Map groupIndices(IndicesOptions indicesOptions, String indexExpressionCsv) { - return groupIndices(indicesOptions, Strings.splitStringByCommaToArray(indexExpressionCsv)); + default Map groupIndices( + Set remoteClusterNames, + IndicesOptions indicesOptions, + String indexExpressionCsv + ) { + return groupIndices(remoteClusterNames, indicesOptions, Strings.splitStringByCommaToArray(indexExpressionCsv)); } /** * Same behavior as the other groupIndices, except the incoming multiple index expressions must already be * parsed into a String array. + * @param remoteClusterNames Set of configured remote cluster names. * @param indicesOptions IndicesOptions to clarify how the index expressions should be parsed/applied * @param indexExpressions Multiple index expressions as string[]. * @return Map where the key is the cluster alias (for "local" cluster, it is RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) * and the value for that cluster from the index expression is an OriginalIndices object. */ - Map groupIndices(IndicesOptions indicesOptions, String[] indexExpressions); + Map groupIndices(Set remoteClusterNames, IndicesOptions indicesOptions, String[] indexExpressions); + + /** + * Returns a set of currently configured remote clusters. + */ + default Set getConfiguredClusters() { + return Set.of(); + } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 60c295ccb4f5a..1ecaf8f4b15f1 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -185,6 +185,7 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no /** * Group indices by cluster alias mapped to OriginalIndices for that cluster. + * @param remoteClusterNames Set of configured remote cluster names. * @param indicesOptions IndicesOptions to clarify how the index expressions should be parsed/applied * @param indices Multiple index expressions as string[]. * @param returnLocalAll whether to support the _all functionality needed by _search @@ -193,9 +194,14 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no * If false, an empty map is returned when no indices are specified. * @return Map keyed by cluster alias having OriginalIndices as the map value parsed from the String[] indices argument */ - public Map groupIndices(IndicesOptions indicesOptions, String[] indices, boolean returnLocalAll) { + public Map groupIndices( + Set remoteClusterNames, + IndicesOptions indicesOptions, + String[] indices, + boolean returnLocalAll + ) { final Map originalIndicesMap = new HashMap<>(); - final Map> groupedIndices = groupClusterIndices(getRemoteClusterNames(), indices); + final Map> groupedIndices = groupClusterIndices(remoteClusterNames, indices); if (groupedIndices.isEmpty()) { if (returnLocalAll) { // search on _all in the local cluster if neither local indices nor remote indices were specified @@ -214,12 +220,26 @@ public Map groupIndices(IndicesOptions indicesOptions, /** * If no indices are specified, then a Map with one entry for the local cluster with an empty index array is returned. * For details see {@code groupIndices(IndicesOptions indicesOptions, String[] indices, boolean returnLocalAll)} + * @param remoteClusterNames Set of configured remote cluster names. * @param indicesOptions IndicesOptions to clarify how the index expressions should be parsed/applied * @param indices Multiple index expressions as string[]. * @return Map keyed by cluster alias having OriginalIndices as the map value parsed from the String[] indices argument */ + public Map groupIndices(Set remoteClusterNames, IndicesOptions indicesOptions, String[] indices) { + return groupIndices(remoteClusterNames, indicesOptions, indices, true); + } + + public Map groupIndices(IndicesOptions indicesOptions, String[] indices, boolean returnLocalAll) { + return groupIndices(getRemoteClusterNames(), indicesOptions, indices, returnLocalAll); + } + public Map groupIndices(IndicesOptions indicesOptions, String[] indices) { - return groupIndices(indicesOptions, indices, true); + return groupIndices(getRemoteClusterNames(), indicesOptions, indices, true); + } + + @Override + public Set getConfiguredClusters() { + return getRemoteClusterNames(); } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 4de5a2e3661fb..b4583cb676f98 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -446,8 +446,8 @@ protected void getRemoteConnection(String cluster, ActionListener> groupIndicesPerCluster(String[] indices) { - return remoteClusterService.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, indices) + public Map> groupIndicesPerCluster(Set remoteClusterNames, String[] indices) { + return remoteClusterService.groupIndices(remoteClusterNames, SearchRequest.DEFAULT_INDICES_OPTIONS, indices) .entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue().indices()))); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 3a2db609d6c8a..3c50a75bfafb5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -278,12 +278,13 @@ public static void checkForCcsLicense( EsqlExecutionInfo executionInfo, List indices, IndicesExpressionGrouper indicesGrouper, + Set configuredClusters, XPackLicenseState licenseState ) { for (TableInfo tableInfo : indices) { Map groupedIndices; try { - groupedIndices = indicesGrouper.groupIndices(IndicesOptions.DEFAULT, tableInfo.id().indexPattern()); + groupedIndices = indicesGrouper.groupIndices(configuredClusters, IndicesOptions.DEFAULT, tableInfo.id().indexPattern()); } catch (NoSuchRemoteClusterException e) { if (EsqlLicenseChecker.isCcsAllowed(licenseState)) { throw e; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 2efed443c9aa9..6bfd21c1471eb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -120,6 +120,7 @@ public interface PlanRunner { private final PhysicalPlanOptimizer physicalPlanOptimizer; private final PlanTelemetry planTelemetry; private final IndicesExpressionGrouper indicesExpressionGrouper; + private Set configuredClusters; public EsqlSession( String sessionId, @@ -343,6 +344,8 @@ public void analyzedPlan( plan.setAnalyzed(); return plan; }; + // Capture configured remotes list to ensure consistency throughout the session + configuredClusters = Set.copyOf(indicesExpressionGrouper.getConfiguredClusters()); PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed); var unresolvedPolicies = preAnalysis.enriches.stream() @@ -355,9 +358,10 @@ public void analyzedPlan( .collect(Collectors.toSet()); final List indices = preAnalysis.indices; - EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, verifier.licenseState()); + EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, configuredClusters, verifier.licenseState()); final Set targetClusters = enrichPolicyResolver.groupIndicesPerCluster( + configuredClusters, indices.stream() .flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().indexPattern()))) .toArray(String[]::new) @@ -378,7 +382,7 @@ public void analyzedPlan( // invalid index resolution to updateExecutionInfo if (result.indices.isValid()) { // CCS indices and skip_unavailable cluster values can stop the analysis right here - if (analyzeCCSIndices(executionInfo, targetClusters, unresolvedPolicies, result, logicalPlanListener, l)) return; + if (allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return; } // whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step l.onResponse(result); @@ -442,6 +446,7 @@ private void preAnalyzeIndices( IndexPattern table = tableInfo.id(); Map clusterIndices = indicesExpressionGrouper.groupIndices( + configuredClusters, IndicesOptions.DEFAULT, table.indexPattern() ); @@ -501,13 +506,14 @@ private void preAnalyzeIndices( } } - private boolean analyzeCCSIndices( + /** + * Check if there are any clusters to search. + * @return true if there are no clusters to search, false otherwise + */ + private boolean allCCSClustersSkipped( EsqlExecutionInfo executionInfo, - Set targetClusters, - Set unresolvedPolicies, PreAnalysisResult result, - ActionListener logicalPlanListener, - ActionListener l + ActionListener logicalPlanListener ) { IndexResolution indexResolution = result.indices; EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); @@ -520,22 +526,6 @@ private boolean analyzeCCSIndices( return true; } - Set newClusters = enrichPolicyResolver.groupIndicesPerCluster( - indexResolution.get().concreteIndices().toArray(String[]::new) - ).keySet(); - // If new clusters appear when resolving the main indices, we need to resolve the enrich policies again - // or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies again. - // TODO: add a test for this - if (targetClusters.containsAll(newClusters) == false - // do not bother with a re-resolution if only remotes were requested and all were offline - && executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isPresent()) { - enrichPolicyResolver.resolvePolicies( - newClusters, - unresolvedPolicies, - l.map(enrichResolution -> result.withEnrichResolution(enrichResolution)) - ); - return true; - } return false; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java index e151e4c8f3a9b..1afc44540867b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java @@ -712,22 +712,22 @@ public void testCheckForCcsLicense() { List indices = new ArrayList<>(); indices.add(new TableInfo(new IndexPattern(EMPTY, randomFrom("idx", "idx1,idx2*")))); - checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseValid); - checkForCcsLicense(executionInfo, indices, indicesGrouper, platinumLicenseValid); - checkForCcsLicense(executionInfo, indices, indicesGrouper, goldLicenseValid); - checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseValid); - checkForCcsLicense(executionInfo, indices, indicesGrouper, basicLicenseValid); - checkForCcsLicense(executionInfo, indices, indicesGrouper, standardLicenseValid); - checkForCcsLicense(executionInfo, indices, indicesGrouper, missingLicense); - checkForCcsLicense(executionInfo, indices, indicesGrouper, nullLicense); - - checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseInactive); - checkForCcsLicense(executionInfo, indices, indicesGrouper, platinumLicenseInactive); - checkForCcsLicense(executionInfo, indices, indicesGrouper, goldLicenseInactive); - checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseInactive); - checkForCcsLicense(executionInfo, indices, indicesGrouper, basicLicenseInactive); - checkForCcsLicense(executionInfo, indices, indicesGrouper, standardLicenseInactive); - checkForCcsLicense(executionInfo, indices, indicesGrouper, missingLicenseInactive); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), enterpriseLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), platinumLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), goldLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), trialLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), basicLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), standardLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), missingLicense); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), nullLicense); + + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), enterpriseLicenseInactive); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), platinumLicenseInactive); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), goldLicenseInactive); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), trialLicenseInactive); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), basicLicenseInactive); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), standardLicenseInactive); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), missingLicenseInactive); } // cross-cluster search requires a valid (active, non-expired) enterprise license OR a valid trial license @@ -742,8 +742,8 @@ public void testCheckForCcsLicense() { } // licenses that work - checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseValid); - checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), enterpriseLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), trialLicenseValid); // all others fail --- @@ -812,7 +812,7 @@ private void assertLicenseCheckFails( EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); ElasticsearchStatusException e = expectThrows( ElasticsearchStatusException.class, - () -> checkForCcsLicense(executionInfo, indices, indicesGrouper, licenseState) + () -> checkForCcsLicense(executionInfo, indices, indicesGrouper, Set.of(), licenseState) ); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat( @@ -825,7 +825,11 @@ private void assertLicenseCheckFails( static class TestIndicesExpressionGrouper implements IndicesExpressionGrouper { @Override - public Map groupIndices(IndicesOptions indicesOptions, String[] indexExpressions) { + public Map groupIndices( + Set remoteClusterNames, + IndicesOptions indicesOptions, + String[] indexExpressions + ) { final Map originalIndicesMap = new HashMap<>(); final String localKey = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java index 8f4b2ea757327..a85dfe6677fc7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java @@ -157,7 +157,7 @@ public void testFailedMetric() { // test a failed query: xyz field doesn't exist request.query("from test | stats m = max(xyz)"); EsqlSession.PlanRunner runPhase = (p, r) -> fail("this shouldn't happen"); - IndicesExpressionGrouper groupIndicesByCluster = (indicesOptions, indexExpressions) -> Map.of( + IndicesExpressionGrouper groupIndicesByCluster = (remoteClusterNames, indicesOptions, indexExpressions) -> Map.of( "", new OriginalIndices(new String[] { "test" }, IndicesOptions.DEFAULT) );