diff --git a/docs/changelog/131940.yaml b/docs/changelog/131940.yaml new file mode 100644 index 0000000000000..384530810d5f9 --- /dev/null +++ b/docs/changelog/131940.yaml @@ -0,0 +1,5 @@ +pr: 131940 +summary: Allow remote enrich after LOOKUP JOIN +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 8694505411291..3cbef5f9655c0 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -1875,7 +1875,6 @@ type:keyword | language_code:integer | language_name:keyword Production | 3 | Spanish ; - ############################################### # LOOKUP JOIN on mixed numerical fields ############################################### @@ -4872,6 +4871,143 @@ Connected to 10.1.0.1 | English | English | n Connected to 10.1.0.1 | English | null | United Kingdom ; +enrichAfterLookupJoin +required_capability: join_lookup_v12 + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" +| EVAL language_code = "1" +| LOOKUP JOIN message_types_lookup ON message +| ENRICH languages_policy ON language_code +; + +message:keyword | language_code:keyword | type:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | Success | English +; + +############################################### +# LOOKUP JOIN and remote ENRICH +############################################### + +remoteEnrichAfterLookupJoin +required_capability: join_lookup_v12 +required_capability: remote_enrich_after_lookup_join + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" +| EVAL language_code = "1" +| LOOKUP JOIN message_types_lookup ON message +| ENRICH _remote:languages_policy ON language_code +; + +message:keyword | language_code:keyword | type:keyword | language_name:keyword +Connected to 
10.1.0.1 | 1 | Success | English +; + +remoteEnrichSortAfterLookupJoin +required_capability: join_lookup_v12 +required_capability: remote_enrich_after_lookup_join + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2" +| EVAL language_code = "1" +| LOOKUP JOIN message_types_lookup ON message +| ENRICH _remote:languages_policy ON language_code +| SORT message ASC +; + +message:keyword | language_code:keyword | type:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | Success | English +Connected to 10.1.0.2 | 1 | Success | English +; + +sortRemoteEnrichAfterLookupJoin +required_capability: join_lookup_v12 +required_capability: remote_enrich_after_lookup_join + +FROM sample_data +| KEEP message +| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2" +| EVAL language_code = "1" +| LOOKUP JOIN message_types_lookup ON message +| SORT message ASC +| ENRICH _remote:languages_policy ON language_code +| LIMIT 2 +; + +message:keyword | language_code:keyword | type:keyword | language_name:keyword +Connected to 10.1.0.1 | 1 | Success | English +Connected to 10.1.0.2 | 1 | Success | English +; + +remoteEnrichSortAfterLookupJoinWithLimit +required_capability: join_lookup_v12 +required_capability: remote_enrich_after_lookup_join + +FROM sample_data +| KEEP message +| WHERE message == "Connection error" +| EVAL language_code = "1" +| LOOKUP JOIN message_types_lookup ON message +| LIMIT 2 +| ENRICH _remote:languages_policy ON language_code +| SORT message ASC +; + +message:keyword | language_code:keyword | type:keyword | language_name:keyword +Connection error | 1 | Error | English +Connection error | 1 | Error | English +; + +remoteEnrichBetweenLookupJoins +required_capability: join_lookup_v12 +required_capability: remote_enrich_after_lookup_join + +FROM sample_data +| KEEP message, client_ip +| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2" +| EVAL 
language_code = "1", client_ip=to_string(client_ip) +| LOOKUP JOIN message_types_lookup ON message +| ENRICH _remote:languages_policy ON language_code +| LOOKUP JOIN clientips_lookup ON client_ip +| DROP language_code +| SORT message ASC +; + +message:keyword | client_ip:keyword | type:keyword | language_name:keyword | env:keyword +Connected to 10.1.0.1 | 172.21.3.15 | Success | English | Production +Connected to 10.1.0.2 | 172.21.2.113 | Success | English | QA +; + +remoteEnrichesAndLookupJoins +required_capability: join_lookup_v12 +required_capability: remote_enrich_after_lookup_join + +FROM sample_data +| EVAL language_code = "1", client_ip=to_string(client_ip) +| ENRICH _remote:languages_policy ON language_code +| LOOKUP JOIN clientips_lookup ON client_ip +| EVAL env1 = env +| ENRICH _remote:clientip_policy ON client_ip +| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2" +| LOOKUP JOIN message_types_lookup ON message +| KEEP message, client_ip, env, env1, type, language_name +| SORT message ASC +| LIMIT 10 +; + +message:keyword | client_ip:keyword | env:keyword | env1: keyword | type:keyword | language_name:keyword +Connected to 10.1.0.1 | 172.21.3.15 | Production | Production | Success | English +Connected to 10.1.0.2 | 172.21.2.113 | QA | QA | Success | English +; + +############################################### +# Multi-field LOOKUP JOIN +############################################### + lookupJoinOnTwoFields required_capability: join_lookup_v12 required_capability: lookup_join_on_multiple_fields diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java index 815b08409723c..cea8bbefc0a5c 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java +++ 
b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java @@ -117,7 +117,7 @@ public void testEnrichAfterStop() throws Exception { SimplePauseFieldPlugin.allowEmitting.countDown(); try (EsqlQueryResponse resp = stopAction.actionGet(30, TimeUnit.SECONDS)) { - // Compare this to CrossClustersEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent + // Compare this to CrossClusterEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent // because we stopped it before processing the data assertThat( getValuesList(resp), diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java index 40ea21371e513..6e6ae2968fcb9 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java @@ -30,7 +30,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; /** - * This IT test is the dual of CrossClustersEnrichIT, which tests "happy path" + * This IT test is the dual of CrossClusterEnrichIT, which tests "happy path" * and this one tests unavailable cluster scenarios using (most of) the same tests. 
*/ public class CrossClusterEnrichUnavailableClustersIT extends AbstractEnrichBasedCrossClusterTestCase { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 478fb5af2676e..19eac8bd9ad03 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1277,6 +1277,12 @@ public enum Cap { */ ENABLE_LOOKUP_JOIN_ON_REMOTE(Build.current().isSnapshot()), + /** + * Fix the planning of {@code | ENRICH _remote:policy} when there's a preceding {@code | LOOKUP JOIN}, + * which could previously fail with a {@code java.lang.ClassCastException} when combining LOOKUP JOIN and remote ENRICH. + */ + REMOTE_ENRICH_AFTER_LOOKUP_JOIN, + /** * MATCH PHRASE function */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index 974f5fa485e47..55530e3c7ed73 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -465,7 +465,7 @@ public LogicalPlan visitShowInfo(EsqlBaseParser.ShowInfoContext ctx) { @Override public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) { - return p -> { + return child -> { var source = source(ctx); Tuple tuple = parsePolicyName(ctx.policyName); Mode mode = tuple.v1(); @@ -484,9 +484,15 @@ public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) { } List keepClauses = visitList(this, ctx.enrichWithClause(), NamedExpression.class); + + // If this is a remote-only ENRICH, any upstream LOOKUP JOINs need to be treated as remote-only, too. 
+ if (mode == Mode.REMOTE) { + child = child.transformDown(LookupJoin.class, lj -> new LookupJoin(lj.source(), lj.left(), lj.right(), lj.config(), true)); + } + return new Enrich( source, - p, + child, mode, Literal.keyword(source(ctx.policyName), policyNameString), matchField, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Completion.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Completion.java index ffb7ccfbe4798..191664bea9a81 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Completion.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Completion.java @@ -23,7 +23,6 @@ import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; -import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import java.io.IOException; @@ -34,7 +33,7 @@ import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT; import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; -public class Completion extends InferencePlan implements TelemetryAware, PostAnalysisVerificationAware, ExecutesOn.Coordinator { +public class Completion extends InferencePlan implements TelemetryAware, PostAnalysisVerificationAware { public static final String DEFAULT_OUTPUT_FIELD_NAME = "completion"; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java index b4c7d21f3e364..633ed74d8addb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java +++ 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.plan.GeneratingPlan; +import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; @@ -24,7 +25,8 @@ public abstract class InferencePlan> extends UnaryPlan implements SortAgnostic, - GeneratingPlan> { + GeneratingPlan>, + ExecutesOn.Coordinator { public static final String INFERENCE_ID_OPTION_NAME = "inference_id"; public static final List VALID_INFERENCE_OPTION_NAMES = List.of(INFERENCE_ID_OPTION_NAME); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Rerank.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Rerank.java index a63b70cd6cc57..6f86138397fa6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Rerank.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Rerank.java @@ -26,7 +26,6 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.logical.Eval; -import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; @@ -38,7 +37,7 @@ import static org.elasticsearch.xpack.esql.common.Failure.fail; import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; -public class Rerank extends InferencePlan implements PostAnalysisVerificationAware, TelemetryAware, 
ExecutesOn.Coordinator { +public class Rerank extends InferencePlan implements PostAnalysisVerificationAware, TelemetryAware { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Rerank", Rerank::new); public static final String DEFAULT_INFERENCE_ID = ".rerank-v1-elasticsearch"; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index aac1d58e5f7f1..22a5c3b403f3f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -100,6 +100,8 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { // 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway). Holder hasFragment = new Holder<>(false); + // Remove most plan nodes between this remote ENRICH and the data node's fragment so they're not executed twice; + // include the plan up until this ENRICH in the fragment. var childTransformed = mappedChild.transformUp(f -> { // Once we reached FragmentExec, we stuff our Enrich under it if (f instanceof FragmentExec) { @@ -118,7 +120,10 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { return unaryExec.child(); } } - // Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it. + // Here we have the following possibilities: + // 1. LeafExec - should resolve to FragmentExec or we can ignore it + // 2. Join - must be remote, and thus will go inside FragmentExec + // 3. 
Fork/MergeExec - not currently allowed with remote enrich return f; }); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/OptimizerVerificationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/OptimizerVerificationTests.java index ed67ea33b9184..bd67155285117 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/OptimizerVerificationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/OptimizerVerificationTests.java @@ -358,4 +358,74 @@ public void testRemoteLookupJoinWithPipelineBreaker() { // Since FORK, RERANK, COMPLETION and CHANGE_POINT are not supported on remote indices, we can't check them here against the remote // LOOKUP JOIN } + + public void testRemoteEnrichAfterLookupJoinWithPipelineBreaker() { + EnrichResolution enrichResolution = new EnrichResolution(); + loadEnrichPolicyResolution( + enrichResolution, + Enrich.Mode.REMOTE, + MATCH_TYPE, + "languages", + "language_code", + "languages_idx", + "mapping-languages.json" + ); + loadEnrichPolicyResolution( + enrichResolution, + Enrich.Mode.COORDINATOR, + MATCH_TYPE, + "languages_coord", + "language_code", + "languages_idx", + "mapping-languages.json" + ); + var analyzer = AnalyzerTestUtils.analyzer( + loadMapping("mapping-default.json", "test"), + defaultLookupResolution(), + enrichResolution, + TEST_VERIFIER + ); + + String err = error(""" + FROM test + | STATS c = COUNT(*) by languages + | EVAL language_code = languages + | LOOKUP JOIN languages_lookup ON language_code + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat( + err, + containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [STATS c = COUNT(*) by languages]@2:3") + ); + + err = error(""" + FROM test + | SORT emp_no + | EVAL language_code = languages + | LOOKUP JOIN languages_lookup ON language_code + | ENRICH _remote:languages ON language_code + """, analyzer); + 
assertThat(err, containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [SORT emp_no]@2:3")); + + err = error(""" + FROM test + | LIMIT 2 + | EVAL language_code = languages + | LOOKUP JOIN languages_lookup ON language_code + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat(err, containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [LIMIT 2]@2:3")); + + err = error(""" + FROM test + | EVAL language_code = languages + | ENRICH _coordinator:languages_coord + | LOOKUP JOIN languages_lookup ON language_code + | ENRICH _remote:languages ON language_code + """, analyzer); + assertThat( + err, + containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [ENRICH _coordinator:languages_coord]@3:3") + ); + } }