diff --git a/docs/changelog/131426.yaml b/docs/changelog/131426.yaml new file mode 100644 index 0000000000000..4f79415ba069d --- /dev/null +++ b/docs/changelog/131426.yaml @@ -0,0 +1,6 @@ +pr: 131426 +summary: Disallow remote enrich after lu join +area: ES|QL +type: bug +issues: + - 129372 diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 705bfca2e903e..a943e917e0335 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -126,7 +126,10 @@ public MultiClusterSpecIT( "NullifiedJoinKeyToPurgeTheJoin", "SortBeforeAndAfterJoin", "SortEvalBeforeLookup", - "SortBeforeAndAfterMultipleJoinAndMvExpand" + "SortBeforeAndAfterMultipleJoinAndMvExpand", + "LookupJoinAfterTopNAndRemoteEnrich", + // Lookup join after LIMIT is not supported in CCS yet + "LookupJoinAfterLimitAndRemoteEnrich" ); @Override diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec index 9bfb08eb82b45..2aa6189a957ec 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec @@ -661,3 +661,104 @@ from * author.keyword:keyword|book_no:keyword|scalerank:integer|street:keyword|bytes_in:ul|@timestamp:unsupported|abbrev:keyword|city_location:geo_point|distance:double|description:unsupported|birth_date:date|language_code:integer|intersects:boolean|client_ip:unsupported|event_duration:long|version:version|language_name:keyword Fyodor Dostoevsky |1211 |null |null |null |null |null |null |null |null |null |null |null |null |null |null |null ; + + +statsAfterRemoteEnrich +required_capability: enrich_load + +FROM sample_data +| KEEP message +| WHERE message IN ("Connected to 10.1.0.1", "Connected to 10.1.0.2") +| EVAL language_code = "1" +| ENRICH _remote:languages_policy ON language_code +| STATS messages = count_distinct(message) BY language_name +; + +messages:long | language_name:keyword +2 | English +; + + +enrichAfterRemoteEnrich +required_capability: enrich_load + +FROM sample_data +| KEEP message +| WHERE message IN ("Connected to 10.1.0.1") +| EVAL language_code = "1" +| ENRICH _remote:languages_policy ON language_code +| RENAME language_name AS first_language_name +| ENRICH languages_policy ON language_code +; + +message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | English | English +; + + +coordinatorEnrichAfterRemoteEnrich +required_capability: enrich_load + +FROM sample_data +| KEEP message +| WHERE message IN ("Connected to 10.1.0.1") +| EVAL language_code = "1" +| ENRICH _remote:languages_policy ON language_code +| RENAME language_name AS first_language_name +| ENRICH _coordinator:languages_policy ON language_code +; + +message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | English | English +; + + +doubleRemoteEnrich +required_capability: enrich_load + +FROM sample_data +| KEEP message +| WHERE message IN ("Connected to 10.1.0.1") +| EVAL language_code = "1" +| ENRICH _remote:languages_policy ON language_code +| RENAME language_name AS first_language_name +| ENRICH _remote:languages_policy ON language_code +; + +message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | English | English +; + + +enrichAfterCoordinatorEnrich +required_capability: enrich_load + +FROM sample_data +| KEEP message +| WHERE message IN ("Connected to 10.1.0.1") +| EVAL language_code = "1" +| ENRICH _coordinator:languages_policy ON language_code +| RENAME language_name AS first_language_name +| ENRICH languages_policy ON language_code +; + +message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | English | English +; + + +doubleCoordinatorEnrich +required_capability: enrich_load + +FROM sample_data +| KEEP message +| WHERE message IN ("Connected to 10.1.0.1") +| EVAL language_code = "1" +| ENRICH _coordinator:languages_policy ON language_code +| RENAME language_name AS first_language_name +| ENRICH _coordinator:languages_policy ON language_code +; + +message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | English | English +; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index bdf0413a03d02..c71bf34cafd1a 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -4773,3 +4773,101 @@ FROM sample_data_ts_nanos 2023-10-23T12:27:28.948123456Z | 172.21.2.113 | 2764889 | Connected to 10.1.0.2 2023-10-23T12:15:03.360123456Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 ; + +############################################### +# LOOKUP JOIN and ENRICH +############################################### + +enrichAfterLookupJoin +required_capability: join_lookup_v12 + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" +| EVAL language_code = "1" +| LOOKUP JOIN message_types_lookup ON message +| ENRICH languages_policy ON language_code +; + +message:keyword | language_code:keyword | type:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | Success | English +; + + +lookupJoinAfterEnrich +required_capability: join_lookup_v12 + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" +| EVAL language_code = "1" +| ENRICH languages_policy ON language_code +| LOOKUP JOIN message_types_lookup ON message +; + +message:keyword | language_code:keyword | language_name:keyword | type:keyword +Connected to 10.1.0.1 | 1 | English | Success +; + + +lookupJoinAfterRemoteEnrich +required_capability: join_lookup_v12 + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" +| EVAL language_code = "1" +| ENRICH _remote:languages_policy ON language_code +| LOOKUP JOIN message_types_lookup ON message +; + +message:keyword | language_code:keyword | language_name:keyword | type:keyword +Connected to 10.1.0.1 | 1 | English | Success +; + + +lookupJoinAfterLimitAndRemoteEnrich +required_capability: join_lookup_v12 + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" +| EVAL language_code = "1" +| LIMIT 1 +| ENRICH _remote:languages_policy ON language_code +| EVAL enrich_language_name = language_name, language_code = language_code::integer +| LOOKUP JOIN languages_lookup_non_unique_key ON language_code +| KEEP message, enrich_language_name, language_name, country.keyword +| SORT language_name, country.keyword +; + +message:keyword | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword +Connected to 10.1.0.1 | English | English | Canada +Connected to 10.1.0.1 | English | English | United States of America +Connected to 10.1.0.1 | English | English | null +Connected to 10.1.0.1 | English | null | United Kingdom +; + + +lookupJoinAfterTopNAndRemoteEnrich +required_capability: join_lookup_v12 + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" +| EVAL language_code = "1" +| SORT message +| LIMIT 1 +| ENRICH _remote:languages_policy ON language_code +| EVAL enrich_language_name = language_name, language_code = language_code::integer +| LOOKUP JOIN languages_lookup_non_unique_key ON language_code +| KEEP message, enrich_language_name, language_name, country.keyword +| SORT language_name, country.keyword +; + +message:keyword | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword +Connected to 10.1.0.1 | English | English | Canada +Connected to 10.1.0.1 | English | English | United States of America +Connected to 10.1.0.1 | English | English | null +Connected to 10.1.0.1 | English | null | United Kingdom +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java index 11e9a57064e5b..7307fd8efad39 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java @@ -35,6 +35,7 @@ import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.GeneratingPlan; +import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin; import java.io.IOException; import java.util.ArrayList; @@ -295,23 +296,43 @@ public BiConsumer postAnalysisPlanVerification() { * retaining the originating cluster and restructing pages for routing, which might be complicated. */ private static void checkRemoteEnrich(LogicalPlan plan, Failures failures) { - boolean[] agg = { false }; - boolean[] enrichCoord = { false }; + // First look for remote ENRICH, and then look at its children. Going over the whole plan once is trickier as remote ENRICHs can be + // in separate FORK branches which are valid by themselves. + plan.forEachUp(Enrich.class, enrich -> checkForPlansForbiddenBeforeRemoteEnrich(enrich, failures)); + } + + /** + * For a given remote {@link Enrich}, check if there are any forbidden plans upstream. + */ + private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Failures failures) { + if (enrich.mode != Mode.REMOTE) { + return; + } + + // TODO: shouldn't we also include FORK? Everything downstream from FORK should be coordinator-only. + // https://github.com/elastic/elasticsearch/issues/131445 + boolean[] aggregate = { false }; + boolean[] coordinatorOnlyEnrich = { false }; + boolean[] lookupJoin = { false }; - plan.forEachUp(UnaryPlan.class, u -> { + enrich.forEachUp(LogicalPlan.class, u -> { if (u instanceof Aggregate) { - agg[0] = true; - } else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) { - enrichCoord[0] = true; - } - if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) { - if (agg[0]) { - failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS")); - } - if (enrichCoord[0]) { - failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy")); - } + aggregate[0] = true; + } else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) { + coordinatorOnlyEnrich[0] = true; + } else if (u instanceof LookupJoin) { + lookupJoin[0] = true; } }); + + if (aggregate[0]) { + failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS")); + } + if (coordinatorOnlyEnrich[0]) { + failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy")); + } + if (lookupJoin[0]) { + failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN")); + } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index 4d1d65d63932d..bf6f0b89efbec 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -87,7 +87,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { PhysicalPlan mappedChild = map(unary.child()); // - // TODO - this is hard to follow and needs reworking + // TODO - this is hard to follow, causes bugs and needs reworking // https://github.com/elastic/elasticsearch/issues/115897 // if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java index cbb825ca9581b..fbfa18dccc477 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java @@ -36,9 +36,9 @@ import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE; import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE; import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.RANGE_TYPE; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG; import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration; -import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution; public final class AnalyzerTestUtils { @@ -61,27 +61,36 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map lookupResolution, Verifier verifier) { + return analyzer(indexResolution, lookupResolution, defaultEnrichResolution(), verifier); + } + + public static Analyzer analyzer( + IndexResolution indexResolution, + Map lookupResolution, + EnrichResolution enrichResolution, + Verifier verifier + ) { + return analyzer(indexResolution, lookupResolution, enrichResolution, verifier, TEST_CFG); + } + + public static Analyzer analyzer( + IndexResolution indexResolution, + Map lookupResolution, + EnrichResolution enrichResolution, + Verifier verifier, + Configuration config + ) { return new Analyzer( new AnalyzerContext( - EsqlTestUtils.TEST_CFG, + config, new EsqlFunctionRegistry(), indexResolution, lookupResolution, - defaultEnrichResolution(), + enrichResolution, defaultInferenceResolution() ), verifier @@ -89,17 +98,7 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map query("FROM test,remote:test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code") ); assertThat(e.getMessage(), containsString("remote clusters are not supported with LOOKUP JOIN")); + } + public void testRemoteEnrichAfterLookupJoin() { + EnrichResolution enrichResolution = new EnrichResolution(); + loadEnrichPolicyResolution( + enrichResolution, + Enrich.Mode.REMOTE, + MATCH_TYPE, + "languages", + "language_code", + "languages_idx", + "mapping-languages.json" + ); + var analyzer = AnalyzerTestUtils.analyzer( + loadMapping("mapping-default.json", "test"), + defaultLookupResolution(), + enrichResolution, + TEST_VERIFIER + ); + + String lookupCommand = randomBoolean() ? "LOOKUP JOIN test_lookup ON languages" : "LOOKUP JOIN languages_lookup ON language_code"; + + query(Strings.format(""" + FROM test + | EVAL language_code = languages + | ENRICH _remote:languages ON language_code + | %s + """, lookupCommand), analyzer); + + String err = error(Strings.format(""" + FROM test + | EVAL language_code = languages + | %s + | ENRICH _remote:languages ON language_code + """, lookupCommand), analyzer); + assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after LOOKUP JOIN")); + + err = error(Strings.format(""" + FROM test + | EVAL language_code = languages + | %s + | ENRICH _remote:languages ON language_code + | %s + """, lookupCommand, lookupCommand), analyzer); + assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after LOOKUP JOIN")); + + err = error(Strings.format(""" + FROM test + | EVAL language_code = languages + | %s + | EVAL x = 1 + | MV_EXPAND language_code + | ENRICH _remote:languages ON language_code + """, lookupCommand), analyzer); + assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after LOOKUP JOIN")); + } + + public void testRemoteEnrichAfterCoordinatorOnlyPlans() { + EnrichResolution enrichResolution = new EnrichResolution(); + loadEnrichPolicyResolution( + enrichResolution, + Enrich.Mode.REMOTE, + MATCH_TYPE, + "languages", + "language_code", + "languages_idx", + "mapping-languages.json" + ); + loadEnrichPolicyResolution( + enrichResolution, + Enrich.Mode.COORDINATOR, + MATCH_TYPE, + "languages", + "language_code", + "languages_idx", + "mapping-languages.json" + ); + var analyzer = AnalyzerTestUtils.analyzer( + loadMapping("mapping-default.json", "test"), + defaultLookupResolution(), + enrichResolution, + TEST_VERIFIER + ); + + query(""" + FROM test + | EVAL language_code = languages + | ENRICH _remote:languages ON language_code + | STATS count(*) BY language_name + """, analyzer); + + String err = error(""" + FROM test + | EVAL language_code = languages + | STATS count(*) BY language_code + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after STATS")); + + err = error(""" + FROM test + | EVAL language_code = languages + | STATS count(*) BY language_code + | EVAL x = 1 + | MV_EXPAND language_code + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after STATS")); + + query(""" + FROM test + | EVAL language_code = languages + | ENRICH _remote:languages ON language_code + | ENRICH _coordinator:languages ON language_code + """, analyzer); + + err = error(""" + FROM test + | EVAL language_code = languages + | ENRICH _coordinator:languages ON language_code + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy")); + + err = error(""" + FROM test + | EVAL language_code = languages + | ENRICH _coordinator:languages ON language_code + | EVAL x = 1 + | MV_EXPAND language_name + | DISSECT language_name "%{foo}" + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat(err, containsString("7:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy")); } private void checkFullTextFunctionsInStats(String functionInvocation) {