diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index ff47fc4121b0f..3ad3e1e92f5a2 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -132,6 +132,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.function.Function; import java.util.jar.JarInputStream; import java.util.zip.ZipEntry; @@ -548,23 +549,17 @@ public static List> getValuesList(EsqlQueryResponse results) { } public static List> getValuesList(Iterator> values) { - var valuesList = new ArrayList>(); - values.forEachRemaining(row -> { - var rowValues = new ArrayList<>(); - row.forEachRemaining(rowValues::add); - valuesList.add(rowValues); - }); - return valuesList; + return toList(values, row -> toList(row, Function.identity())); } public static List> getValuesList(Iterable> values) { - var valuesList = new ArrayList>(); - values.iterator().forEachRemaining(row -> { - var rowValues = new ArrayList<>(); - row.iterator().forEachRemaining(rowValues::add); - valuesList.add(rowValues); - }); - return valuesList; + return toList(values.iterator(), row -> toList(row.iterator(), Function.identity())); + } + + private static List toList(Iterator iterable, Function transformer) { + var list = new ArrayList(); + iterable.forEachRemaining(e -> list.add(transformer.apply(e))); + return list; } public static List withDefaultLimitWarning(List warnings) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java index f848c663d3ca6..fea9e1b6ab735 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java @@ -413,7 +413,8 @@ public void testLimitThenEnrichRemote() { FROM *:events,events | LIMIT 25 | eval ip= TO_STR(host) - | %s | KEEP host, timestamp, user, os + | %s + | KEEP host, timestamp, user, os """, enrichHosts(Enrich.Mode.REMOTE)); try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { var values = getValuesList(resp); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/PreAnalyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/PreAnalyzer.java index 90b0124862231..3c9c7ea39f503 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/PreAnalyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/PreAnalyzer.java @@ -22,7 +22,7 @@ */ public class PreAnalyzer { - public record PreAnalysis(IndexMode indexMode, IndexPattern index, List enriches, List lookupIndices) { + public record PreAnalysis(IndexMode indexMode, IndexPattern indexPattern, List enriches, List lookupIndices) { public static final PreAnalysis EMPTY = new PreAnalysis(null, null, List.of(), List.of()); } @@ -37,7 +37,7 @@ public PreAnalysis preAnalyze(LogicalPlan plan) { protected PreAnalysis doPreAnalyze(LogicalPlan plan) { Holder indexMode = new Holder<>(); - Holder index = new Holder<>(); + Holder indexPattern = new Holder<>(); List unresolvedEnriches = new ArrayList<>(); List lookupIndices = new ArrayList<>(); @@ -47,7 +47,7 @@ protected PreAnalysis doPreAnalyze(LogicalPlan plan) { lookupIndices.add(p.indexPattern()); } else if (indexMode.get() == null || indexMode.get() == p.indexMode()) { indexMode.set(p.indexMode()); - index.set(p.indexPattern()); + indexPattern.set(p.indexPattern()); } else { throw new IllegalStateException("index mode is already set"); } @@ -58,6 +58,6 @@ protected PreAnalysis doPreAnalyze(LogicalPlan plan) { // mark plan as preAnalyzed (if it were marked, there would be no analysis) plan.forEachUp(LogicalPlan::setPreAnalyzed); - return new PreAnalysis(indexMode.get(), index.get(), unresolvedEnriches, lookupIndices); + return new PreAnalysis(indexMode.get(), indexPattern.get(), unresolvedEnriches, lookupIndices); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index d9314f1a0611d..06d9ca7d64d20 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractTransportRequest; @@ -39,13 +40,16 @@ import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.analysis.EnrichResolution; +import org.elasticsearch.xpack.esql.analysis.PreAnalyzer; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.EsField; import org.elasticsearch.xpack.esql.core.util.StringUtils; import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput; +import org.elasticsearch.xpack.esql.plan.IndexPattern; import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.session.EsqlCCSUtils; import org.elasticsearch.xpack.esql.session.IndexResolver; import java.io.IOException; @@ -53,7 +57,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -110,22 +113,33 @@ public static UnresolvedPolicy from(Enrich e) { /** * Resolves a set of enrich policies * - * @param enriches the unresolved policies + * @param preAnalysis to retrieve indices and enriches to resolve + * @param requestFilter to resolve target clusters * @param executionInfo the execution info * @param listener notified with the enrich resolution */ - public void resolvePolicies(List enriches, EsqlExecutionInfo executionInfo, ActionListener listener) { - if (enriches.isEmpty()) { + public void resolvePolicies( + PreAnalyzer.PreAnalysis preAnalysis, + QueryBuilder requestFilter, + EsqlExecutionInfo executionInfo, + ActionListener listener + ) { + if (preAnalysis.enriches().isEmpty()) { listener.onResponse(new EnrichResolution()); return; } - doResolvePolicies( - new HashSet<>(executionInfo.getClusters().keySet()), - enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(), - executionInfo, - listener - ); + doResolveRemotes(preAnalysis.indexPattern(), requestFilter, listener.delegateFailureAndWrap((l, remotes) -> { + doResolvePolicies(remotes, preAnalysis.enriches().stream().map(UnresolvedPolicy::from).toList(), executionInfo, l); + })); + } + + private void doResolveRemotes(IndexPattern indexPattern, QueryBuilder requestFilter, ActionListener> listener) { + if (indexPattern != null) { + indexResolver.resolveConcreteIndices(indexPattern.indexPattern(), requestFilter, listener.map(EsqlCCSUtils::getRemotesOf)); + } else { + listener.onResponse(Set.of()); + } } protected void doResolvePolicies( @@ -442,7 +456,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas if (p == null) { continue; } - try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) { + try (var ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) { String indexName = EnrichPolicy.getBaseName(policyName); indexResolver.resolveAsMergedMapping(indexName, IndexResolver.ALL_FIELDS, null, false, refs.acquire(indexResult -> { if (indexResult.isValid() && indexResult.get().concreteIndices().size() == 1) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index a86c0414468ad..06e3a2c562e95 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -40,7 +40,8 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; + +import static java.util.stream.Collectors.toSet; public class EsqlCCSUtils { @@ -206,7 +207,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( // NOTE: we assume that updateExecutionInfoWithUnavailableClusters() was already run and took care of unavailable clusters. final Set clustersWithNoMatchingIndices = executionInfo.getClusterStates(Cluster.Status.RUNNING) .map(Cluster::getClusterAlias) - .collect(Collectors.toSet()); + .collect(toSet()); for (String indexName : indexResolution.resolvedIndices()) { clustersWithNoMatchingIndices.remove(RemoteClusterAware.parseClusterAlias(indexName)); } @@ -322,10 +323,10 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { public static void initCrossClusterState( IndicesExpressionGrouper indicesGrouper, XPackLicenseState licenseState, - IndexPattern pattern, + IndexPattern indexPattern, EsqlExecutionInfo executionInfo ) throws ElasticsearchStatusException { - if (pattern == null) { + if (indexPattern == null) { return; } try { @@ -334,7 +335,7 @@ public static void initCrossClusterState( // it is copied here so that we have the same resolution when request contains multiple remote cluster patterns with * Set.copyOf(indicesGrouper.getConfiguredClusters()), IndicesOptions.DEFAULT, - pattern.indexPattern() + indexPattern.indexPattern() ); executionInfo.clusterInfoInitializing(true); @@ -413,4 +414,8 @@ public static String inClusterName(String clusterAlias) { return "in remote cluster [" + clusterAlias + "]"; } } + + public static Set getRemotesOf(Set concreteIndices) { + return concreteIndices.stream().map(RemoteClusterAware::parseClusterAlias).collect(toSet()); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index a8078feec4f8b..e4c2d1f362fb8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -373,10 +373,10 @@ public void analyzedPlan( } var preAnalysis = preAnalyzer.preAnalyze(parsed); - EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.index(), executionInfo); + EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo); SubscribableListener. // - newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l)) + newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis, requestFilter, executionInfo, l)) .andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution)) .andThen((l, r) -> resolveInferences(parsed, r, l)) .andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l)) @@ -632,12 +632,12 @@ private void preAnalyzeMainIndices( ThreadPool.Names.SEARCH_COORDINATION, ThreadPool.Names.SYSTEM_READ ); - if (preAnalysis.index() != null) { + if (preAnalysis.indexPattern() != null) { String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); if (indexExpressionToResolve.isEmpty()) { // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution listener.onResponse( - result.withIndexResolution(IndexResolution.valid(new EsIndex(preAnalysis.index().indexPattern(), Map.of(), Map.of()))) + result.withIndexResolution(IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of()))) ); } else { boolean includeAllDimensions = false; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index 5f42d261abaa7..7e642a71778ab 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -42,6 +42,7 @@ import java.util.TreeMap; import java.util.TreeSet; +import static java.util.stream.Collectors.toSet; import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME; import static org.elasticsearch.xpack.esql.core.type.DataType.KEYWORD; import static org.elasticsearch.xpack.esql.core.type.DataType.OBJECT; @@ -74,6 +75,16 @@ public IndexResolver(Client client) { this.client = client; } + public void resolveConcreteIndices(String indexPattern, QueryBuilder requestFilter, ActionListener> listener) { + client.execute( + EsqlResolveFieldsAction.TYPE, + createFieldCapsRequest(indexPattern, Set.of("_id"), requestFilter, false), + listener.delegateFailureAndWrap((l, response) -> { + l.onResponse(response.getIndexResponses().stream().map(FieldCapabilitiesIndexResponse::getIndexName).collect(toSet())); + }) + ); + } + /** * Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping. */ diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 2ff6ce71be516..a993b3a5f4420 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -530,12 +530,12 @@ private LogicalPlan analyzedPlan(LogicalPlan parsed, CsvTestsDataLoader.MultiInd private static CsvTestsDataLoader.MultiIndexTestDataset testDatasets(LogicalPlan parsed) { var preAnalysis = new PreAnalyzer().preAnalyze(parsed); - if (preAnalysis.index() == null) { + if (preAnalysis.indexPattern() == null) { // If the data set doesn't matter we'll just grab one we know works. Employees is fine. return CsvTestsDataLoader.MultiIndexTestDataset.of(CSV_DATASET_MAP.get("employees")); } - String indexName = preAnalysis.index().indexPattern(); + String indexName = preAnalysis.indexPattern().indexPattern(); List datasets = new ArrayList<>(); if (indexName.endsWith("*")) { String indexPrefix = indexName.substring(0, indexName.length() - 1); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java index 752e61c240cd5..a7891f7ee3776 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java @@ -83,7 +83,7 @@ EnrichPolicyResolver mockEnrichResolver() { ActionListener listener = (ActionListener) arguments[arguments.length - 1]; listener.onResponse(new EnrichResolution()); return null; - }).when(enrichResolver).resolvePolicies(any(), any(), any()); + }).when(enrichResolver).resolvePolicies(any(), any(), any(), any()); return enrichResolver; }