Skip to content

Commit 8c25707

Browse files
authored
ESQL: Disallow remote ENRICH after LOOKUP JOIN (#131426) (#131535)
Fix #129372. Due to how remote ENRICH is [planned](https://github.com/elastic/elasticsearch/blob/32e50d0d94e27ee559d24bf9d5463ba6e64d1788/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java#L93), it interacts in special ways with pipeline breakers, in particular LIMIT and TopN; when these are encountered upstream from a remote ENRICH, these nodes are copied and executed a second time after the remote ENRICH. We'd like to allow remote ENRICH after LOOKUP JOIN, but that forces the lookup to be remote as well; this has its own interactions with pipeline breakers: in particular, LIMITs and TopNs cannot just be duplicated after LOOKUP JOIN, as LOOKUP JOIN may add new rows. For now, let's just forbid any usage of remote ENRICH after LOOKUP JOINs; remote ENRICH is mostly relevant for CCS, and LOOKUP JOIN doesn't support that in 9.1/8.19, anyway. There is separate work that enables remote LOOKUP JOINs on remote clusters and adds the correct validations; we can later build support for remote ENRICH + LOOKUP JOIN on top of that. (Cf. my comment [here](#129372 (comment)) and my draft #131286 for enabling this.) (cherry picked from commit 06e39c0) # Conflicts: # x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java # x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java
1 parent 2b4b8d5 commit 8c25707

File tree

7 files changed

+423
-40
lines changed

7 files changed

+423
-40
lines changed

docs/changelog/131426.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 131426
2+
summary: Disallow remote ENRICH after LOOKUP JOIN
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 129372

x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,3 +662,104 @@ from *
662662
author.keyword:keyword|book_no:keyword|scalerank:integer|street:keyword|bytes_in:ul|@timestamp:unsupported|abbrev:keyword|city_location:geo_point|distance:double|description:unsupported|birth_date:date|language_code:integer|intersects:boolean|client_ip:unsupported|event_duration:long|version:version|language_name:keyword
663663
Fyodor Dostoevsky |1211 |null |null |null |null |null |null |null |null |null |null |null |null |null |null |null
664664
;
665+
666+
667+
statsAfterRemoteEnrich
668+
required_capability: enrich_load
669+
670+
FROM sample_data
671+
| KEEP message
672+
| WHERE message IN ("Connected to 10.1.0.1", "Connected to 10.1.0.2")
673+
| EVAL language_code = "1"
674+
| ENRICH _remote:languages_policy ON language_code
675+
| STATS messages = count_distinct(message) BY language_name
676+
;
677+
678+
messages:long | language_name:keyword
679+
2 | English
680+
;
681+
682+
683+
enrichAfterRemoteEnrich
684+
required_capability: enrich_load
685+
686+
FROM sample_data
687+
| KEEP message
688+
| WHERE message IN ("Connected to 10.1.0.1")
689+
| EVAL language_code = "1"
690+
| ENRICH _remote:languages_policy ON language_code
691+
| RENAME language_name AS first_language_name
692+
| ENRICH languages_policy ON language_code
693+
;
694+
695+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
696+
Connected to 10.1.0.1 | 1 | English | English
697+
;
698+
699+
700+
coordinatorEnrichAfterRemoteEnrich
701+
required_capability: enrich_load
702+
703+
FROM sample_data
704+
| KEEP message
705+
| WHERE message IN ("Connected to 10.1.0.1")
706+
| EVAL language_code = "1"
707+
| ENRICH _remote:languages_policy ON language_code
708+
| RENAME language_name AS first_language_name
709+
| ENRICH _coordinator:languages_policy ON language_code
710+
;
711+
712+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
713+
Connected to 10.1.0.1 | 1 | English | English
714+
;
715+
716+
717+
doubleRemoteEnrich
718+
required_capability: enrich_load
719+
720+
FROM sample_data
721+
| KEEP message
722+
| WHERE message IN ("Connected to 10.1.0.1")
723+
| EVAL language_code = "1"
724+
| ENRICH _remote:languages_policy ON language_code
725+
| RENAME language_name AS first_language_name
726+
| ENRICH _remote:languages_policy ON language_code
727+
;
728+
729+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
730+
Connected to 10.1.0.1 | 1 | English | English
731+
;
732+
733+
734+
enrichAfterCoordinatorEnrich
735+
required_capability: enrich_load
736+
737+
FROM sample_data
738+
| KEEP message
739+
| WHERE message IN ("Connected to 10.1.0.1")
740+
| EVAL language_code = "1"
741+
| ENRICH _coordinator:languages_policy ON language_code
742+
| RENAME language_name AS first_language_name
743+
| ENRICH languages_policy ON language_code
744+
;
745+
746+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
747+
Connected to 10.1.0.1 | 1 | English | English
748+
;
749+
750+
751+
doubleCoordinatorEnrich
752+
required_capability: enrich_load
753+
754+
FROM sample_data
755+
| KEEP message
756+
| WHERE message IN ("Connected to 10.1.0.1")
757+
| EVAL language_code = "1"
758+
| ENRICH _coordinator:languages_policy ON language_code
759+
| RENAME language_name AS first_language_name
760+
| ENRICH _coordinator:languages_policy ON language_code
761+
;
762+
763+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
764+
Connected to 10.1.0.1 | 1 | English | English
765+
;

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4667,3 +4667,101 @@ FROM sample_data_ts_nanos
46674667
2023-10-23T12:27:28.948123456Z | 172.21.2.113 | 2764889 | Connected to 10.1.0.2
46684668
2023-10-23T12:15:03.360123456Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3
46694669
;
4670+
4671+
###############################################
4672+
# LOOKUP JOIN and ENRICH
4673+
###############################################
4674+
4675+
enrichAfterLookupJoin
4676+
required_capability: join_lookup_v12
4677+
4678+
FROM sample_data
4679+
| KEEP message
4680+
| WHERE message == "Connected to 10.1.0.1"
4681+
| EVAL language_code = "1"
4682+
| LOOKUP JOIN message_types_lookup ON message
4683+
| ENRICH languages_policy ON language_code
4684+
;
4685+
4686+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
4687+
Connected to 10.1.0.1 | 1 | Success | English
4688+
;
4689+
4690+
4691+
lookupJoinAfterEnrich
4692+
required_capability: join_lookup_v12
4693+
4694+
FROM sample_data
4695+
| KEEP message
4696+
| WHERE message == "Connected to 10.1.0.1"
4697+
| EVAL language_code = "1"
4698+
| ENRICH languages_policy ON language_code
4699+
| LOOKUP JOIN message_types_lookup ON message
4700+
;
4701+
4702+
message:keyword | language_code:keyword | language_name:keyword | type:keyword
4703+
Connected to 10.1.0.1 | 1 | English | Success
4704+
;
4705+
4706+
4707+
lookupJoinAfterRemoteEnrich
4708+
required_capability: join_lookup_v12
4709+
4710+
FROM sample_data
4711+
| KEEP message
4712+
| WHERE message == "Connected to 10.1.0.1"
4713+
| EVAL language_code = "1"
4714+
| ENRICH _remote:languages_policy ON language_code
4715+
| LOOKUP JOIN message_types_lookup ON message
4716+
;
4717+
4718+
message:keyword | language_code:keyword | language_name:keyword | type:keyword
4719+
Connected to 10.1.0.1 | 1 | English | Success
4720+
;
4721+
4722+
4723+
lookupJoinAfterLimitAndRemoteEnrich
4724+
required_capability: join_lookup_v12
4725+
4726+
FROM sample_data
4727+
| KEEP message
4728+
| WHERE message == "Connected to 10.1.0.1"
4729+
| EVAL language_code = "1"
4730+
| LIMIT 1
4731+
| ENRICH _remote:languages_policy ON language_code
4732+
| EVAL enrich_language_name = language_name, language_code = language_code::integer
4733+
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
4734+
| KEEP message, enrich_language_name, language_name, country.keyword
4735+
| SORT language_name, country.keyword
4736+
;
4737+
4738+
message:keyword | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword
4739+
Connected to 10.1.0.1 | English | English | Canada
4740+
Connected to 10.1.0.1 | English | English | United States of America
4741+
Connected to 10.1.0.1 | English | English | null
4742+
Connected to 10.1.0.1 | English | null | United Kingdom
4743+
;
4744+
4745+
4746+
lookupJoinAfterTopNAndRemoteEnrich
4747+
required_capability: join_lookup_v12
4748+
4749+
FROM sample_data
4750+
| KEEP message
4751+
| WHERE message == "Connected to 10.1.0.1"
4752+
| EVAL language_code = "1"
4753+
| SORT message
4754+
| LIMIT 1
4755+
| ENRICH _remote:languages_policy ON language_code
4756+
| EVAL enrich_language_name = language_name, language_code = language_code::integer
4757+
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
4758+
| KEEP message, enrich_language_name, language_name, country.keyword
4759+
| SORT language_name, country.keyword
4760+
;
4761+
4762+
message:keyword | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword
4763+
Connected to 10.1.0.1 | English | English | Canada
4764+
Connected to 10.1.0.1 | English | English | United States of America
4765+
Connected to 10.1.0.1 | English | English | null
4766+
Connected to 10.1.0.1 | English | null | United Kingdom
4767+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.xpack.esql.index.EsIndex;
3636
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
3737
import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
38+
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
3839

3940
import java.io.IOException;
4041
import java.util.ArrayList;
@@ -295,23 +296,43 @@ public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
295296
* retaining the originating cluster and restructuring pages for routing, which might be complicated.
296297
*/
297298
private static void checkRemoteEnrich(LogicalPlan plan, Failures failures) {
298-
boolean[] agg = { false };
299-
boolean[] enrichCoord = { false };
299+
// First look for remote ENRICH, and then look at its children. Going over the whole plan once is trickier as remote ENRICHs can be
300+
// in separate FORK branches which are valid by themselves.
301+
plan.forEachUp(Enrich.class, enrich -> checkForPlansForbiddenBeforeRemoteEnrich(enrich, failures));
302+
}
303+
304+
/**
305+
* For a given remote {@link Enrich}, check if there are any forbidden plans upstream.
306+
*/
307+
private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Failures failures) {
308+
if (enrich.mode != Mode.REMOTE) {
309+
return;
310+
}
311+
312+
// TODO: shouldn't we also include FORK? Everything downstream from FORK should be coordinator-only.
313+
// https://github.com/elastic/elasticsearch/issues/131445
314+
boolean[] aggregate = { false };
315+
boolean[] coordinatorOnlyEnrich = { false };
316+
boolean[] lookupJoin = { false };
300317

301-
plan.forEachUp(UnaryPlan.class, u -> {
318+
enrich.forEachUp(LogicalPlan.class, u -> {
302319
if (u instanceof Aggregate) {
303-
agg[0] = true;
304-
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
305-
enrichCoord[0] = true;
306-
}
307-
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
308-
if (agg[0]) {
309-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
310-
}
311-
if (enrichCoord[0]) {
312-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
313-
}
320+
aggregate[0] = true;
321+
} else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
322+
coordinatorOnlyEnrich[0] = true;
323+
} else if (u instanceof LookupJoin) {
324+
lookupJoin[0] = true;
314325
}
315326
});
327+
328+
if (aggregate[0]) {
329+
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
330+
}
331+
if (coordinatorOnlyEnrich[0]) {
332+
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
333+
}
334+
if (lookupJoin[0]) {
335+
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN"));
336+
}
316337
}
317338
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
8181
PhysicalPlan mappedChild = map(unary.child());
8282

