Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
import static org.elasticsearch.xpack.esql.plan.logical.Filter.checkFilterConditionDataType;

public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware, SortAgnostic, PipelineBreaker {
public class Aggregate extends UnaryPlan
implements
PostAnalysisVerificationAware,
TelemetryAware,
SortAgnostic,
PipelineBreaker,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought: Hmm, maybe PipelineBreaker should inherit from ExecutesOn.Coordinator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure yet... Need to think about it a bit more to have a good answer for this.

ExecutesOn.Coordinator {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
LogicalPlan.class,
"Aggregate",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@
* enforced by the Limit in the surrogate plan.
*/
@SupportsObservabilityTier(tier = COMPLETE)
public class ChangePoint extends UnaryPlan implements SurrogateLogicalPlan, PostAnalysisVerificationAware, LicenseAware {
public class ChangePoint extends UnaryPlan
implements
SurrogateLogicalPlan,
PostAnalysisVerificationAware,
LicenseAware,
ExecutesOn.Coordinator {

private final Attribute value;
private final Attribute key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware;
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
Expand All @@ -35,7 +35,6 @@
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -45,13 +44,18 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;

import static org.elasticsearch.xpack.esql.common.Failure.fail;
import static org.elasticsearch.xpack.esql.core.expression.Expressions.asAttributes;
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;

public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAnalysisPlanVerificationAware, TelemetryAware, SortAgnostic {
public class Enrich extends UnaryPlan
implements
GeneratingPlan<Enrich>,
PostAnalysisVerificationAware,
TelemetryAware,
SortAgnostic,
ExecutesOn {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
LogicalPlan.class,
"Enrich",
Expand All @@ -68,6 +72,16 @@ public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAna

private final Mode mode;

@Override
public ExecuteLocation executesOn() {
if (mode == Mode.REMOTE) {
return ExecuteLocation.REMOTE;
} else if (mode == Mode.COORDINATOR) {
return ExecuteLocation.COORDINATOR;
}
return ExecuteLocation.ANY;
}

public enum Mode {
ANY,
COORDINATOR,
Expand Down Expand Up @@ -279,11 +293,6 @@ public int hashCode() {
return Objects.hash(super.hashCode(), mode, policyName, matchField, policy, concreteIndices, enrichFields);
}

@Override
public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
return Enrich::checkRemoteEnrich;
}

/**
* Ensure that no remote enrich is allowed after a reduction or an enrich with coordinator mode.
* <p>
Expand All @@ -296,34 +305,26 @@ public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
* We might consider implementing the actual remote enrich on the coordinating cluster, however, this requires
* retaining the originating cluster and restructing pages for routing, which might be complicated.
*/
private static void checkRemoteEnrich(LogicalPlan plan, Failures failures) {
// First look for remote ENRICH, and then look at its children. Going over the whole plan once is trickier as remote ENRICHs can be
// in separate FORK branches which are valid by themselves.
plan.forEachUp(Enrich.class, enrich -> checkForPlansForbiddenBeforeRemoteEnrich(enrich, failures));
}

/**
* For a given remote {@link Enrich}, check if there are any forbidden plans upstream.
*/
private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Failures failures) {
if (enrich.mode != Mode.REMOTE) {
return;
}

Set<String> badCommands = new HashSet<>();
Set<Source> badCommands = new HashSet<>();

enrich.forEachUp(LogicalPlan.class, u -> {
if (u instanceof Aggregate) {
badCommands.add("STATS");
} else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
badCommands.add("another ENRICH with coordinator policy");
} else if (u instanceof LookupJoin) {
badCommands.add("LOOKUP JOIN");
} else if (u instanceof Fork) {
badCommands.add("FORK");
if (u instanceof ExecutesOn ex && ex.executesOn() == ExecuteLocation.COORDINATOR) {
badCommands.add(u.source());
}
});

badCommands.forEach(c -> failures.add(fail(enrich, "ENRICH with remote policy can't be executed after " + c)));
badCommands.forEach(
f -> failures.add(fail(enrich, "ENRICH with remote policy can't be executed after [" + f.text() + "]" + f.source()))
);
}

@Override
public void postAnalysisVerification(Failures failures) {
checkForPlansForbiddenBeforeRemoteEnrich(this, failures);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plan.logical;

/**
* Mark nodes that execute only in a specific way, either on the coordinator or on a remote node.
*/
public interface ExecutesOn {
enum ExecuteLocation {
COORDINATOR,
REMOTE,
ANY; // Can be executed on either coordinator or remote nodes
}

ExecuteLocation executesOn();

/**
* Executes on the remote nodes only (note that may include coordinator, but not on the aggregation stage).
*/
interface Remote extends ExecutesOn {
@Override
default ExecuteLocation executesOn() {
return ExecuteLocation.REMOTE;
}
}

/**
* Executes on the coordinator only. Can not be run on remote nodes.
*/
interface Coordinator extends ExecutesOn {
@Override
default ExecuteLocation executesOn() {
return ExecuteLocation.COORDINATOR;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@
* A Fork is a n-ary {@code Plan} where each child is a sub plan, e.g.
* {@code FORK [WHERE content:"fox" ] [WHERE content:"dog"] }
*/
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware, TelemetryAware, PipelineBreaker {
public class Fork extends LogicalPlan
implements
PostAnalysisPlanVerificationAware,
TelemetryAware,
PipelineBreaker,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Marking FORK as pipeline breaker would become obsolete with the new interface, right?

Maybe the whole PipelineBreaker interface gets obsolete; or maybe not, because the original pipeline breakers (TopN, Limit, Aggs) are special in that they get "duplicated" into the fragment if they are the first node on the coordinator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly? I think PipelineBreaker may have its uses but it's both too broad and too inexact for what we need here, that's why I want to introduce a narrow-targeted interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think introducing the narrow interface is nice :)

But Fork wasn't marked as pipeline breaker until #131945, and there we discussed that marking it as pipeline breaker could lead to confusion in other places that use that interface in a more specific way.

I'm proposing we undo that change and just mark Fork as ExecutesOn.Coordinator - marking as pipeline breaker shouldn't be required anymore,right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right, it doesn't really need to be marked as breaker anymore now.

ExecutesOn.Coordinator {

public static final String FORK_FIELD = "_fork";
public static final int MAX_BRANCHES = 8;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

import java.io.IOException;
Expand All @@ -33,7 +34,7 @@
import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT;
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;

public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware {
public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware, ExecutesOn.Coordinator {

public static final String DEFAULT_OUTPUT_FIELD_NAME = "completion";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;

Expand All @@ -33,7 +34,7 @@
import static org.elasticsearch.xpack.esql.core.expression.Expressions.asAttributes;
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;

public class Rerank extends InferencePlan<Rerank> implements TelemetryAware {
public class Rerank extends InferencePlan<Rerank> implements TelemetryAware, ExecutesOn.Coordinator {

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Rerank", Rerank::new);
public static final String DEFAULT_INFERENCE_ID = ".rerank-v1-elasticsearch";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic;

Expand Down Expand Up @@ -56,7 +57,7 @@
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.commonType;

public class Join extends BinaryPlan implements PostAnalysisVerificationAware, SortAgnostic {
public class Join extends BinaryPlan implements PostAnalysisVerificationAware, SortAgnostic, ExecutesOn {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Join", Join::new);
public static final DataType[] UNSUPPORTED_TYPES = {
TEXT,
Expand Down Expand Up @@ -309,4 +310,9 @@ private static boolean comparableTypes(Attribute left, Attribute right) {
public boolean isRemote() {
return isRemote;
}

@Override
public ExecuteLocation executesOn() {
return isRemote ? ExecuteLocation.REMOTE : ExecuteLocation.COORDINATOR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
import org.elasticsearch.xpack.esql.plan.logical.SurrogateLogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.UsingJoinType;

import java.util.LinkedList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.esql.common.Failure.fail;
Expand Down Expand Up @@ -104,13 +105,10 @@ public void postAnalysisVerification(Failures failures) {
}

private void checkRemoteJoin(Failures failures) {
List<Source> fails = new LinkedList<>();
Set<Source> fails = new HashSet<>();

this.forEachUp(UnaryPlan.class, u -> {
if (u instanceof PipelineBreaker) {
fails.add(u.source());
}
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
if (u instanceof PipelineBreaker || (u instanceof ExecutesOn ex && ex.executesOn() == ExecuteLocation.COORDINATOR)) {
fails.add(u.source());
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2358,7 +2358,7 @@ public void testRemoteEnrichAfterLookupJoin() {
| %s
| ENRICH _remote:languages ON language_code
""", lookupCommand), analyzer);
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after LOOKUP JOIN"));
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after [" + lookupCommand + "]@3:3"));

err = error(Strings.format("""
FROM test
Expand All @@ -2367,7 +2367,7 @@ public void testRemoteEnrichAfterLookupJoin() {
| ENRICH _remote:languages ON language_code
| %s
""", lookupCommand, lookupCommand), analyzer);
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after LOOKUP JOIN"));
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after [" + lookupCommand + "]@3:3"));

err = error(Strings.format("""
FROM test
Expand All @@ -2377,7 +2377,7 @@ public void testRemoteEnrichAfterLookupJoin() {
| MV_EXPAND language_code
| ENRICH _remote:languages ON language_code
""", lookupCommand), analyzer);
assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after LOOKUP JOIN"));
assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after [" + lookupCommand + "]@3:3"));
}

public void testRemoteEnrichAfterCoordinatorOnlyPlans() {
Expand Down Expand Up @@ -2420,7 +2420,7 @@ public void testRemoteEnrichAfterCoordinatorOnlyPlans() {
| STATS count(*) BY language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after STATS"));
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after [STATS count(*) BY language_code]@3:3"));

err = error("""
FROM test
Expand All @@ -2430,7 +2430,7 @@ public void testRemoteEnrichAfterCoordinatorOnlyPlans() {
| MV_EXPAND language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after STATS"));
assertThat(err, containsString("6:3: ENRICH with remote policy can't be executed after [STATS count(*) BY language_code]@3:3"));

query("""
FROM test
Expand All @@ -2445,7 +2445,10 @@ public void testRemoteEnrichAfterCoordinatorOnlyPlans() {
| ENRICH _coordinator:languages ON language_code
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
assertThat(
err,
containsString("4:3: ENRICH with remote policy can't be executed after [ENRICH _coordinator:languages ON language_code]@3:3")
);

err = error("""
FROM test
Expand All @@ -2456,15 +2459,23 @@ public void testRemoteEnrichAfterCoordinatorOnlyPlans() {
| DISSECT language_name "%{foo}"
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("7:3: ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
assertThat(
err,
containsString("7:3: ENRICH with remote policy can't be executed after [ENRICH _coordinator:languages ON language_code]@3:3")
);

err = error("""
FROM test
| FORK (WHERE languages == 1) (WHERE languages == 2)
| EVAL language_code = languages
| ENRICH _remote:languages ON language_code
""", analyzer);
assertThat(err, containsString("4:3: ENRICH with remote policy can't be executed after FORK"));
assertThat(
err,
containsString(
"4:3: ENRICH with remote policy can't be executed after [FORK (WHERE languages == 1) (WHERE languages == 2)]@2:3"
)
);
}

private void checkFullTextFunctionsInStats(String functionInvocation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7004,7 +7004,7 @@ public void testAggThenEnrichRemote() {
| eval employee_id = to_str(emp_no)
| ENRICH _remote:departments
"""));
assertThat(error.getMessage(), containsString("line 4:3: ENRICH with remote policy can't be executed after STATS"));
assertThat(error.getMessage(), containsString("line 4:3: ENRICH with remote policy can't be executed after [STATS size=count(*) BY emp_no]@2:3"));
}

public void testEnrichBeforeLimit() {
Expand Down Expand Up @@ -7354,7 +7354,7 @@ public void testRejectRemoteEnrichAfterCoordinatorEnrich() {
"""));
assertThat(
error.getMessage(),
containsString("ENRICH with remote policy can't be executed after another ENRICH with coordinator policy")
containsString("ENRICH with remote policy can't be executed after [ENRICH _coordinator:departments]@3:3")
);
}

Expand Down