Skip to content

Commit 0f65dd5

Browse files
committed
Make LJs remote before remote enrich
Alternative approach: rather than reinventing the wheel, let's just mark any LOOKUP JOINs upstream from a remote ENRICH as remote, too, so the validation automatically kicks in.
1 parent 4efe247 commit 0f65dd5

File tree

4 files changed

+110
-34
lines changed

4 files changed

+110
-34
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ public LogicalPlan visitShowInfo(EsqlBaseParser.ShowInfoContext ctx) {
463463

464464
@Override
465465
public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) {
466-
return p -> {
466+
return child -> {
467467
var source = source(ctx);
468468
Tuple<Mode, String> tuple = parsePolicyName(ctx.policyName);
469469
Mode mode = tuple.v1();
@@ -482,9 +482,15 @@ public PlanFactory visitEnrichCommand(EsqlBaseParser.EnrichCommandContext ctx) {
482482
}
483483

484484
List<NamedExpression> keepClauses = visitList(this, ctx.enrichWithClause(), NamedExpression.class);
485+
486+
// If this is a remote-only ENRICH, any upstream LOOKUP JOINs need to be treated as remote-only, too.
487+
LogicalPlan updatedChild = (mode == Mode.REMOTE) == false
488+
? child
489+
: child.transformDown(LookupJoin.class, lj -> new LookupJoin(lj.source(), lj.left(), lj.right(), lj.config(), true));
490+
485491
return new Enrich(
486492
source,
487-
p,
493+
updatedChild,
488494
mode,
489495
Literal.keyword(source(ctx.policyName), policyNameString),
490496
matchField,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
2929
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
3030
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
31-
import org.elasticsearch.xpack.esql.plan.physical.BinaryExec;
3231
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
3332
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
3433
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
@@ -88,7 +87,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
8887
PhysicalPlan mappedChild = map(unary.child());
8988

9089
//
91-
// TODO - this is hard to follow and needs reworking
90+
// TODO - this is hard to follow, causes bugs and needs reworking
9291
// https://github.com/elastic/elasticsearch/issues/115897
9392
//
9493
if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
@@ -106,10 +105,6 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
106105
// Remove most plan nodes between this remote ENRICH and the data node's fragment so they're not executed twice;
107106
// include the plan up until this ENRICH in the fragment.
108107
var childTransformed = mappedChild.transformUp(f -> {
109-
if (f instanceof BinaryExec be) {
110-
// Remove any LOOKUP JOIN or inline Join from INLINE STATS do avoid double execution
111-
return be.left();
112-
}
113108
// Once we reached FragmentExec, we stuff our Enrich under it
114109
if (f instanceof FragmentExec) {
115110
hasFragment.set(true);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE;
3737
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
3838
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.RANGE_TYPE;
39+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
3940
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
4041
import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
41-
import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution;
4242

4343
public final class AnalyzerTestUtils {
4444

@@ -61,45 +61,44 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map<String, Ind
6161
}
6262

6363
public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier) {
64-
return new Analyzer(
65-
new AnalyzerContext(
66-
EsqlTestUtils.TEST_CFG,
67-
new EsqlFunctionRegistry(),
68-
indexResolution,
69-
defaultLookupResolution(),
70-
defaultEnrichResolution(),
71-
emptyInferenceResolution()
72-
),
73-
verifier
74-
);
64+
return analyzer(indexResolution, defaultLookupResolution(), verifier);
7565
}
7666

7767
public static Analyzer analyzer(IndexResolution indexResolution, Map<String, IndexResolution> lookupResolution, Verifier verifier) {
68+
return analyzer(indexResolution, lookupResolution, defaultEnrichResolution(), verifier);
69+
}
70+
71+
public static Analyzer analyzer(
72+
IndexResolution indexResolution,
73+
Map<String, IndexResolution> lookupResolution,
74+
EnrichResolution enrichResolution,
75+
Verifier verifier
76+
) {
77+
return analyzer(indexResolution, lookupResolution, enrichResolution, verifier, TEST_CFG);
78+
}
79+
80+
public static Analyzer analyzer(
81+
IndexResolution indexResolution,
82+
Map<String, IndexResolution> lookupResolution,
83+
EnrichResolution enrichResolution,
84+
Verifier verifier,
85+
Configuration config
86+
) {
7887
return new Analyzer(
7988
new AnalyzerContext(
80-
EsqlTestUtils.TEST_CFG,
89+
config,
8190
new EsqlFunctionRegistry(),
8291
indexResolution,
8392
lookupResolution,
84-
defaultEnrichResolution(),
93+
enrichResolution,
8594
defaultInferenceResolution()
8695
),
8796
verifier
8897
);
8998
}
9099

91100
public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier, Configuration config) {
92-
return new Analyzer(
93-
new AnalyzerContext(
94-
config,
95-
new EsqlFunctionRegistry(),
96-
indexResolution,
97-
defaultLookupResolution(),
98-
defaultEnrichResolution(),
99-
defaultInferenceResolution()
100-
),
101-
verifier
102-
);
101+
return analyzer(indexResolution, defaultLookupResolution(), defaultEnrichResolution(), verifier, config);
103102
}
104103

105104
public static Analyzer analyzer(Verifier verifier) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.xpack.esql.parser.ParsingException;
3030
import org.elasticsearch.xpack.esql.parser.QueryParam;
3131
import org.elasticsearch.xpack.esql.parser.QueryParams;
32+
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
3233

3334
import java.util.ArrayList;
3435
import java.util.LinkedHashMap;
@@ -38,9 +39,13 @@
3839
import java.util.Map;
3940
import java.util.Set;
4041

42+
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
4143
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
44+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
4245
import static org.elasticsearch.xpack.esql.EsqlTestUtils.paramAsConstant;
4346
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
47+
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
48+
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadEnrichPolicyResolution;
4449
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping;
4550
import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN;
4651
import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_POINT;
@@ -2268,6 +2273,78 @@ public void testRemoteLookupJoinWithPipelineBreaker() {
22682273
);
22692274
}
22702275

2276+
public void testRemoteEnrichAfterLookupJoinWithPipelineBreaker() {
2277+
EnrichResolution enrichResolution = new EnrichResolution();
2278+
loadEnrichPolicyResolution(
2279+
enrichResolution,
2280+
Enrich.Mode.REMOTE,
2281+
MATCH_TYPE,
2282+
"languages",
2283+
"language_code",
2284+
"languages_idx",
2285+
"mapping-languages.json"
2286+
);
2287+
loadEnrichPolicyResolution(
2288+
enrichResolution,
2289+
Enrich.Mode.COORDINATOR,
2290+
MATCH_TYPE,
2291+
"languages_coord",
2292+
"language_code",
2293+
"languages_idx",
2294+
"mapping-languages.json"
2295+
);
2296+
var analyzer = AnalyzerTestUtils.analyzer(
2297+
loadMapping("mapping-default.json", "test"),
2298+
defaultLookupResolution(),
2299+
enrichResolution,
2300+
TEST_VERIFIER
2301+
);
2302+
2303+
String err = error("""
2304+
FROM test
2305+
| STATS c = COUNT(*) by languages
2306+
| EVAL language_code = languages
2307+
| LOOKUP JOIN languages_lookup ON language_code
2308+
| ENRICH _remote:languages ON language_code
2309+
""", analyzer);
2310+
assertThat(
2311+
err,
2312+
containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [STATS c = COUNT(*) by languages]@2:3")
2313+
);
2314+
assertThat(err, containsString("5:3: ENRICH with remote policy can't be executed after STATS"));
2315+
2316+
err = error("""
2317+
FROM test
2318+
| SORT emp_no
2319+
| EVAL language_code = languages
2320+
| LOOKUP JOIN languages_lookup ON language_code
2321+
| ENRICH _remote:languages ON language_code
2322+
""", analyzer);
2323+
assertThat(err, containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [SORT emp_no]@2:3"));
2324+
2325+
err = error("""
2326+
FROM test
2327+
| LIMIT 2
2328+
| EVAL language_code = languages
2329+
| LOOKUP JOIN languages_lookup ON language_code
2330+
| ENRICH _remote:languages ON language_code
2331+
""", analyzer);
2332+
assertThat(err, containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [LIMIT 2]@2:3"));
2333+
2334+
err = error("""
2335+
FROM test
2336+
| EVAL language_code = languages
2337+
| ENRICH _coordinator:languages_coord
2338+
| LOOKUP JOIN languages_lookup ON language_code
2339+
| ENRICH _remote:languages ON language_code
2340+
""", analyzer);
2341+
assertThat(
2342+
err,
2343+
containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [ENRICH _coordinator:languages_coord]@3:3")
2344+
);
2345+
assertThat(err, containsString("5:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
2346+
}
2347+
22712348
public void testRemoteLookupJoinIsSnapshot() {
22722349
// TODO: remove when we allow remote joins in release builds
22732350
assumeTrue("Remote LOOKUP JOIN not enabled", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled());
@@ -2282,7 +2359,6 @@ public void testRemoteLookupJoinIsDisabled() {
22822359
() -> query("FROM test,remote:test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code")
22832360
);
22842361
assertThat(e.getMessage(), containsString("remote clusters are not supported with LOOKUP JOIN"));
2285-
22862362
}
22872363

22882364
private void checkFullTextFunctionsInStats(String functionInvocation) {

0 commit comments

Comments
 (0)