Skip to content

Commit 2c051b5

Browse files
committed
Introduce execution location marker for better handling of remote/local compatibility
1 parent ccf9893 commit 2c051b5

File tree

9 files changed

+102
-24
lines changed

9 files changed

+102
-24
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,13 @@
4545
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
4646
import static org.elasticsearch.xpack.esql.plan.logical.Filter.checkFilterConditionDataType;
4747

48-
public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware, SortAgnostic, PipelineBreaker {
48+
public class Aggregate extends UnaryPlan
49+
implements
50+
PostAnalysisVerificationAware,
51+
TelemetryAware,
52+
SortAgnostic,
53+
PipelineBreaker,
54+
ExecutesOn.Coordinator {
4955
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
5056
LogicalPlan.class,
5157
"Aggregate",

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ChangePoint.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,12 @@
4444
* enforced by the Limit in the surrogate plan.
4545
*/
4646
@SupportsObservabilityTier(tier = COMPLETE)
47-
public class ChangePoint extends UnaryPlan implements SurrogateLogicalPlan, PostAnalysisVerificationAware, LicenseAware {
47+
public class ChangePoint extends UnaryPlan
48+
implements
49+
SurrogateLogicalPlan,
50+
PostAnalysisVerificationAware,
51+
LicenseAware,
52+
ExecutesOn.Coordinator {
4853

4954
private final Attribute value;
5055
private final Attribute key;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -310,20 +310,28 @@ private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Fail
310310
return;
311311
}
312312

313-
Set<String> badCommands = new HashSet<>();
313+
Set<Source> badCommands = new HashSet<>();
314314

315315
enrich.forEachUp(LogicalPlan.class, u -> {
316-
if (u instanceof Aggregate) {
317-
badCommands.add("STATS");
318-
} else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
319-
badCommands.add("another ENRICH with coordinator policy");
320-
} else if (u instanceof LookupJoin) {
321-
badCommands.add("LOOKUP JOIN");
322-
} else if (u instanceof Fork) {
323-
badCommands.add("FORK");
316+
if (u instanceof ExecutesOn.Coordinator
317+
|| u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR
318+
|| u instanceof LookupJoin) {
319+
badCommands.add(u.source());
324320
}
321+
322+
// if (u instanceof Aggregate) {
323+
// badCommands.add("STATS");
324+
// } else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
325+
// badCommands.add("another ENRICH with coordinator policy");
326+
// } else if (u instanceof LookupJoin) {
327+
// badCommands.add("LOOKUP JOIN");
328+
// } else if (u instanceof Fork) {
329+
// badCommands.add("FORK");
330+
// }
325331
});
326332

327-
badCommands.forEach(c -> failures.add(fail(enrich, "ENRICH with remote policy can't be executed after " + c)));
333+
badCommands.forEach(
334+
f -> failures.add(fail(enrich, "ENRICH with remote policy can't be executed after [" + f.text() + "]" + f.source()))
335+
);
328336
}
329337
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plan.logical;
9+
10+
/**
11+
* Mark nodes that execute only in a specific way, either on the coordinator or on a remote node.
12+
*/
13+
public interface ExecutesOn {
14+
enum ExecuteLocation {
15+
COORDINATOR,
16+
REMOTE
17+
}
18+
19+
ExecuteLocation executesOn();
20+
21+
/**
22+
* Executes on the remote nodes only (note that may include coordinator, but not on the aggregation stage).
23+
*/
24+
interface Remote extends ExecutesOn {
25+
@Override
26+
default ExecuteLocation executesOn() {
27+
return ExecuteLocation.REMOTE;
28+
}
29+
}
30+
31+
/**
32+
* Executes on the coordinator only. Can not be run on remote nodes.
33+
*/
34+
interface Coordinator extends ExecutesOn {
35+
@Override
36+
default ExecuteLocation executesOn() {
37+
return ExecuteLocation.COORDINATOR;
38+
}
39+
}
40+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,12 @@
3333
* A Fork is a n-ary {@code Plan} where each child is a sub plan, e.g.
3434
* {@code FORK [WHERE content:"fox" ] [WHERE content:"dog"] }
3535
*/
36-
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware, TelemetryAware, PipelineBreaker {
36+
public class Fork extends LogicalPlan
37+
implements
38+
PostAnalysisPlanVerificationAware,
39+
TelemetryAware,
40+
PipelineBreaker,
41+
ExecutesOn.Coordinator {
3742

3843
public static final String FORK_FIELD = "_fork";
3944
public static final int MAX_BRANCHES = 8;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Completion.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.xpack.esql.core.tree.Source;
2424
import org.elasticsearch.xpack.esql.core.type.DataType;
2525
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
26+
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
2627
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2728

2829
import java.io.IOException;
@@ -33,7 +34,7 @@
3334
import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT;
3435
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
3536

36-
public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware {
37+
public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware, ExecutesOn.Coordinator {
3738

3839
public static final String DEFAULT_OUTPUT_FIELD_NAME = "completion";
3940

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Rerank.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
2424
import org.elasticsearch.xpack.esql.core.tree.Source;
2525
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
26+
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
2627
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2728
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
2829

@@ -33,7 +34,7 @@
3334
import static org.elasticsearch.xpack.esql.core.expression.Expressions.asAttributes;
3435
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
3536

36-
public class Rerank extends InferencePlan<Rerank> implements TelemetryAware {
37+
public class Rerank extends InferencePlan<Rerank> implements TelemetryAware, ExecutesOn.Coordinator {
3738

3839
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Rerank", Rerank::new);
3940
public static final String DEFAULT_INFERENCE_ID = ".rerank-v1-elasticsearch";

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1515
import org.elasticsearch.xpack.esql.core.tree.Source;
1616
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
17+
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
1718
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1819
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
1920
import org.elasticsearch.xpack.esql.plan.logical.SurrogateLogicalPlan;
@@ -107,7 +108,7 @@ private void checkRemoteJoin(Failures failures) {
107108
List<Source> fails = new LinkedList<>();
108109

109110
this.forEachUp(UnaryPlan.class, u -> {
110-
if (u instanceof PipelineBreaker) {
111+
if (u instanceof PipelineBreaker || u instanceof ExecutesOn.Coordinator) {
111112
fails.add(u.source());
112113
}
113114
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2358,7 +2358,7 @@ public void testRemoteEnrichAfterLookupJoin() {
23582358
| %s
23592359
| ENRICH _remote:languages ON language_code
23602360
""", lookupCommand), analyzer);
2361-
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after LOOKUP JOIN"));
2361+
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after [" + lookupCommand + "]@3:3"));
23622362

23632363
err = error(Strings.format("""
23642364
FROM test
@@ -2367,7 +2367,7 @@ public void testRemoteEnrichAfterLookupJoin() {
23672367
| ENRICH _remote:languages ON language_code
23682368
| %s
23692369
""", lookupCommand, lookupCommand), analyzer);
2370-
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after LOOKUP JOIN"));
2370+
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after [" + lookupCommand + "]@3:3"));
23712371

23722372
err = error(Strings.format("""
23732373
FROM test
@@ -2377,7 +2377,7 @@ public void testRemoteEnrichAfterLookupJoin() {
23772377
| MV_EXPAND language_code
23782378
| ENRICH _remote:languages ON language_code
23792379
""", lookupCommand), analyzer);
2380-
assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after LOOKUP JOIN"));
2380+
assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after [" + lookupCommand + "]@3:3"));
23812381
}
23822382

23832383
public void testRemoteEnrichAfterCoordinatorOnlyPlans() {
@@ -2420,7 +2420,7 @@ public void testRemoteEnrichAfterCoordinatorOnlyPlans() {
24202420
| STATS count(*) BY language_code
24212421
| ENRICH _remote:languages ON language_code
24222422
""", analyzer);
2423-
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after STATS"));
2423+
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after [STATS count(*) BY language_code]@3:3"));
24242424

24252425
err = error("""
24262426
FROM test
@@ -2430,7 +2430,7 @@ public void testRemoteEnrichAfterCoordinatorOnlyPlans() {
24302430
| MV_EXPAND language_code
24312431
| ENRICH _remote:languages ON language_code
24322432
""", analyzer);
2433-
assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after STATS"));
2433+
assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after [STATS count(*) BY language_code]@3:3"));
24342434

24352435
query("""
24362436
FROM test
@@ -2445,7 +2445,10 @@ public void testRemoteEnrichAfterCoordinatorOnlyPlans() {
24452445
| ENRICH _coordinator:languages ON language_code
24462446
| ENRICH _remote:languages ON language_code
24472447
""", analyzer);
2448-
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
2448+
assertThat(
2449+
err,
2450+
containsString("4:3: ENRICH with remote policy can't be executed after [ENRICH _coordinator:languages ON language_code]@3:3")
2451+
);
24492452

24502453
err = error("""
24512454
FROM test
@@ -2456,15 +2459,23 @@ public void testRemoteEnrichAfterCoordinatorOnlyPlans() {
24562459
| DISSECT language_name "%{foo}"
24572460
| ENRICH _remote:languages ON language_code
24582461
""", analyzer);
2459-
assertThat(err, containsString("7:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
2462+
assertThat(
2463+
err,
2464+
containsString("7:3: ENRICH with remote policy can't be executed after [ENRICH _coordinator:languages ON language_code]@3:3")
2465+
);
24602466

24612467
err = error("""
24622468
FROM test
24632469
| FORK (WHERE languages == 1) (WHERE languages == 2)
24642470
| EVAL language_code = languages
24652471
| ENRICH _remote:languages ON language_code
24662472
""", analyzer);
2467-
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after FORK"));
2473+
assertThat(
2474+
err,
2475+
containsString(
2476+
"4:3: ENRICH with remote policy can't be executed after [FORK (WHERE languages == 1) (WHERE languages == 2)]@2:3"
2477+
)
2478+
);
24682479
}
24692480

24702481
private void checkFullTextFunctionsInStats(String functionInvocation) {

0 commit comments

Comments
 (0)