From dacf28295d6f9fa235a0d2b6ddadccc542dade4c Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Mon, 1 Sep 2025 13:06:34 +0200 Subject: [PATCH 1/3] Refactor enrich policy resolution --- .../esql/enrich/EnrichPolicyResolver.java | 24 ++++++++++++++----- .../xpack/esql/session/EsqlSession.java | 4 +--- .../enrich/EnrichPolicyResolverTests.java | 5 ++-- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 838e305db52ea..49d5f6bba7d20 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -110,11 +110,26 @@ public static UnresolvedPolicy from(Enrich e) { /** * Resolves a set of enrich policies * - * @param unresolvedPolicies the unresolved policies + * @param enriches the unresolved policies * @param executionInfo the execution info * @param listener notified with the enrich resolution */ - public void resolvePolicies( + public void resolvePolicies(List enriches, EsqlExecutionInfo executionInfo, ActionListener listener) { + if (enriches.isEmpty()) { + listener.onResponse(new EnrichResolution()); + return; + } + + doResolvePolicies( + new HashSet<>(executionInfo.getClusters().keySet()), + enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(), + executionInfo, + listener + ); + } + + protected void doResolvePolicies( + Set remoteClusters, Collection unresolvedPolicies, EsqlExecutionInfo executionInfo, ActionListener listener @@ -124,13 +139,10 @@ public void resolvePolicies( return; } - final Set remoteClusters = new HashSet<>(executionInfo.getClusters().keySet()); final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> { final EnrichResolution enrichResolution = new EnrichResolution(); - - Map lookupResponsesToProcess = new HashMap<>(); - + final Map lookupResponsesToProcess = new HashMap<>(); for (Map.Entry entry : lookupResponses.entrySet()) { String clusterAlias = entry.getKey(); if (entry.getValue().connectionError != null) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index b28723d121aea..2576d82d42819 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -373,12 +373,10 @@ public void analyzedPlan( } PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed); - var unresolvedPolicies = preAnalysis.enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).collect(toSet()); - EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo); var listener = SubscribableListener. // - newForked(l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l)) + newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches, executionInfo, l)) .andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution)) .andThen((l, preAnalysisResult) -> resolveInferences(parsed, preAnalysisResult, l)); // first resolve the lookup indices, then the main indices diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index 241be34c96a57..c842d7beee6f5 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -51,6 +51,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -434,7 +435,6 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver { } EnrichResolution resolvePolicies(Collection clusters, Collection unresolvedPolicies) { - PlainActionFuture future = new PlainActionFuture<>(); EsqlExecutionInfo esqlExecutionInfo = new EsqlExecutionInfo(true); for (String cluster : clusters) { esqlExecutionInfo.swapCluster(cluster, (k, v) -> new EsqlExecutionInfo.Cluster(cluster, "*")); @@ -452,7 +452,8 @@ EnrichResolution resolvePolicies(Collection clusters, Collection future = new PlainActionFuture<>(); + super.doResolvePolicies(new HashSet<>(clusters), unresolvedPolicies, esqlExecutionInfo, future); return future.actionGet(30, TimeUnit.SECONDS); } From 608a2260a6414ca4239d1a32f979383cf41383d8 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Mon, 1 Sep 2025 14:45:39 +0200 Subject: [PATCH 2/3] reorder resolveAsMergedMapping parameters --- .../xpack/esql/enrich/EnrichPolicyResolver.java | 4 ++-- .../org/elasticsearch/xpack/esql/session/EsqlSession.java | 8 ++++---- .../elasticsearch/xpack/esql/session/IndexResolver.java | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 49d5f6bba7d20..21f7a8b976510 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -446,7 +446,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas } try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) { String indexName = EnrichPolicy.getBaseName(policyName); - indexResolver.resolveAsMergedMapping(indexName, IndexResolver.ALL_FIELDS, null, refs.acquire(indexResult -> { + indexResolver.resolveAsMergedMapping(indexName, IndexResolver.ALL_FIELDS, null, false, refs.acquire(indexResult -> { if (indexResult.isValid() && indexResult.get().concreteIndices().size() == 1) { EsIndex esIndex = indexResult.get(); var concreteIndices = Map.of(request.clusterAlias, Iterables.get(esIndex.concreteIndices(), 0)); @@ -461,7 +461,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas } else { failures.put(policyName, indexResult.toString()); } - }), false); + })); } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 2576d82d42819..555fab6df0c47 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -422,8 +422,8 @@ private void preAnalyzeLookupIndex( patternWithRemotes, fieldNames, null, - listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution)), - false + false, + listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution)) ); } @@ -653,10 +653,10 @@ private void preAnalyzeMainIndices( indexExpressionToResolve, result.fieldNames, requestFilter, + includeAllDimensions, listener.delegateFailure((l, indexResolution) -> { l.onResponse(result.withIndexResolution(indexResolution)); - }), - includeAllDimensions + }) ); } } else { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index a9da7741af400..5f42d261abaa7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -81,8 +81,8 @@ public void resolveAsMergedMapping( String indexWildcard, Set fieldNames, QueryBuilder requestFilter, - ActionListener listener, - boolean includeAllDimensions + boolean includeAllDimensions, + ActionListener listener ) { client.execute( EsqlResolveFieldsAction.TYPE, From de1f6a61f2748d31f1a209578b2e4a45f3647a85 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Mon, 1 Sep 2025 14:51:22 +0200 Subject: [PATCH 3/3] simplifications --- .../xpack/esql/enrich/EnrichPolicyResolver.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 21f7a8b976510..d9314f1a0611d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -436,9 +436,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas new ChannelActionListener<>(channel), threadContext ); - try ( - RefCountingListener refs = new RefCountingListener(listener.map(unused -> new LookupResponse(resolvedPolices, failures))) - ) { + try (var refs = new RefCountingListener(listener.map(unused -> new LookupResponse(resolvedPolices, failures)))) { for (String policyName : request.policyNames) { EnrichPolicy p = availablePolicies.get(policyName); if (p == null) { @@ -469,9 +467,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas } protected Map availablePolicies() { - final EnrichMetadata metadata = projectResolver.getProjectMetadata(clusterService.state()) - .custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY); - return metadata.getPolicies(); + return projectResolver.getProjectMetadata(clusterService.state()).custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY).getPolicies(); } protected void getRemoteConnection(String cluster, ActionListener listener) {