Skip to content

Commit 5c6b3f4

Browse files
committed
Allow remote enrich after LOOKUP JOIN
1 parent fa6e3e7 commit 5c6b3f4

File tree

4 files changed

+84
-1
lines changed

4 files changed

+84
-1
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1875,6 +1875,76 @@ type:keyword | language_code:integer | language_name:keyword
18751875
Production | 3 | Spanish
18761876
;
18771877

1878+
enrichAfterLookupJoin
1879+
required_capability: join_lookup_v12
1880+
1881+
FROM sample_data
1882+
| KEEP message
1883+
| WHERE message == "Connected to 10.1.0.1"
1884+
| EVAL language_code = "1"
1885+
| LOOKUP JOIN message_types_lookup ON message
1886+
| ENRICH languages_policy ON language_code
1887+
;
1888+
1889+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
1890+
Connected to 10.1.0.1 | 1 | Success | English
1891+
;
1892+
1893+
remoteEnrichAfterLookupJoin
1894+
required_capability: join_lookup_v12
1895+
required_capability: remote_enrich_after_lookup_join
1896+
1897+
# TODO: a bunch more tests, also switch orders, use double _remote enrich, double lookup join etc. Also add tests with
1898+
# _coordinator enrich. What about ROW?
1899+
1900+
FROM sample_data
1901+
| KEEP message
1902+
| WHERE message == "Connected to 10.1.0.1"
1903+
| EVAL language_code = "1"
1904+
| LOOKUP JOIN message_types_lookup ON message
1905+
| ENRICH _remote:languages_policy ON language_code
1906+
;
1907+
1908+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
1909+
Connected to 10.1.0.1 | 1 | Success | English
1910+
;
1911+
1912+
remoteEnrichSortAfterLookupJoin
1913+
required_capability: join_lookup_v12
1914+
required_capability: remote_enrich_after_lookup_join
1915+
1916+
FROM sample_data
1917+
| KEEP message
1918+
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
1919+
| EVAL language_code = "1"
1920+
| LOOKUP JOIN message_types_lookup ON message
1921+
| ENRICH _remote:languages_policy ON language_code
1922+
| SORT message ASC
1923+
;
1924+
1925+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
1926+
Connected to 10.1.0.1 | 1 | Success | English
1927+
Connected to 10.1.0.2 | 1 | Success | English
1928+
;
1929+
1930+
sortRemoteEnrichAfterLookupJoin
1931+
required_capability: join_lookup_v12
1932+
required_capability: remote_enrich_after_lookup_join
1933+
1934+
FROM sample_data
1935+
| KEEP message
1936+
| WHERE message == "Connected to 10.1.0.1" OR message == "Connected to 10.1.0.2"
1937+
| EVAL language_code = "1"
1938+
| LOOKUP JOIN message_types_lookup ON message
1939+
| SORT message ASC
1940+
| ENRICH _remote:languages_policy ON language_code
1941+
| LIMIT 2
1942+
;
1943+
1944+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
1945+
Connected to 10.1.0.1 | 1 | Success | English
1946+
Connected to 10.1.0.2 | 1 | Success | English
1947+
;
18781948

18791949
###############################################
18801950
# LOOKUP JOIN on mixed numerical fields

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void testEnrichAfterStop() throws Exception {
117117
SimplePauseFieldPlugin.allowEmitting.countDown();
118118

119119
try (EsqlQueryResponse resp = stopAction.actionGet(30, TimeUnit.SECONDS)) {
120-
// Compare this to CrossClustersEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
120+
// Compare this to CrossClusterEnrichIT.testEnrichTwiceThenAggs - the results from c2 will be absent
121121
// because we stopped it before processing the data
122122
assertThat(
123123
getValuesList(resp),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,6 +1205,12 @@ public enum Cap {
12051205
*/
12061206
ENABLE_LOOKUP_JOIN_ON_REMOTE(Build.current().isSnapshot()),
12071207

1208+
/**
1209+
* Fix the planning of {@code | ENRICH _remote:policy} when there's a preceding {@code | LOOKUP JOIN},
1210+
* see <a href="https://github.com/elastic/elasticsearch/issues/129372">java.lang.ClassCastException when combining LOOKUP JOIN and remote ENRICH</a>
1211+
*/
1212+
REMOTE_ENRICH_AFTER_LOOKUP_JOIN,
1213+
12081214
/**
12091215
* MATCH PHRASE function
12101216
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
2929
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
3030
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
31+
import org.elasticsearch.xpack.esql.plan.physical.BinaryExec;
3132
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
3233
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
3334
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
@@ -102,7 +103,13 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
102103
// 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
103104
Holder<Boolean> hasFragment = new Holder<>(false);
104105

106+
// Remove most plan nodes between this remote ENRICH and the data node's fragment so they're not executed twice;
107+
// include the plan up until this ENRICH in the fragment.
105108
var childTransformed = mappedChild.transformUp(f -> {
109+
if (f instanceof BinaryExec be) {
110+
// Remove any LOOKUP JOIN or inline Join from INLINE STATS do avoid double execution
111+
return be.left();
112+
}
106113
// Once we reached FragmentExec, we stuff our Enrich under it
107114
if (f instanceof FragmentExec) {
108115
hasFragment.set(true);

0 commit comments

Comments
 (0)