Skip to content

Commit 6477793

Browse files
alex-spies and elasticsearchmachine
authored
[8.18] ESQL: Disallow remote enrich after lu join (#131426) (#131537)
* ESQL: Disallow remote enrich after lu join (#131426) Fix #129372. Due to how remote ENRICH is [planned](https://github.com/elastic/elasticsearch/blob/32e50d0d94e27ee559d24bf9d5463ba6e64d1788/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java#L93), it interacts in special ways with pipeline breakers, in particular LIMIT and TopN; when these are encountered upstream from a remote ENRICH, these nodes are copied and executed a second time after the remote ENRICH. We'd like to allow remote ENRICH after LOOKUP JOIN, but that forces the lookup to be remote as well; this has its own interactions with pipeline breakers: in particular, LIMITs and TopNs cannot just be duplicated after LOOKUP JOIN, as LOOKUP JOIN may add new rows. For now, let's just forbid any usage of remote ENRICH after LOOKUP JOINs; remote ENRICH is mostly relevant for CCS, and LOOKUP JOIN doesn't support that in 9.1/8.19, anyway. There is separate work that enables remote LOOKUP JOINs on remote clusters and adds the correct validations; we can later build support for remote ENRICH + LOOKUP JOIN on top of that. (Cf. my comment on #129372 and my draft #131286 for enabling this.) (cherry picked from commit 06e39c0) # Conflicts: # x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java # x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec # x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java # x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java * [CI] Auto commit changes from spotless * Add test_lookup index to default test analyzer * [CI] Auto commit changes from spotless --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent dbc4624 commit 6477793

File tree

7 files changed

+428
-37
lines changed

7 files changed

+428
-37
lines changed

docs/changelog/131426.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 131426
2+
summary: Disallow remote enrich after lu join
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 129372

x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,3 +662,104 @@ from *
662662
author.keyword:keyword|book_no:keyword|scalerank:integer|street:keyword|bytes_in:ul|@timestamp:unsupported|abbrev:keyword|city_location:geo_point|distance:double|description:unsupported|birth_date:date|language_code:integer|intersects:boolean|client_ip:unsupported|event_duration:long|version:version|language_name:keyword
663663
Fyodor Dostoevsky |1211 |null |null |null |null |null |null |null |null |null |null |null |null |null |null |null
664664
;
665+
666+
667+
statsAfterRemoteEnrich
668+
required_capability: enrich_load
669+
670+
FROM sample_data
671+
| KEEP message
672+
| WHERE message IN ("Connected to 10.1.0.1", "Connected to 10.1.0.2")
673+
| EVAL language_code = "1"
674+
| ENRICH _remote:languages_policy ON language_code
675+
| STATS messages = count_distinct(message) BY language_name
676+
;
677+
678+
messages:long | language_name:keyword
679+
2 | English
680+
;
681+
682+
683+
enrichAfterRemoteEnrich
684+
required_capability: enrich_load
685+
686+
FROM sample_data
687+
| KEEP message
688+
| WHERE message IN ("Connected to 10.1.0.1")
689+
| EVAL language_code = "1"
690+
| ENRICH _remote:languages_policy ON language_code
691+
| RENAME language_name AS first_language_name
692+
| ENRICH languages_policy ON language_code
693+
;
694+
695+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
696+
Connected to 10.1.0.1 | 1 | English | English
697+
;
698+
699+
700+
coordinatorEnrichAfterRemoteEnrich
701+
required_capability: enrich_load
702+
703+
FROM sample_data
704+
| KEEP message
705+
| WHERE message IN ("Connected to 10.1.0.1")
706+
| EVAL language_code = "1"
707+
| ENRICH _remote:languages_policy ON language_code
708+
| RENAME language_name AS first_language_name
709+
| ENRICH _coordinator:languages_policy ON language_code
710+
;
711+
712+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
713+
Connected to 10.1.0.1 | 1 | English | English
714+
;
715+
716+
717+
doubleRemoteEnrich
718+
required_capability: enrich_load
719+
720+
FROM sample_data
721+
| KEEP message
722+
| WHERE message IN ("Connected to 10.1.0.1")
723+
| EVAL language_code = "1"
724+
| ENRICH _remote:languages_policy ON language_code
725+
| RENAME language_name AS first_language_name
726+
| ENRICH _remote:languages_policy ON language_code
727+
;
728+
729+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
730+
Connected to 10.1.0.1 | 1 | English | English
731+
;
732+
733+
734+
enrichAfterCoordinatorEnrich
735+
required_capability: enrich_load
736+
737+
FROM sample_data
738+
| KEEP message
739+
| WHERE message IN ("Connected to 10.1.0.1")
740+
| EVAL language_code = "1"
741+
| ENRICH _coordinator:languages_policy ON language_code
742+
| RENAME language_name AS first_language_name
743+
| ENRICH languages_policy ON language_code
744+
;
745+
746+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
747+
Connected to 10.1.0.1 | 1 | English | English
748+
;
749+
750+
751+
doubleCoordinatorEnrich
752+
required_capability: enrich_load
753+
754+
FROM sample_data
755+
| KEEP message
756+
| WHERE message IN ("Connected to 10.1.0.1")
757+
| EVAL language_code = "1"
758+
| ENRICH _coordinator:languages_policy ON language_code
759+
| RENAME language_name AS first_language_name
760+
| ENRICH _coordinator:languages_policy ON language_code
761+
;
762+
763+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
764+
Connected to 10.1.0.1 | 1 | English | English
765+
;

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1716,3 +1716,101 @@ ROW somefield = 0, type = "Production"
17161716
type:keyword | language_code:integer | language_name:keyword
17171717
Production | 3 | Spanish
17181718
;
1719+
1720+
###############################################
1721+
# LOOKUP JOIN and ENRICH
1722+
###############################################
1723+
1724+
enrichAfterLookupJoin
1725+
required_capability: join_lookup_v12
1726+
1727+
FROM sample_data
1728+
| KEEP message
1729+
| WHERE message == "Connected to 10.1.0.1"
1730+
| EVAL language_code = "1"
1731+
| LOOKUP JOIN message_types_lookup ON message
1732+
| ENRICH languages_policy ON language_code
1733+
;
1734+
1735+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
1736+
Connected to 10.1.0.1 | 1 | Success | English
1737+
;
1738+
1739+
1740+
lookupJoinAfterEnrich
1741+
required_capability: join_lookup_v12
1742+
1743+
FROM sample_data
1744+
| KEEP message
1745+
| WHERE message == "Connected to 10.1.0.1"
1746+
| EVAL language_code = "1"
1747+
| ENRICH languages_policy ON language_code
1748+
| LOOKUP JOIN message_types_lookup ON message
1749+
;
1750+
1751+
message:keyword | language_code:keyword | language_name:keyword | type:keyword
1752+
Connected to 10.1.0.1 | 1 | English | Success
1753+
;
1754+
1755+
1756+
lookupJoinAfterRemoteEnrich
1757+
required_capability: join_lookup_v12
1758+
1759+
FROM sample_data
1760+
| KEEP message
1761+
| WHERE message == "Connected to 10.1.0.1"
1762+
| EVAL language_code = "1"
1763+
| ENRICH _remote:languages_policy ON language_code
1764+
| LOOKUP JOIN message_types_lookup ON message
1765+
;
1766+
1767+
message:keyword | language_code:keyword | language_name:keyword | type:keyword
1768+
Connected to 10.1.0.1 | 1 | English | Success
1769+
;
1770+
1771+
1772+
lookupJoinAfterLimitAndRemoteEnrich
1773+
required_capability: join_lookup_v12
1774+
1775+
FROM sample_data
1776+
| KEEP message
1777+
| WHERE message == "Connected to 10.1.0.1"
1778+
| EVAL language_code = "1"
1779+
| LIMIT 1
1780+
| ENRICH _remote:languages_policy ON language_code
1781+
| EVAL enrich_language_name = language_name, language_code = language_code::integer
1782+
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
1783+
| KEEP message, enrich_language_name, language_name, country.keyword
1784+
| SORT language_name, country.keyword
1785+
;
1786+
1787+
message:keyword | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword
1788+
Connected to 10.1.0.1 | English | English | Canada
1789+
Connected to 10.1.0.1 | English | English | United States of America
1790+
Connected to 10.1.0.1 | English | English | null
1791+
Connected to 10.1.0.1 | English | null | United Kingdom
1792+
;
1793+
1794+
1795+
lookupJoinAfterTopNAndRemoteEnrich
1796+
required_capability: join_lookup_v12
1797+
1798+
FROM sample_data
1799+
| KEEP message
1800+
| WHERE message == "Connected to 10.1.0.1"
1801+
| EVAL language_code = "1"
1802+
| SORT message
1803+
| LIMIT 1
1804+
| ENRICH _remote:languages_policy ON language_code
1805+
| EVAL enrich_language_name = language_name, language_code = language_code::integer
1806+
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
1807+
| KEEP message, enrich_language_name, language_name, country.keyword
1808+
| SORT language_name, country.keyword
1809+
;
1810+
1811+
message:keyword | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword
1812+
Connected to 10.1.0.1 | English | English | Canada
1813+
Connected to 10.1.0.1 | English | English | United States of America
1814+
Connected to 10.1.0.1 | English | English | null
1815+
Connected to 10.1.0.1 | English | null | United Kingdom
1816+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.xpack.esql.index.EsIndex;
3636
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
3737
import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
38+
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
3839

3940
import java.io.IOException;
4041
import java.util.ArrayList;
@@ -295,23 +296,43 @@ public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
295296
* retaining the originating cluster and restructuring pages for routing, which might be complicated.
296297
*/
297298
private static void checkRemoteEnrich(LogicalPlan plan, Failures failures) {
298-
boolean[] agg = { false };
299-
boolean[] enrichCoord = { false };
299+
// First look for remote ENRICH, and then look at its children. Going over the whole plan once is trickier as remote ENRICHs can be
300+
// in separate FORK branches which are valid by themselves.
301+
plan.forEachUp(Enrich.class, enrich -> checkForPlansForbiddenBeforeRemoteEnrich(enrich, failures));
302+
}
303+
304+
/**
305+
* For a given remote {@link Enrich}, check if there are any forbidden plans upstream.
306+
*/
307+
private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Failures failures) {
308+
if (enrich.mode != Mode.REMOTE) {
309+
return;
310+
}
311+
312+
// TODO: shouldn't we also include FORK? Everything downstream from FORK should be coordinator-only.
313+
// https://github.com/elastic/elasticsearch/issues/131445
314+
boolean[] aggregate = { false };
315+
boolean[] coordinatorOnlyEnrich = { false };
316+
boolean[] lookupJoin = { false };
300317

301-
plan.forEachUp(UnaryPlan.class, u -> {
318+
enrich.forEachUp(LogicalPlan.class, u -> {
302319
if (u instanceof Aggregate) {
303-
agg[0] = true;
304-
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
305-
enrichCoord[0] = true;
306-
}
307-
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
308-
if (agg[0]) {
309-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
310-
}
311-
if (enrichCoord[0]) {
312-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
313-
}
320+
aggregate[0] = true;
321+
} else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
322+
coordinatorOnlyEnrich[0] = true;
323+
} else if (u instanceof LookupJoin) {
324+
lookupJoin[0] = true;
314325
}
315326
});
327+
328+
if (aggregate[0]) {
329+
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
330+
}
331+
if (coordinatorOnlyEnrich[0]) {
332+
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
333+
}
334+
if (lookupJoin[0]) {
335+
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN"));
336+
}
316337
}
317338
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
7777
PhysicalPlan mappedChild = map(unary.child());
7878

7979
//
80-
// TODO - this is hard to follow and needs reworking
80+
// TODO - this is hard to follow, causes bugs and needs reworking
8181
// https://github.com/elastic/elasticsearch/issues/115897
8282
//
8383
if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE;
2828
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
2929
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.RANGE_TYPE;
30+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
3031
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
3132
import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
3233

@@ -51,36 +52,37 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map<String, Ind
5152
}
5253

