Skip to content

Commit 37b28e4

Browse files
authored
Introduce execution location marker for better handling of remote/local compatibility (#132205)
* Introduce execution location marker for better handling of remote/local compatibility
1 parent d557309 commit 37b28e4

File tree

14 files changed

+476
-253
lines changed

14 files changed

+476
-253
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichIT.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ public void testAggThenEnrichRemote() {
405405
| sort vendor
406406
""", enrichHosts(Enrich.Mode.ANY), enrichVendors(Enrich.Mode.REMOTE));
407407
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
408-
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after STATS"));
408+
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after [stats c = COUNT(*) by os]@4:3"));
409409
}
410410

411411
public void testEnrichCoordinatorThenEnrichRemote() {
@@ -417,10 +417,7 @@ public void testEnrichCoordinatorThenEnrichRemote() {
417417
| sort vendor
418418
""", enrichHosts(Enrich.Mode.COORDINATOR), enrichVendors(Enrich.Mode.REMOTE));
419419
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
420-
assertThat(
421-
error.getMessage(),
422-
containsString("ENRICH with remote policy can't be executed after another ENRICH with coordinator policy")
423-
);
420+
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after [ENRICH _COORDINATOR"));
424421
}
425422

426423
private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceLimitAndSortAsTopN.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public final class ReplaceLimitAndSortAsTopN extends OptimizerRules.OptimizerRul
1818
protected LogicalPlan rule(Limit plan) {
1919
LogicalPlan p = plan;
2020
if (plan.child() instanceof OrderBy o) {
21-
p = new TopN(plan.source(), o.child(), o.order(), plan.limit());
21+
p = new TopN(o.source(), o.child(), o.order(), plan.limit());
2222
}
2323
return p;
2424
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,13 @@
4545
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
4646
import static org.elasticsearch.xpack.esql.plan.logical.Filter.checkFilterConditionDataType;
4747

48-
public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware, SortAgnostic, PipelineBreaker {
48+
public class Aggregate extends UnaryPlan
49+
implements
50+
PostAnalysisVerificationAware,
51+
TelemetryAware,
52+
SortAgnostic,
53+
PipelineBreaker,
54+
ExecutesOn.Coordinator {
4955
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
5056
LogicalPlan.class,
5157
"Aggregate",

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ChangePoint.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,12 @@
4444
* enforced by the Limit in the surrogate plan.
4545
*/
4646
@SupportsObservabilityTier(tier = COMPLETE)
47-
public class ChangePoint extends UnaryPlan implements SurrogateLogicalPlan, PostAnalysisVerificationAware, LicenseAware {
47+
public class ChangePoint extends UnaryPlan
48+
implements
49+
SurrogateLogicalPlan,
50+
PostAnalysisVerificationAware,
51+
LicenseAware,
52+
ExecutesOn.Coordinator {
4853

4954
private final Attribute value;
5055
private final Attribute key;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import org.elasticsearch.index.IndexMode;
1818
import org.elasticsearch.transport.RemoteClusterAware;
1919
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
20-
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware;
20+
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
2121
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
2222
import org.elasticsearch.xpack.esql.common.Failures;
2323
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
@@ -34,7 +34,6 @@
3434
import org.elasticsearch.xpack.esql.index.EsIndex;
3535
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
3636
import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
37-
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
3837

3938
import java.io.IOException;
4039
import java.util.ArrayList;
@@ -44,14 +43,19 @@
4443
import java.util.Map;
4544
import java.util.Objects;
4645
import java.util.Set;
47-
import java.util.function.BiConsumer;
4846

4947
import static org.elasticsearch.xpack.esql.common.Failure.fail;
5048
import static org.elasticsearch.xpack.esql.core.expression.Expressions.asAttributes;
5149
import static org.elasticsearch.xpack.esql.expression.Foldables.literalValueOf;
5250
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
5351

54-
public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAnalysisPlanVerificationAware, TelemetryAware, SortAgnostic {
52+
public class Enrich extends UnaryPlan
53+
implements
54+
GeneratingPlan<Enrich>,
55+
PostOptimizationVerificationAware,
56+
TelemetryAware,
57+
SortAgnostic,
58+
ExecutesOn {
5559
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
5660
LogicalPlan.class,
5761
"Enrich",
@@ -68,6 +72,16 @@ public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAna
6872

6973
private final Mode mode;
7074

75+
/**
 * Reports where this ENRICH must run, derived from its {@code mode}:
 * {@code Mode.REMOTE} maps to {@code ExecuteLocation.REMOTE},
 * {@code Mode.COORDINATOR} maps to {@code ExecuteLocation.COORDINATOR},
 * and any other mode maps to {@code ExecuteLocation.ANY}.
 *
 * @return the execution location corresponding to this node's mode
 */
@Override
76+
public ExecuteLocation executesOn() {
77+
if (mode == Mode.REMOTE) {
78+
return ExecuteLocation.REMOTE;
79+
} else if (mode == Mode.COORDINATOR) {
80+
return ExecuteLocation.COORDINATOR;
81+
}
82+
// Neither REMOTE nor COORDINATOR: no restriction on the execution location.
return ExecuteLocation.ANY;
83+
}
84+
7185
public enum Mode {
7286
ANY,
7387
COORDINATOR,
@@ -284,11 +298,6 @@ public int hashCode() {
284298
return Objects.hash(super.hashCode(), mode, policyName, matchField, policy, concreteIndices, enrichFields);
285299
}
286300

287-
@Override
288-
public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
289-
return Enrich::checkRemoteEnrich;
290-
}
291-
292301
/**
293302
* Ensure that no remote enrich is allowed after a reduction or an enrich with coordinator mode.
294303
* <p>
@@ -299,36 +308,24 @@ public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
299308
* In that case, users have to write it as `FROM test | ENRICH _remote: | ORDER @timestamp | LIMIT 10`,
300309
* which is equivalent to bringing all data to the coordinating cluster.
301310
* We might consider implementing the actual remote enrich on the coordinating cluster, however, this requires
302-
* retaining the originating cluster and restructing pages for routing, which might be complicated.
303-
*/
304-
private static void checkRemoteEnrich(LogicalPlan plan, Failures failures) {
305-
// First look for remote ENRICH, and then look at its children. Going over the whole plan once is trickier as remote ENRICHs can be
306-
// in separate FORK branches which are valid by themselves.
307-
plan.forEachUp(Enrich.class, enrich -> checkForPlansForbiddenBeforeRemoteEnrich(enrich, failures));
308-
}
309-
310-
/**
311-
* For a given remote {@link Enrich}, check if there are any forbidden plans upstream.
311+
* retaining the originating cluster and restructuring pages for routing, which might be complicated.
312312
*/
313-
private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Failures failures) {
314-
if (enrich.mode != Mode.REMOTE) {
315-
return;
316-
}
313+
private void checkForPlansForbiddenBeforeRemoteEnrich(Failures failures) {
314+
Set<Source> fails = new HashSet<>();
317315

318-
Set<String> badCommands = new HashSet<>();
319-
320-
enrich.forEachUp(LogicalPlan.class, u -> {
321-
if (u instanceof Aggregate) {
322-
badCommands.add("STATS");
323-
} else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
324-
badCommands.add("another ENRICH with coordinator policy");
325-
} else if (u instanceof LookupJoin) {
326-
badCommands.add("LOOKUP JOIN");
327-
} else if (u instanceof Fork) {
328-
badCommands.add("FORK");
316+
this.forEachUp(LogicalPlan.class, u -> {
317+
if (u instanceof ExecutesOn ex && ex.executesOn() == ExecuteLocation.COORDINATOR) {
318+
fails.add(u.source());
329319
}
330320
});
331321

332-
badCommands.forEach(c -> failures.add(fail(enrich, "ENRICH with remote policy can't be executed after " + c)));
322+
fails.forEach(f -> failures.add(fail(this, "ENRICH with remote policy can't be executed after [" + f.text() + "]" + f.source())));
323+
}
324+
325+
/**
 * Post-optimization verification hook. Only an ENRICH in {@code Mode.REMOTE} is checked;
 * for it, {@code checkForPlansForbiddenBeforeRemoteEnrich} adds any violations to {@code failures}.
 * For every other mode this method does nothing.
 *
 * @param failures collector that receives the verification failures
 */
@Override
326+
public void postOptimizationVerification(Failures failures) {
327+
if (this.mode == Mode.REMOTE) {
328+
checkForPlansForbiddenBeforeRemoteEnrich(failures);
329+
}
333330
}
334331
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plan.logical;
9+
10+
/**
11+
* Marks plan nodes that execute only in a specific location, either on the coordinator or on a remote node.
* Implementors either provide {@link #executesOn()} themselves or pick up a fixed answer by
* implementing one of the nested {@link Remote} / {@link Coordinator} sub-interfaces.
12+
*/
13+
public interface ExecutesOn {
14+
/** The possible answers to "where does this node execute?". */
enum ExecuteLocation {
15+
COORDINATOR,
16+
REMOTE,
17+
ANY; // Can be executed on either coordinator or remote nodes
18+
}
19+
20+
/**
 * @return the location on which this node executes
 */
ExecuteLocation executesOn();
21+
22+
/**
23+
* Executes on the remote nodes only (note that this may include the coordinator, but not on the aggregation stage).
* The default {@link #executesOn()} always returns {@link ExecuteLocation#REMOTE}.
24+
*/
25+
interface Remote extends ExecutesOn {
26+
@Override
27+
default ExecuteLocation executesOn() {
28+
return ExecuteLocation.REMOTE;
29+
}
30+
}
31+
32+
/**
33+
* Executes on the coordinator only. Cannot be run on remote nodes.
* The default {@link #executesOn()} always returns {@link ExecuteLocation#COORDINATOR}.
34+
*/
35+
interface Coordinator extends ExecutesOn {
36+
@Override
37+
default ExecuteLocation executesOn() {
38+
return ExecuteLocation.COORDINATOR;
39+
}
40+
}
41+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* A Fork is an n-ary {@code Plan} where each child is a sub plan, e.g.
3434
* {@code FORK [WHERE content:"fox" ] [WHERE content:"dog"] }
3535
*/
36-
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware, TelemetryAware, PipelineBreaker {
36+
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware, TelemetryAware, ExecutesOn.Coordinator {
3737

3838
public static final String FORK_FIELD = "_fork";
3939
public static final int MAX_BRANCHES = 8;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Completion.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.xpack.esql.core.tree.Source;
2424
import org.elasticsearch.xpack.esql.core.type.DataType;
2525
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
26+
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
2627
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2728

2829
import java.io.IOException;
@@ -33,7 +34,7 @@
3334
import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT;
3435
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
3536

36-
public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware {
37+
public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware, ExecutesOn.Coordinator {
3738

3839
public static final String DEFAULT_OUTPUT_FIELD_NAME = "completion";
3940

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/inference/Rerank.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.xpack.esql.core.type.DataType;
2727
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
2828
import org.elasticsearch.xpack.esql.plan.logical.Eval;
29+
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
2930
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
3031
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
3132

@@ -37,7 +38,7 @@
3738
import static org.elasticsearch.xpack.esql.common.Failure.fail;
3839
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
3940

40-
public class Rerank extends InferencePlan<Rerank> implements PostAnalysisVerificationAware, TelemetryAware {
41+
public class Rerank extends InferencePlan<Rerank> implements PostAnalysisVerificationAware, TelemetryAware, ExecutesOn.Coordinator {
4142

4243
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Rerank", Rerank::new);
4344
public static final String DEFAULT_INFERENCE_ID = ".rerank-v1-elasticsearch";

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
1313
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
14+
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
1415
import org.elasticsearch.xpack.esql.common.Failures;
1516
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1617
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
@@ -21,14 +22,19 @@
2122
import org.elasticsearch.xpack.esql.core.type.DataType;
2223
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
2324
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
25+
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
26+
import org.elasticsearch.xpack.esql.plan.logical.Limit;
2427
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
28+
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
2529
import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic;
2630

2731
import java.io.IOException;
2832
import java.util.ArrayList;
2933
import java.util.Arrays;
34+
import java.util.HashSet;
3035
import java.util.List;
3136
import java.util.Objects;
37+
import java.util.Set;
3238

3339
import static org.elasticsearch.xpack.esql.common.Failure.fail;
3440
import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE;
@@ -56,7 +62,7 @@
5662
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;
5763
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.commonType;
5864

59-
public class Join extends BinaryPlan implements PostAnalysisVerificationAware, SortAgnostic {
65+
public class Join extends BinaryPlan implements PostAnalysisVerificationAware, SortAgnostic, ExecutesOn, PostOptimizationVerificationAware {
6066
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Join", Join::new);
6167
public static final DataType[] UNSUPPORTED_TYPES = {
6268
TEXT,
@@ -309,4 +315,40 @@ private static boolean comparableTypes(Attribute left, Attribute right) {
309315
public boolean isRemote() {
310316
return isRemote;
311317
}
318+
319+
/**
 * Reports where this join executes: {@code ExecuteLocation.REMOTE} when {@code isRemote} is set,
 * otherwise {@code ExecuteLocation.COORDINATOR}.
 *
 * @return the execution location of this join
 */
@Override
320+
public ExecuteLocation executesOn() {
321+
return isRemote ? ExecuteLocation.REMOTE : ExecuteLocation.COORDINATOR;
322+
}
323+
324+
/**
 * Collects the plan nodes that are not allowed before a remote LOOKUP JOIN and reports one
 * failure per distinct offending {@code Source}.
 * <p>
 * A node is reported when it is a {@code PipelineBreaker} or when it implements {@code ExecutesOn}
 * and answers {@code ExecuteLocation.COORDINATOR}. This join itself and any {@code Limit} are skipped.
 * The caller ({@code postOptimizationVerification}) only invokes this for remote joins.
 *
 * @param failures collector that receives one failure per offending source
 */
private void checkRemoteJoin(Failures failures) {
325+
// A Set de-duplicates sources, so the same command is reported at most once.
// NOTE(review): HashSet iteration order is unspecified, so the order of the reported
// failures may vary between runs — consider an ordered set if that matters.
Set<Source> fails = new HashSet<>();
326+
327+
var myself = this;
328+
// The traversal also visits this node (hence the identity check below).
this.forEachUp(LogicalPlan.class, u -> {
329+
if (u == myself) {
330+
return; // skip myself
331+
}
332+
if (u instanceof Limit) {
333+
// Limit is ok because it can be moved in by the optimizer
334+
// We check LIMITs in LookupJoin pre-optimization so they are still not allowed there
335+
return;
336+
}
337+
// Offending node: either breaks the pipeline or is pinned to the coordinator.
if (u instanceof PipelineBreaker || (u instanceof ExecutesOn ex && ex.executesOn() == ExecuteLocation.COORDINATOR)) {
338+
fails.add(u.source());
339+
}
340+
});
341+
342+
// Message format: the offending command's text in brackets, followed by f.source().
fails.forEach(
343+
f -> failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after [" + f.text() + "]" + f.source()))
344+
);
345+
346+
}
347+
348+
/**
 * Post-optimization verification hook. Only joins for which {@code isRemote()} is true are checked;
 * for them, {@code checkRemoteJoin} adds any violations to {@code failures}.
 * For all other joins this method does nothing.
 *
 * @param failures collector that receives the verification failures
 */
@Override
349+
public void postOptimizationVerification(Failures failures) {
350+
if (isRemote()) {
351+
checkRemoteJoin(failures);
352+
}
353+
}
312354
}

0 commit comments

Comments
 (0)