@@ -110,11 +110,26 @@ public static UnresolvedPolicy from(Enrich e) {
110110 /**
111111 * Resolves a set of enrich policies
112112 *
113- * @param unresolvedPolicies the unresolved policies
113+ * @param enriches the unresolved policies
114114 * @param executionInfo the execution info
115115 * @param listener notified with the enrich resolution
116116 */
117- public void resolvePolicies (
117+ public void resolvePolicies (List <Enrich > enriches , EsqlExecutionInfo executionInfo , ActionListener <EnrichResolution > listener ) {
118+ if (enriches .isEmpty ()) {
119+ listener .onResponse (new EnrichResolution ());
120+ return ;
121+ }
122+
123+ doResolvePolicies (
124+ new HashSet <>(executionInfo .getClusters ().keySet ()),
125+ enriches .stream ().map (EnrichPolicyResolver .UnresolvedPolicy ::from ).toList (),
126+ executionInfo ,
127+ listener
128+ );
129+ }
130+
131+ protected void doResolvePolicies (
132+ Set <String > remoteClusters ,
118133 Collection <UnresolvedPolicy > unresolvedPolicies ,
119134 EsqlExecutionInfo executionInfo ,
120135 ActionListener <EnrichResolution > listener
@@ -124,13 +139,10 @@ public void resolvePolicies(
124139 return ;
125140 }
126141
127- final Set <String > remoteClusters = new HashSet <>(executionInfo .getClusters ().keySet ());
128142 final boolean includeLocal = remoteClusters .isEmpty () || remoteClusters .remove (RemoteClusterAware .LOCAL_CLUSTER_GROUP_KEY );
129143 lookupPolicies (remoteClusters , includeLocal , unresolvedPolicies , listener .map (lookupResponses -> {
130144 final EnrichResolution enrichResolution = new EnrichResolution ();
131-
132- Map <String , LookupResponse > lookupResponsesToProcess = new HashMap <>();
133-
145+ final Map <String , LookupResponse > lookupResponsesToProcess = new HashMap <>();
134146 for (Map .Entry <String , LookupResponse > entry : lookupResponses .entrySet ()) {
135147 String clusterAlias = entry .getKey ();
136148 if (entry .getValue ().connectionError != null ) {
@@ -424,17 +436,15 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
424436 new ChannelActionListener <>(channel ),
425437 threadContext
426438 );
427- try (
428- RefCountingListener refs = new RefCountingListener (listener .map (unused -> new LookupResponse (resolvedPolices , failures )))
429- ) {
439+ try (var refs = new RefCountingListener (listener .map (unused -> new LookupResponse (resolvedPolices , failures )))) {
430440 for (String policyName : request .policyNames ) {
431441 EnrichPolicy p = availablePolicies .get (policyName );
432442 if (p == null ) {
433443 continue ;
434444 }
435445 try (ThreadContext .StoredContext ignored = threadContext .stashWithOrigin (ClientHelper .ENRICH_ORIGIN )) {
436446 String indexName = EnrichPolicy .getBaseName (policyName );
437- indexResolver .resolveAsMergedMapping (indexName , IndexResolver .ALL_FIELDS , null , refs .acquire (indexResult -> {
447+ indexResolver .resolveAsMergedMapping (indexName , IndexResolver .ALL_FIELDS , null , false , refs .acquire (indexResult -> {
438448 if (indexResult .isValid () && indexResult .get ().concreteIndices ().size () == 1 ) {
439449 EsIndex esIndex = indexResult .get ();
440450 var concreteIndices = Map .of (request .clusterAlias , Iterables .get (esIndex .concreteIndices (), 0 ));
@@ -449,17 +459,15 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
449459 } else {
450460 failures .put (policyName , indexResult .toString ());
451461 }
452- }), false );
462+ }));
453463 }
454464 }
455465 }
456466 }
457467 }
458468
459469 protected Map <String , EnrichPolicy > availablePolicies () {
460- final EnrichMetadata metadata = projectResolver .getProjectMetadata (clusterService .state ())
461- .custom (EnrichMetadata .TYPE , EnrichMetadata .EMPTY );
462- return metadata .getPolicies ();
470+ return projectResolver .getProjectMetadata (clusterService .state ()).custom (EnrichMetadata .TYPE , EnrichMetadata .EMPTY ).getPolicies ();
463471 }
464472
465473 protected void getRemoteConnection (String cluster , ActionListener <Transport .Connection > listener ) {
0 commit comments