Skip to content

Commit 5a4c3ab

Browse files
authored
ESQL: Allow remote enrich after LOOKUP JOIN (elastic#131940)
* Allow remote enrich after LOOKUP JOIN
1 parent c82f6bc commit 5a4c3ab

File tree

11 files changed

+239
-11
lines changed

11 files changed

+239
-11
lines changed

docs/changelog/131940.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 131940
2+
summary: Allow remote enrich after LOOKUP JOIN
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 137 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1875,7 +1875,6 @@ type:keyword | language_code:integer | language_name:keyword
18751875
Production | 3 | Spanish
18761876
;
18771877

1878-
18791878
###############################################
18801879
# LOOKUP JOIN on mixed numerical fields
18811880
###############################################
@@ -4872,6 +4871,143 @@ Connected to 10.1.0.1 | English | English | n
48724871
Connected to 10.1.0.1 | English | null | United Kingdom
48734872
;
48744873

4874+
enrichAfterLookupJoin
4875+
required_capability: join_lookup_v12
4876+
4877+
FROM sample_data
4878+
| KEEP message
4879+
| WHERE message == "Connected to 10.1.0.1"
4880+
| EVAL language_code = "1"
4881+
| LOOKUP JOIN message_types_lookup ON message
4882+
| ENRICH languages_policy ON language_code
4883+
;
4884+
4885+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
4886+
Connected to 10.1.0.1 | 1 | Success | English
4887+
;
4888+
4889+
###############################################
4890+
# LOOKUP JOIN and remote ENRICH
4891+
###############################################
4892+
4893+
remoteEnrichAfterLookupJoin
4894+
required_capability: join_lookup_v12
4895+
required_capability: remote_enrich_after_lookup_join
4896+
4897+
FROM sample_data
4898+
| KEEP message
4899+
| WHERE message == "Connected to 10.1.0.1"
4900+
| EVAL language_code = "1"
4901+
| LOOKUP JOIN message_types_lookup ON message
4902+
| ENRICH _remote:languages_policy ON language_code
4903+
;
4904+
4905+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
4906+
Connected to 10.1.0.1 | 1 | Success | English
4907+
;
4908+
4909+
remoteEnrichSortAfterLookupJoin
4910+
required_capability: join_lookup_v12
4911+
required_capability: remote_enrich_after_lookup_join
4912+
4913+
FROM sample_data
4914+
| KEEP message
4915+
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
4916+
| EVAL language_code = "1"
4917+
| LOOKUP JOIN message_types_lookup ON message
4918+
| ENRICH _remote:languages_policy ON language_code
4919+
| SORT message ASC
4920+
;
4921+
4922+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
4923+
Connected to 10.1.0.1 | 1 | Success | English
4924+
Connected to 10.1.0.2 | 1 | Success | English
4925+
;
4926+
4927+
sortRemoteEnrichAfterLookupJoin
4928+
required_capability: join_lookup_v12
4929+
required_capability: remote_enrich_after_lookup_join
4930+
4931+
FROM sample_data
4932+
| KEEP message
4933+
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
4934+
| EVAL language_code = "1"
4935+
| LOOKUP JOIN message_types_lookup ON message
4936+
| SORT message ASC
4937+
| ENRICH _remote:languages_policy ON language_code
4938+
| LIMIT 2
4939+
;
4940+
4941+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
4942+
Connected to 10.1.0.1 | 1 | Success | English
4943+
Connected to 10.1.0.2 | 1 | Success | English
4944+
;
4945+
4946+
remoteEnrichSortAfterLookupJoinWithLimit
4947+
required_capability: join_lookup_v12
4948+
required_capability: remote_enrich_after_lookup_join
4949+
4950+
FROM sample_data
4951+
| KEEP message
4952+
| WHERE message == "Connection error"
4953+
| EVAL language_code = "1"
4954+
| LOOKUP JOIN message_types_lookup ON message
4955+
| LIMIT 2
4956+
| ENRICH _remote:languages_policy ON language_code
4957+
| SORT message ASC
4958+
;
4959+
4960+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
4961+
Connection error | 1 | Error | English
4962+
Connection error | 1 | Error | English
4963+
;
4964+
4965+
remoteEnrichBetweenLookupJoins
4966+
required_capability: join_lookup_v12
4967+
required_capability: remote_enrich_after_lookup_join
4968+
4969+
FROM sample_data
4970+
| KEEP message, client_ip
4971+
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
4972+
| EVAL language_code = "1", client_ip=to_string(client_ip)
4973+
| LOOKUP JOIN message_types_lookup ON message
4974+
| ENRICH _remote:languages_policy ON language_code
4975+
| LOOKUP JOIN clientips_lookup ON client_ip
4976+
| DROP language_code
4977+
| SORT message ASC
4978+
;
4979+
4980+
message:keyword | client_ip:keyword | type:keyword | language_name:keyword | env:keyword
4981+
Connected to 10.1.0.1 | 172.21.3.15 | Success | English | Production
4982+
Connected to 10.1.0.2 | 172.21.2.113 | Success | English | QA
4983+
;
4984+
4985+
remoteEnrichesAndLookupJoins
4986+
required_capability: join_lookup_v12
4987+
required_capability: remote_enrich_after_lookup_join
4988+
4989+
FROM sample_data
4990+
| EVAL language_code = "1", client_ip=to_string(client_ip)
4991+
| ENRICH _remote:languages_policy ON language_code
4992+
| LOOKUP JOIN clientips_lookup ON client_ip
4993+
| EVAL env1 = env
4994+
| ENRICH _remote:clientip_policy ON client_ip
4995+
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
4996+
| LOOKUP JOIN message_types_lookup ON message
4997+
| KEEP message, client_ip, env, env1, type, language_name
4998+
| SORT message ASC
4999+
| LIMIT 10
5000+
;
5001+
5002+
message:keyword | client_ip:keyword | env:keyword | env1:keyword | type:keyword | language_name:keyword
5003+
Connected to 10.1.0.1 | 172.21.3.15 | Production | Production | Success | English
5004+
Connected to 10.1.0.2 | 172.21.2.113 | QA | QA | Success | English
5005+
;
5006+
5007+
###############################################
5008+
# Multi-field LOOKUP JOIN
5009+
###############################################
5010+
48755011
lookupJoinOnTwoFields
48765012
required_capability: join_lookup_v12
48775013
required_capability: lookup_join_on_multiple_fields

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void testEnrichAfterStop() throws Exception {
117117
SimplePauseFieldPlugin.allowEmitting.countDown();
118118

119119
try (EsqlQueryResponse resp = stopAction.actionGet(30, TimeUnit.SECONDS)) {
120-
// Compare this to CrossClustersEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
120+
// Compare this to CrossClusterEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
121121
// because we stopped it before processing the data
122122
assertThat(
123123
getValuesList(resp),

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3131

3232
/**
33-
* This IT test is the dual of CrossClustersEnrichIT, which tests "happy path"
33+
* This IT test is the dual of CrossClusterEnrichIT, which tests the "happy path",
3434
* and this one tests unavailable cluster scenarios using (most of) the same tests.
3535
*/
3636
public class CrossClusterEnrichUnavailableClustersIT extends AbstractEnrichBasedCrossClusterTestCase {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,12 @@ public enum Cap {
12771277
*/
12781278
ENABLE_LOOKUP_JOIN_ON_REMOTE(Build.current().isSnapshot()),
12791279

1280+
/**
1281+
* Fix the planning of {@code | ENRICH _remote:policy} when there's a preceding {@code | LOOKUP JOIN},
1282+
* see <a href="https://github.com/elastic/elasticsearch/issues/129372">java.lang.ClassCastException when combining LOOKUP JOIN and remote ENRICH</a>
1283+
*/
1284+
REMOTE_ENRICH_AFTER_LOOKUP_JOIN,
1285+
12801286
/**
12811287
* MATCH PHRASE function
12821288
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ public LogicalPlan visitShowInfo(EsqlBaseParser.ShowInfoContext ctx) {
465465

466466
@Override
467467
public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) {
468-
return p -> {
468+
return child -> {
469469
var source = source(ctx);
470470
Tuple<Mode, String> tuple = parsePolicyName(ctx.policyName);
471471
Mode mode = tuple.v1();
@@ -484,9 +484,15 @@ public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) {
484484
}
485485

486486
List<NamedExpression> keepClauses = visitList(this, ctx.enrichWithClause(), NamedExpression.class);
487+
488+
// If this is a remote-only ENRICH, any upstream LOOKUP JOINs need to be treated as remote-only, too.
489+
if (mode == Mode.REMOTE) {
490+
child = child.transformDown(LookupJoin.class, lj -> new LookupJoin(lj.source(), lj.left(), lj.right(), lj.config(), true));
491+
}
492+
487493
return new Enrich(
488494
source,
489-
p,
495+
child,
490496
mode,
491497
Literal.keyword(source(ctx.policyName), policyNameString),
492498
matchField,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Completion.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.xpack.esql.core.tree.Source;
2424
import org.elasticsearch.xpack.esql.core.type.DataType;
2525
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
26-
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
2726
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2827

2928
import java.io.IOException;
@@ -34,7 +33,7 @@
3433
import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT;
3534
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
3635

37-
public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware, ExecutesOn.Coordinator {
36+
public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware {
3837

3938
public static final String DEFAULT_OUTPUT_FIELD_NAME = "completion";
4039

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/InferencePlan.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
1414
import org.elasticsearch.xpack.esql.core.tree.Source;
1515
import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
16+
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
1617
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1718
import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic;
1819
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
@@ -24,7 +25,8 @@
2425
public abstract class InferencePlan<PlanType extends InferencePlan<PlanType>> extends UnaryPlan
2526
implements
2627
SortAgnostic,
27-
GeneratingPlan<InferencePlan<PlanType>> {
28+
GeneratingPlan<InferencePlan<PlanType>>,
29+
ExecutesOn.Coordinator {
2830

2931
public static final String INFERENCE_ID_OPTION_NAME = "inference_id";
3032
public static final List<String> VALID_INFERENCE_OPTION_NAMES = List.of(INFERENCE_ID_OPTION_NAME);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Rerank.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.xpack.esql.core.type.DataType;
2727
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
2828
import org.elasticsearch.xpack.esql.plan.logical.Eval;
29-
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
3029
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
3130
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
3231

@@ -38,7 +37,7 @@
3837
import static org.elasticsearch.xpack.esql.common.Failure.fail;
3938
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
4039

41-
public class Rerank extends InferencePlan<Rerank> implements PostAnalysisVerificationAware, TelemetryAware, ExecutesOn.Coordinator {
40+
public class Rerank extends InferencePlan<Rerank> implements PostAnalysisVerificationAware, TelemetryAware {
4241

4342
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Rerank", Rerank::new);
4443
public static final String DEFAULT_INFERENCE_ID = ".rerank-v1-elasticsearch";

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
100100
// 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
101101
Holder<Boolean> hasFragment = new Holder<>(false);
102102

103+
// Remove most plan nodes between this remote ENRICH and the data node's fragment so they're not executed twice;
104+
// include the plan up until this ENRICH in the fragment.
103105
var childTransformed = mappedChild.transformUp(f -> {
104106
// Once we reached FragmentExec, we stuff our Enrich under it
105107
if (f instanceof FragmentExec) {
@@ -118,7 +120,10 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
118120
return unaryExec.child();
119121
}
120122
}
121-
// Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
123+
// Here we have the following possibilities:
124+
// 1. LeafExec - should resolve to FragmentExec or we can ignore it
125+
// 2. Join - must be remote, and thus will go inside FragmentExec
126+
// 3. Fork/MergeExec - not currently allowed with remote enrich
122127
return f;
123128
});
124129

0 commit comments

Comments
 (0)