Skip to content

Commit dacf282

Browse files
committed
Refactor enrich policy resolution
1 parent 662882a commit dacf282

File tree

3 files changed

+22
-11
lines changed

3 files changed

+22
-11
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,26 @@ public static UnresolvedPolicy from(Enrich e) {
110110
/**
111111
* Resolves a set of enrich policies
112112
*
113-
* @param unresolvedPolicies the unresolved policies
113+
* @param enriches the unresolved policies
114114
* @param executionInfo the execution info
115115
* @param listener notified with the enrich resolution
116116
*/
117-
public void resolvePolicies(
117+
public void resolvePolicies(List<Enrich> enriches, EsqlExecutionInfo executionInfo, ActionListener<EnrichResolution> listener) {
118+
if (enriches.isEmpty()) {
119+
listener.onResponse(new EnrichResolution());
120+
return;
121+
}
122+
123+
doResolvePolicies(
124+
new HashSet<>(executionInfo.getClusters().keySet()),
125+
enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(),
126+
executionInfo,
127+
listener
128+
);
129+
}
130+
131+
protected void doResolvePolicies(
132+
Set<String> remoteClusters,
118133
Collection<UnresolvedPolicy> unresolvedPolicies,
119134
EsqlExecutionInfo executionInfo,
120135
ActionListener<EnrichResolution> listener
@@ -124,13 +139,10 @@ public void resolvePolicies(
124139
return;
125140
}
126141

127-
final Set<String> remoteClusters = new HashSet<>(executionInfo.getClusters().keySet());
128142
final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
129143
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
130144
final EnrichResolution enrichResolution = new EnrichResolution();
131-
132-
Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>();
133-
145+
final Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>();
134146
for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
135147
String clusterAlias = entry.getKey();
136148
if (entry.getValue().connectionError != null) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,12 +373,10 @@ public void analyzedPlan(
373373
}
374374

375375
PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
376-
var unresolvedPolicies = preAnalysis.enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).collect(toSet());
377-
378376
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo);
379377

380378
var listener = SubscribableListener. //
381-
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l))
379+
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches, executionInfo, l))
382380
.<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
383381
.<PreAnalysisResult>andThen((l, preAnalysisResult) -> resolveInferences(parsed, preAnalysisResult, l));
384382
// first resolve the lookup indices, then the main indices

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.ArrayList;
5252
import java.util.Collection;
5353
import java.util.HashMap;
54+
import java.util.HashSet;
5455
import java.util.List;
5556
import java.util.Map;
5657
import java.util.Set;
@@ -434,7 +435,6 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver {
434435
}
435436

436437
EnrichResolution resolvePolicies(Collection<String> clusters, Collection<UnresolvedPolicy> unresolvedPolicies) {
437-
PlainActionFuture<EnrichResolution> future = new PlainActionFuture<>();
438438
EsqlExecutionInfo esqlExecutionInfo = new EsqlExecutionInfo(true);
439439
for (String cluster : clusters) {
440440
esqlExecutionInfo.swapCluster(cluster, (k, v) -> new EsqlExecutionInfo.Cluster(cluster, "*"));
@@ -452,7 +452,8 @@ EnrichResolution resolvePolicies(Collection<String> clusters, Collection<Unresol
452452
unresolvedPolicies.add(new UnresolvedPolicy("legacy-policy-1", randomFrom(Enrich.Mode.values())));
453453
}
454454
}
455-
super.resolvePolicies(unresolvedPolicies, esqlExecutionInfo, future);
455+
PlainActionFuture<EnrichResolution> future = new PlainActionFuture<>();
456+
super.doResolvePolicies(new HashSet<>(clusters), unresolvedPolicies, esqlExecutionInfo, future);
456457
return future.actionGet(30, TimeUnit.SECONDS);
457458
}
458459

0 commit comments

Comments
 (0)