8383
//
84-
// TODO - this is hard to follow and needs reworking
84+
// TODO - this is hard to follow, causes bugs and needs reworking
8585
// https://github.com/elastic/elasticsearch/issues/115897
8686
//
8787
if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE;
3737
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
3838
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.RANGE_TYPE;
39+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
3940
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
4041
import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
41-
import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution;
4242

4343
public final class AnalyzerTestUtils {
4444

@@ -61,45 +61,44 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map<String, Ind
6161
}
6262

6363
public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier) {
64-
return new Analyzer(
65-
new AnalyzerContext(
66-
EsqlTestUtils.TEST_CFG,
67-
new EsqlFunctionRegistry(),
68-
indexResolution,
69-
defaultLookupResolution(),
70-
defaultEnrichResolution(),
71-
emptyInferenceResolution()
72-
),
73-
verifier
74-
);
64+
return analyzer(indexResolution, defaultLookupResolution(), verifier);
7565
}
7666

7767
public static Analyzer analyzer(IndexResolution indexResolution, Map<String, IndexResolution> lookupResolution, Verifier verifier) {
68+
return analyzer(indexResolution, lookupResolution, defaultEnrichResolution(), verifier);
69+
}
70+
71+
public static Analyzer analyzer(
72+
IndexResolution indexResolution,
73+
Map<String, IndexResolution> lookupResolution,
74+
EnrichResolution enrichResolution,
75+
Verifier verifier
76+
) {
77+
return analyzer(indexResolution, lookupResolution, enrichResolution, verifier, TEST_CFG);
78+
}
79+
80+
public static Analyzer analyzer(
81+
IndexResolution indexResolution,
82+
Map<String, IndexResolution> lookupResolution,
83+
EnrichResolution enrichResolution,
84+
Verifier verifier,
85+
Configuration config
86+
) {
7887
return new Analyzer(
7988
new AnalyzerContext(
80-
EsqlTestUtils.TEST_CFG,
89+
config,
8190
new EsqlFunctionRegistry(),
8291
indexResolution,
8392
lookupResolution,
84-
defaultEnrichResolution(),
93+
enrichResolution,
8594
defaultInferenceResolution()
8695
),
8796
verifier
8897
);
8998
}
9099

