1717import org .elasticsearch .index .IndexMode ;
1818import org .elasticsearch .transport .RemoteClusterAware ;
1919import org .elasticsearch .xpack .core .enrich .EnrichPolicy ;
20- import org .elasticsearch .xpack .esql .capabilities .PostAnalysisPlanVerificationAware ;
20+ import org .elasticsearch .xpack .esql .capabilities .PostAnalysisVerificationAware ;
2121import org .elasticsearch .xpack .esql .capabilities .TelemetryAware ;
2222import org .elasticsearch .xpack .esql .common .Failures ;
2323import org .elasticsearch .xpack .esql .core .capabilities .Resolvables ;
3535import org .elasticsearch .xpack .esql .index .EsIndex ;
3636import org .elasticsearch .xpack .esql .io .stream .PlanStreamInput ;
3737import org .elasticsearch .xpack .esql .plan .GeneratingPlan ;
38- import org .elasticsearch .xpack .esql .plan .logical .join .LookupJoin ;
3938
4039import java .io .IOException ;
4140import java .util .ArrayList ;
4544import java .util .Map ;
4645import java .util .Objects ;
4746import java .util .Set ;
48- import java .util .function .BiConsumer ;
4947
5048import static org .elasticsearch .xpack .esql .common .Failure .fail ;
5149import static org .elasticsearch .xpack .esql .core .expression .Expressions .asAttributes ;
5250import static org .elasticsearch .xpack .esql .expression .NamedExpressions .mergeOutputAttributes ;
5351
54- public class Enrich extends UnaryPlan implements GeneratingPlan <Enrich >, PostAnalysisPlanVerificationAware , TelemetryAware , SortAgnostic {
52+ public class Enrich extends UnaryPlan
53+ implements
54+ GeneratingPlan <Enrich >,
55+ PostAnalysisVerificationAware ,
56+ TelemetryAware ,
57+ SortAgnostic ,
58+ ExecutesOn {
5559 public static final NamedWriteableRegistry .Entry ENTRY = new NamedWriteableRegistry .Entry (
5660 LogicalPlan .class ,
5761 "Enrich" ,
@@ -68,6 +72,16 @@ public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAna
6872
6973 private final Mode mode ;
7074
75+ @ Override
76+ public ExecuteLocation executesOn () {
77+ if (mode == Mode .REMOTE ) {
78+ return ExecuteLocation .REMOTE ;
79+ } else if (mode == Mode .COORDINATOR ) {
80+ return ExecuteLocation .COORDINATOR ;
81+ }
82+ return ExecuteLocation .ANY ;
83+ }
84+
7185 public enum Mode {
7286 ANY ,
7387 COORDINATOR ,
@@ -279,11 +293,6 @@ public int hashCode() {
279293 return Objects .hash (super .hashCode (), mode , policyName , matchField , policy , concreteIndices , enrichFields );
280294 }
281295
282- @ Override
283- public BiConsumer <LogicalPlan , Failures > postAnalysisPlanVerification () {
284- return Enrich ::checkRemoteEnrich ;
285- }
286-
287296 /**
288297 * Ensure that no remote enrich is allowed after a reduction or an enrich with coordinator mode.
289298 * <p>
@@ -296,15 +305,6 @@ public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
296305 * We might consider implementing the actual remote enrich on the coordinating cluster, however, this requires
297306 * retaining the originating cluster and restructing pages for routing, which might be complicated.
298307 */
299- private static void checkRemoteEnrich (LogicalPlan plan , Failures failures ) {
300- // First look for remote ENRICH, and then look at its children. Going over the whole plan once is trickier as remote ENRICHs can be
301- // in separate FORK branches which are valid by themselves.
302- plan .forEachUp (Enrich .class , enrich -> checkForPlansForbiddenBeforeRemoteEnrich (enrich , failures ));
303- }
304-
305- /**
306- * For a given remote {@link Enrich}, check if there are any forbidden plans upstream.
307- */
308308 private static void checkForPlansForbiddenBeforeRemoteEnrich (Enrich enrich , Failures failures ) {
309309 if (enrich .mode != Mode .REMOTE ) {
310310 return ;
@@ -313,25 +313,18 @@ private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Fail
313313 Set <Source > badCommands = new HashSet <>();
314314
315315 enrich .forEachUp (LogicalPlan .class , u -> {
316- if (u instanceof ExecutesOn .Coordinator
317- || u instanceof Enrich upstreamEnrich && upstreamEnrich .mode () == Enrich .Mode .COORDINATOR
318- || u instanceof LookupJoin ) {
316+ if (u instanceof ExecutesOn ex && ex .executesOn () == ExecuteLocation .COORDINATOR ) {
319317 badCommands .add (u .source ());
320318 }
321-
322- // if (u instanceof Aggregate) {
323- // badCommands.add("STATS");
324- // } else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
325- // badCommands.add("another ENRICH with coordinator policy");
326- // } else if (u instanceof LookupJoin) {
327- // badCommands.add("LOOKUP JOIN");
328- // } else if (u instanceof Fork) {
329- // badCommands.add("FORK");
330- // }
331319 });
332320
333321 badCommands .forEach (
334322 f -> failures .add (fail (enrich , "ENRICH with remote policy can't be executed after [" + f .text () + "]" + f .source ()))
335323 );
336324 }
325+
326+ @ Override
327+ public void postAnalysisVerification (Failures failures ) {
328+ checkForPlansForbiddenBeforeRemoteEnrich (this , failures );
329+ }
337330}
0 commit comments