Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ public void testAggThenEnrichRemote() {
| sort vendor
""", enrichHosts(Enrich.Mode.ANY), enrichVendors(Enrich.Mode.REMOTE));
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after STATS"));
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after [stats c = COUNT(*) by os]@4:3"));
}

public void testEnrichCoordinatorThenEnrichRemote() {
Expand All @@ -417,10 +417,7 @@ public void testEnrichCoordinatorThenEnrichRemote() {
| sort vendor
""", enrichHosts(Enrich.Mode.COORDINATOR), enrichVendors(Enrich.Mode.REMOTE));
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
assertThat(
error.getMessage(),
containsString("ENRICH with remote policy can't be executed after another ENRICH with coordinator policy")
);
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after [ENRICH _COORDINATOR"));
}

private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public final class ReplaceLimitAndSortAsTopN extends OptimizerRules.OptimizerRul
protected LogicalPlan rule(Limit plan) {
LogicalPlan p = plan;
if (plan.child() instanceof OrderBy o) {
p = new TopN(plan.source(), o.child(), o.order(), plan.limit());
p = new TopN(o.source(), o.child(), o.order(), plan.limit());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is just a bug - no reason why TopN source has to come from limit, OrderBy is much better IMO.

}
return p;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
import static org.elasticsearch.xpack.esql.plan.logical.Filter.checkFilterConditionDataType;

public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware, SortAgnostic, PipelineBreaker {
public class Aggregate extends UnaryPlan
implements
PostAnalysisVerificationAware,
TelemetryAware,
SortAgnostic,
PipelineBreaker,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought: Hmm, maybe PipelineBreaker should inherit from ExecutesOn.Coordinator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure yet... Need to think about it a bit more to have a good answer for this.

ExecutesOn.Coordinator {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
LogicalPlan.class,
"Aggregate",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@
* enforced by the Limit in the surrogate plan.
*/
@SupportsObservabilityTier(tier = COMPLETE)
public class ChangePoint extends UnaryPlan implements SurrogateLogicalPlan, PostAnalysisVerificationAware, LicenseAware {
public class ChangePoint extends UnaryPlan
implements
SurrogateLogicalPlan,
PostAnalysisVerificationAware,
LicenseAware,
ExecutesOn.Coordinator {

private final Attribute value;
private final Attribute key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware;
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
Expand All @@ -35,7 +35,6 @@
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -45,13 +44,18 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;

import static org.elasticsearch.xpack.esql.common.Failure.fail;
import static org.elasticsearch.xpack.esql.core.expression.Expressions.asAttributes;
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;

public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAnalysisPlanVerificationAware, TelemetryAware, SortAgnostic {
public class Enrich extends UnaryPlan
implements
GeneratingPlan<Enrich>,
PostOptimizationVerificationAware,
TelemetryAware,
SortAgnostic,
ExecutesOn {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
LogicalPlan.class,
"Enrich",
Expand All @@ -68,6 +72,16 @@ public class Enrich extends UnaryPlan implements GeneratingPlan<Enrich>, PostAna

private final Mode mode;

@Override
public ExecuteLocation executesOn() {
if (mode == Mode.REMOTE) {
return ExecuteLocation.REMOTE;
} else if (mode == Mode.COORDINATOR) {
return ExecuteLocation.COORDINATOR;
}
return ExecuteLocation.ANY;
}

public enum Mode {
ANY,
COORDINATOR,
Expand Down Expand Up @@ -279,11 +293,6 @@ public int hashCode() {
return Objects.hash(super.hashCode(), mode, policyName, matchField, policy, concreteIndices, enrichFields);
}

@Override
public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
return Enrich::checkRemoteEnrich;
}

/**
* Ensure that no remote enrich is allowed after a reduction or an enrich with coordinator mode.
* <p>
Expand All @@ -294,36 +303,24 @@ public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
* In that case, users have to write it as `FROM test | ENRICH _remote: | ORDER @timestamp | LIMIT 10`,
* which is equivalent to bringing all data to the coordinating cluster.
* We might consider implementing the actual remote enrich on the coordinating cluster, however, this requires
* retaining the originating cluster and restructing pages for routing, which might be complicated.
*/
private static void checkRemoteEnrich(LogicalPlan plan, Failures failures) {
// First look for remote ENRICH, and then look at its children. Going over the whole plan once is trickier as remote ENRICHs can be
// in separate FORK branches which are valid by themselves.
plan.forEachUp(Enrich.class, enrich -> checkForPlansForbiddenBeforeRemoteEnrich(enrich, failures));
}

/**
* For a given remote {@link Enrich}, check if there are any forbidden plans upstream.
* retaining the originating cluster and restructuring pages for routing, which might be complicated.
*/
private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Failures failures) {
if (enrich.mode != Mode.REMOTE) {
return;
}
private void checkForPlansForbiddenBeforeRemoteEnrich(Failures failures) {
Set<Source> fails = new HashSet<>();

Set<String> badCommands = new HashSet<>();

enrich.forEachUp(LogicalPlan.class, u -> {
if (u instanceof Aggregate) {
badCommands.add("STATS");
} else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
badCommands.add("another ENRICH with coordinator policy");
} else if (u instanceof LookupJoin) {
badCommands.add("LOOKUP JOIN");
} else if (u instanceof Fork) {
badCommands.add("FORK");
this.forEachUp(LogicalPlan.class, u -> {
if (u instanceof ExecutesOn ex && ex.executesOn() == ExecuteLocation.COORDINATOR) {
fails.add(u.source());
}
});

badCommands.forEach(c -> failures.add(fail(enrich, "ENRICH with remote policy can't be executed after " + c)));
fails.forEach(f -> failures.add(fail(this, "ENRICH with remote policy can't be executed after [" + f.text() + "]" + f.source())));
}

@Override
public void postOptimizationVerification(Failures failures) {
if (this.mode == Mode.REMOTE) {
checkForPlansForbiddenBeforeRemoteEnrich(failures);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plan.logical;

/**
* Mark nodes that execute only in a specific way, either on the coordinator or on a remote node.
*/
public interface ExecutesOn {
enum ExecuteLocation {
COORDINATOR,
REMOTE,
ANY; // Can be executed on either coordinator or remote nodes
}

ExecuteLocation executesOn();

/**
* Executes on the remote nodes only (note that may include coordinator, but not on the aggregation stage).
*/
interface Remote extends ExecutesOn {
@Override
default ExecuteLocation executesOn() {
return ExecuteLocation.REMOTE;
}
}

/**
* Executes on the coordinator only. Can not be run on remote nodes.
*/
interface Coordinator extends ExecutesOn {
@Override
default ExecuteLocation executesOn() {
return ExecuteLocation.COORDINATOR;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@
* A Fork is a n-ary {@code Plan} where each child is a sub plan, e.g.
* {@code FORK [WHERE content:"fox" ] [WHERE content:"dog"] }
*/
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware, TelemetryAware, PipelineBreaker {
public class Fork extends LogicalPlan
implements
PostAnalysisPlanVerificationAware,
TelemetryAware,
PipelineBreaker,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Marking FORK as pipeline breaker would become obsolete with the new interface, right?

Maybe the whole PipelineBreaker interface gets obsolete; or maybe not, because the original pipeline breakers (TopN, Limit, Aggs) are special in that they get "duplicated" into the fragment if they are the first node on the coordinator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly? I think PipelineBreaker may have its uses but it's both too broad and too inexact for what we need here, that's why I want to introduce a narrow-targeted interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think introducing the narrow interface is nice :)

But Fork wasn't marked as pipeline breaker until #131945, and there we discussed that marking it as pipeline breaker could lead to confusion in other places that use that interface in a more specific way.

I'm proposing we undo that change and just mark Fork as ExecutesOn.Coordinator - marking as pipeline breaker shouldn't be required anymore,right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right, it doesn't really need to be marked as breaker anymore now.

ExecutesOn.Coordinator {

public static final String FORK_FIELD = "_fork";
public static final int MAX_BRANCHES = 8;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

import java.io.IOException;
Expand All @@ -33,7 +34,7 @@
import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT;
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;

public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware {
public class Completion extends InferencePlan<Completion> implements TelemetryAware, PostAnalysisVerificationAware, ExecutesOn.Coordinator {

public static final String DEFAULT_OUTPUT_FIELD_NAME = "completion";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;

Expand All @@ -37,7 +38,7 @@
import static org.elasticsearch.xpack.esql.common.Failure.fail;
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;

public class Rerank extends InferencePlan<Rerank> implements PostAnalysisVerificationAware, TelemetryAware {
public class Rerank extends InferencePlan<Rerank> implements PostAnalysisVerificationAware, TelemetryAware, ExecutesOn.Coordinator {

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Rerank", Rerank::new);
public static final String DEFAULT_INFERENCE_ID = ".rerank-v1-elasticsearch";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
Expand All @@ -21,14 +22,19 @@
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
import org.elasticsearch.xpack.esql.plan.logical.SortAgnostic;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import static org.elasticsearch.xpack.esql.common.Failure.fail;
import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE;
Expand Down Expand Up @@ -56,7 +62,7 @@
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.commonType;

public class Join extends BinaryPlan implements PostAnalysisVerificationAware, SortAgnostic {
public class Join extends BinaryPlan implements PostAnalysisVerificationAware, SortAgnostic, ExecutesOn, PostOptimizationVerificationAware {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Join", Join::new);
public static final DataType[] UNSUPPORTED_TYPES = {
TEXT,
Expand Down Expand Up @@ -309,4 +315,39 @@ private static boolean comparableTypes(Attribute left, Attribute right) {
public boolean isRemote() {
return isRemote;
}

@Override
public ExecuteLocation executesOn() {
return isRemote ? ExecuteLocation.REMOTE : ExecuteLocation.COORDINATOR;
}

private void checkRemoteJoin(Failures failures) {
Set<Source> fails = new HashSet<>();

var myself = this;
this.forEachUp(LogicalPlan.class, u -> {
if (u == myself) {
return; // skip myself
}
if (u instanceof Limit) {
// Limit is ok because it can be moved in by the optimizer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's maybe comment that we check for upstream limits before optimization and thus can be sure that any limits encounter now were duplicated and have a corresponding downstream limit that ensures correctness.

return;
}
if (u instanceof PipelineBreaker || (u instanceof ExecutesOn ex && ex.executesOn() == ExecuteLocation.COORDINATOR)) {
fails.add(u.source());
}
});

fails.forEach(
f -> failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after [" + f.text() + "]" + f.source()))
);

}

@Override
public void postOptimizationVerification(Failures failures) {
if (isRemote()) {
checkRemoteJoin(failures);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,11 @@
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
import org.elasticsearch.xpack.esql.plan.logical.SurrogateLogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.UsingJoinType;

import java.util.LinkedList;
import java.util.List;

import static java.util.Collections.emptyList;
Expand All @@ -30,7 +27,7 @@
/**
* Lookup join - specialized LEFT (OUTER) JOIN between the main left side and a lookup index (index_mode = lookup) on the right.
*/
public class LookupJoin extends Join implements SurrogateLogicalPlan, PostAnalysisVerificationAware, TelemetryAware {
public class LookupJoin extends Join implements SurrogateLogicalPlan, TelemetryAware, PostAnalysisVerificationAware {

public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, List<Attribute> joinFields, boolean isRemote) {
this(source, left, right, new UsingJoinType(LEFT, joinFields), emptyList(), emptyList(), emptyList(), isRemote);
Expand Down Expand Up @@ -104,21 +101,11 @@ public void postAnalysisVerification(Failures failures) {
}

private void checkRemoteJoin(Failures failures) {
List<Source> fails = new LinkedList<>();

this.forEachUp(UnaryPlan.class, u -> {
if (u instanceof PipelineBreaker) {
fails.add(u.source());
}
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
fails.add(u.source());
}
// Check only for LIMITs, Join will check the rest post-optimization
this.forEachUp(Limit.class, f -> {
failures.add(
fail(this, "LOOKUP JOIN with remote indices can't be executed after [" + f.source().text() + "]" + f.source().source())
);
});

fails.forEach(
f -> failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after [" + f.text() + "]" + f.source()))
);

}

}
Loading