91100
public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier, Configuration config) {
92-
return new Analyzer(
93-
new AnalyzerContext(
94-
config,
95-
new EsqlFunctionRegistry(),
96-
indexResolution,
97-
defaultLookupResolution(),
98-
defaultEnrichResolution(),
99-
defaultInferenceResolution()
100-
),
101-
verifier
102-
);
101+
return analyzer(indexResolution, defaultLookupResolution(), defaultEnrichResolution(), verifier, config);
103102
}
104103

105104
public static Analyzer analyzer(Verifier verifier) {
@@ -213,6 +212,25 @@ public static void loadEnrichPolicyResolution(
213212
);
214213
}
215214

215+
public static void loadEnrichPolicyResolution(
216+
EnrichResolution enrich,
217+
Enrich.Mode mode,
218+
String policyType,
219+
String policy,
220+
String field,
221+
String index,
222+
String mapping
223+
) {
224+
IndexResolution indexResolution = loadMapping(mapping, index);
225+
List<String> enrichFields = new ArrayList<>(indexResolution.get().mapping().keySet());
226+
enrichFields.remove(field);
227+
enrich.addResolvedPolicy(
228+
policy,
229+
mode,
230+
new ResolvedEnrichPolicy(field, policyType, enrichFields, Map.of("", index), indexResolution.get().mapping())
231+
);
232+
}
233+
216234
public static void loadEnrichPolicyResolution(EnrichResolution enrich, String policy, String field, String index, String mapping) {
217235
loadEnrichPolicyResolution(enrich, EnrichPolicy.MATCH_TYPE, policy, field, index, mapping);
218236
}

0 commit comments

Comments
 (0)