5354
public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier) {
54-
return new Analyzer(
55-
new AnalyzerContext(
56-
EsqlTestUtils.TEST_CFG,
57-
new EsqlFunctionRegistry(),
58-
indexResolution,
59-
defaultLookupResolution(),
60-
defaultEnrichResolution()
61-
),
62-
verifier
63-
);
55+
return analyzer(indexResolution, defaultLookupResolution(), verifier);
6456
}
6557

6658
public static Analyzer analyzer(IndexResolution indexResolution, Map<String, IndexResolution> lookupResolution, Verifier verifier) {
59+
return analyzer(indexResolution, lookupResolution, defaultEnrichResolution(), verifier);
60+
}
61+
62+
public static Analyzer analyzer(
63+
IndexResolution indexResolution,
64+
Map<String, IndexResolution> lookupResolution,
65+
EnrichResolution enrichResolution,
66+
Verifier verifier
67+
) {
68+
return analyzer(indexResolution, lookupResolution, enrichResolution, verifier, TEST_CFG);
69+
}
70+
71+
public static Analyzer analyzer(
72+
IndexResolution indexResolution,
73+
Map<String, IndexResolution> lookupResolution,
74+
EnrichResolution enrichResolution,
75+
Verifier verifier,
76+
Configuration config
77+
) {
6778
return new Analyzer(
68-
new AnalyzerContext(
69-
EsqlTestUtils.TEST_CFG,
70-
new EsqlFunctionRegistry(),
71-
indexResolution,
72-
lookupResolution,
73-
defaultEnrichResolution()
74-
),
79+
new AnalyzerContext(config, new EsqlFunctionRegistry(), indexResolution, lookupResolution, enrichResolution),
7580
verifier
7681
);
7782
}
7883

