|
17 | 17 | import org.elasticsearch.index.IndexMode; |
18 | 18 | import org.elasticsearch.transport.RemoteClusterAware; |
19 | 19 | import org.elasticsearch.xpack.core.enrich.EnrichPolicy; |
20 | | -import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware; |
| 20 | +import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware; |
| 21 | +import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware; |
21 | 22 | import org.elasticsearch.xpack.esql.capabilities.TelemetryAware; |
22 | 23 | import org.elasticsearch.xpack.esql.common.Failures; |
23 | 24 | import org.elasticsearch.xpack.esql.core.capabilities.Resolvables; |
|
51 | 52 | import static org.elasticsearch.xpack.esql.core.expression.Expressions.asAttributes; |
52 | 53 | import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; |
53 | 54 |
|
54 | | -public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAnalysisPlanVerificationAware, TelemetryAware, SortAgnostic { |
| 55 | +public class Enrich extends UnaryPlan |
| 56 | + implements |
| 57 | + GeneratingPlan<Enrich>, |
| 58 | + PostOptimizationVerificationAware, |
| 59 | + PostAnalysisVerificationAware, |
| 60 | + TelemetryAware, |
| 61 | + SortAgnostic, |
| 62 | + ExecutesOn { |
55 | 63 | public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( |
56 | 64 | LogicalPlan.class, |
57 | 65 | "Enrich", |
@@ -324,6 +332,43 @@ private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Fail |
324 | 332 | } |
325 | 333 | }); |
326 | 334 |
|
327 | | - badCommands.forEach(c -> failures.add(fail(enrich, "ENRICH with remote policy can't be executed after " + c))); |
| 335 | + fails.forEach(f -> failures.add(fail(this, "ENRICH with remote policy can't be executed after [" + f.text() + "]" + f.source()))); |
| 336 | + } |
| 337 | + |
| 338 | + /**
| 339 | + * Remote ENRICH (like any remote operation) is incompatible with LIMIT followed by MV_EXPAND. Consider:
| 340 | + * {@code FROM *:events | SORT @timestamp | LIMIT 2 | MV_EXPAND ip | ENRICH _remote:clientip_policy ON ip}
| 341 | + * Semantically, this must take the top two events and then expand them. However, this cannot be executed remotely,
| 342 | + * because each node would have to take its own top 2 events, expand them, apply ENRICH, and send the rows
| 343 | + * to the coordinator - which then cannot select the top 2 of them, because that selection must happen pre-expand!
| 344 | + * We do not know which expanded rows come from the true top rows and which come from "false" top rows
| 345 | + * that should have been thrown out. This is only possible to execute if MV_EXPAND runs on the coordinator,
| 346 | + * which contradicts remote ENRICH. Hence a failure is added for every Limit/TopN that forEachDown reaches from an MvExpand.
| 347 | + * The optimizer could fix this by moving MV_EXPAND past ENRICH, at least in some cases, but currently it does not.
| 348 | + */
| 349 | + private void checkMvExpandAfterLimit(Failures failures) {
| 350 | + this.forEachDown(MvExpand.class, u -> {
| 351 | + u.forEachDown(p -> {
| 352 | + if (p instanceof Limit || p instanceof TopN) {
| 353 | + failures.add(fail(this, "MV_EXPAND after LIMIT is incompatible with remote ENRICH"));
| 354 | + }
| 355 | + });
| 356 | + });
| 357 | +
| 358 | + }
| 359 | + |
| 360 | + @Override
| 361 | + public void postAnalysisVerification(Failures failures) {
| 362 | + if (this.mode == Mode.REMOTE) { // the MV_EXPAND-after-LIMIT check runs only in remote mode; other modes add no failures here
| 363 | + checkMvExpandAfterLimit(failures);
| 364 | + }
| 365 | +
| 366 | + }
| 367 | + |
| 368 | + @Override
| 369 | + public void postOptimizationVerification(Failures failures) {
| 370 | + if (this.mode == Mode.REMOTE) { // the forbidden-plans check runs only in remote mode; other modes add no failures here
| 371 | + checkForPlansForbiddenBeforeRemoteEnrich(failures);
| 372 | + }
328 | 373 | }
329 | 374 | } |
0 commit comments