7984
public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier, Configuration config) {
80-
return new Analyzer(
81-
new AnalyzerContext(config, new EsqlFunctionRegistry(), indexResolution, defaultLookupResolution(), defaultEnrichResolution()),
82-
verifier
83-
);
85+
return analyzer(indexResolution, defaultLookupResolution(), defaultEnrichResolution(), verifier, config);
8486
}
8587

8688
public static Analyzer analyzer(Verifier verifier) {
@@ -141,7 +143,12 @@ public static IndexResolution expandedDefaultIndexResolution() {
141143
}
142144

143145
public static Map<String, IndexResolution> defaultLookupResolution() {
144-
return Map.of("languages_lookup", loadMapping("mapping-languages.json", "languages_lookup", IndexMode.LOOKUP));
146+
return Map.of(
147+
"languages_lookup",
148+
loadMapping("mapping-languages.json", "languages_lookup", IndexMode.LOOKUP),
149+
"test_lookup",
150+
loadMapping("mapping-basic.json", "test_lookup", IndexMode.LOOKUP)
151+
);
145152
}
146153

147154
public static EnrichResolution defaultEnrichResolution() {
@@ -180,6 +187,25 @@ public static void loadEnrichPolicyResolution(
180187
);
181188
}
182189

190+
public static void loadEnrichPolicyResolution(
191+
EnrichResolution enrich,
192+
Enrich.Mode mode,
193+
String policyType,
194+
String policy,
195+
String field,
196+
String index,
197+
String mapping
198+
) {
199+
IndexResolution indexResolution = loadMapping(mapping, index);
200+
List<String> enrichFields = new ArrayList<>(indexResolution.get().mapping().keySet());
201+
enrichFields.remove(field);
202+
enrich.addResolvedPolicy(
203+
policy,
204+
mode,
205+
new ResolvedEnrichPolicy(field, policyType, enrichFields, Map.of("", index), indexResolution.get().mapping())
206+
);
207+
}
208+
183209
public static void loadEnrichPolicyResolution(EnrichResolution enrich, String policy, String field, String index, String mapping) {
184210
loadEnrichPolicyResolution(enrich, EnrichPolicy.MATCH_TYPE, policy, field, index, mapping);
185211
}

0 commit comments

Comments
 (0)