Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public void testIndicesDontExist() throws IOException {
// currently we don't support remote clusters in LOOKUP JOIN
// this check happens before resolving actual indices and results in a different error message
RemoteClusterAware.isRemoteIndexName(pattern)
? allOf(containsString("parsing_exception"), containsString("remote clusters are not supported in LOOKUP JOIN"))
? allOf(containsString("parsing_exception"), containsString("remote clusters are not supported"))
: allOf(containsString("verification_exception"), containsString("Unknown index [foo]"))
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ public PlanFactory visitJoinCommand(EsqlBaseParser.JoinCommandContext ctx) {
if (RemoteClusterAware.isRemoteIndexName(rightPattern)) {
throw new ParsingException(
source(target),
"invalid index pattern [{}], remote clusters are not supported in LOOKUP JOIN",
"invalid index pattern [{}], remote clusters are not supported with LOOKUP JOIN",
rightPattern
);
}
Expand Down Expand Up @@ -620,22 +620,26 @@ public PlanFactory visitJoinCommand(EsqlBaseParser.JoinCommandContext ctx) {
}

return p -> {
p.forEachUp(UnresolvedRelation.class, r -> {
for (var leftPattern : Strings.splitStringByCommaToArray(r.indexPattern().indexPattern())) {
if (RemoteClusterAware.isRemoteIndexName(leftPattern)) {
throw new ParsingException(
source(target),
"invalid index pattern [{}], remote clusters are not supported in LOOKUP JOIN",
r.indexPattern().indexPattern()
);
}
}
});

checkForRemoteClusters(p, source(target), "LOOKUP JOIN");
return new LookupJoin(source, p, right, joinFields);
};
}

private void checkForRemoteClusters(LogicalPlan plan, Source source, String commandName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels a bit weird that we're running the same code 3 times from 3 different places just to get a different error message... I wonder if there isn't a way to somehow capture it better so we don't need to walk the whole (sub-)tree each time anew.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is just reusing the same approach we had for LOOKUP JOIN.
even on main right now, if an ES|QL query has 3 LOOKUP JOIN commands, we are doing the same check 3 times.
I thought that if we had a better way to do it this would have been flagged in #120277
also it's just a temporary limitation, since FORK/COMPLETION/RERANK are all going to be tech preview features first and CCS support will be done as part of the transition to GA.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like we also have another place in LogicalPlanBuilder where we walk the plan looking for UnresolvedRelation:

public PlanFactory visitStatsCommand(EsqlBaseParser.StatsCommandContext ctx) {
final Stats stats = stats(source(ctx), ctx.grouping, ctx.stats);
return input -> {
if (input.anyMatch(p -> p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES)) {
return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates, null);
} else {
return new Aggregate(source(ctx), input, stats.groupings, stats.aggregates);
}
};
}

just wanted to check that it's a pattern we use in some other places as well, not just for CCS check 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we do such things repeatedly and I am not a big fan of this though I am not sure yet how to fix it. It will also become more complicated with CPS since we may not be able to know whether a certain index pattern means remote or local without wider context, which is not available at that point. Not sure what to do with it, maybe these checks can be moved to a different place or checked in some other way. Needs more thinking. I guess it's ok for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get it - there might be ways we could improve it, but as you said it's not clear how.
Hopefully we figure out CCS support before we need to go back and see how these checks should be improved 😄

plan.forEachUp(UnresolvedRelation.class, r -> {
for (var indexPattern : Strings.splitStringByCommaToArray(r.indexPattern().indexPattern())) {
if (RemoteClusterAware.isRemoteIndexName(indexPattern)) {
throw new ParsingException(
source,
"invalid index pattern [{}], remote clusters are not supported with {}",
r.indexPattern().indexPattern(),
commandName
);
}
}
});
}

@Override
@SuppressWarnings("unchecked")
public PlanFactory visitForkCommand(EsqlBaseParser.ForkCommandContext ctx) {
Expand All @@ -644,6 +648,7 @@ public PlanFactory visitForkCommand(EsqlBaseParser.ForkCommandContext ctx) {
throw new ParsingException(source(ctx), "Fork requires at least two branches");
}
return input -> {
checkForRemoteClusters(input, source(ctx), "FORK");
List<LogicalPlan> subPlans = subQueries.stream().map(planFactory -> planFactory.apply(input)).toList();
return new Fork(source(ctx), subPlans, List.of());
};
Expand Down Expand Up @@ -743,7 +748,10 @@ public PlanFactory visitRerankCommand(EsqlBaseParser.RerankCommandContext ctx) {
? inferenceId(ctx.inferenceId)
: new Literal(source, Rerank.DEFAULT_INFERENCE_ID, KEYWORD);

return p -> new Rerank(source, p, inferenceId, queryText, visitRerankFields(ctx.rerankFields()));
return p -> {
checkForRemoteClusters(p, source, "RERANK");
return new Rerank(source, p, inferenceId, queryText, visitRerankFields(ctx.rerankFields()));
};
}

@Override
Expand All @@ -755,7 +763,10 @@ public PlanFactory visitCompletionCommand(EsqlBaseParser.CompletionCommandContex
? new UnresolvedAttribute(source, Completion.DEFAULT_OUTPUT_FIELD_NAME)
: visitQualifiedName(ctx.targetField);

return p -> new Completion(source, p, inferenceId, prompt, targetField);
return p -> {
checkForRemoteClusters(p, source, "COMPLETION");
return new Completion(source, p, inferenceId, prompt, targetField);
};
}

public Literal inferenceId(EsqlBaseParser.IdentifierOrParameterContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware;
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
import org.elasticsearch.xpack.esql.common.Failure;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
Expand All @@ -32,7 +33,7 @@
* A Fork is a n-ary {@code Plan} where each child is a sub plan, e.g.
* {@code FORK [WHERE content:"fox" ] [WHERE content:"dog"] }
*/
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware {
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware, TelemetryAware {
Copy link
Contributor Author

@ioanatia ioanatia Jun 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did sneak this one here, since I think it's the only change we require in order to include FORK in the telemetry payload and did not want to make it a separate PR.


public static final String FORK_FIELD = "_fork";
private final List<Attribute> output;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3124,7 +3124,7 @@ public void testInvalidJoinPatterns() {
var joinPattern = randomIndexPattern(CROSS_CLUSTER, without(WILDCARD_PATTERN), without(INDEX_SELECTOR));
expectError(
"FROM " + fromPatterns + " | LOOKUP JOIN " + joinPattern + " ON " + randomIdentifier(),
"invalid index pattern [" + unquoteIndexPattern(joinPattern) + "], remote clusters are not supported in LOOKUP JOIN"
"invalid index pattern [" + unquoteIndexPattern(joinPattern) + "], remote clusters are not supported with LOOKUP JOIN"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know I slightly changed the error message - but I wanted to reuse the same check for all commands cc @smalyshev
although I heard that for LOOKUP JOIN we might be able to remove these soon anyway 🤞

);
}
{
Expand All @@ -3133,7 +3133,7 @@ public void testInvalidJoinPatterns() {
var joinPattern = randomIndexPattern(without(CROSS_CLUSTER), without(WILDCARD_PATTERN), without(INDEX_SELECTOR));
expectError(
"FROM " + fromPatterns + " | LOOKUP JOIN " + joinPattern + " ON " + randomIdentifier(),
"invalid index pattern [" + unquoteIndexPattern(fromPatterns) + "], remote clusters are not supported in LOOKUP JOIN"
"invalid index pattern [" + unquoteIndexPattern(fromPatterns) + "], remote clusters are not supported with LOOKUP JOIN"
);
}

Expand Down Expand Up @@ -3304,6 +3304,12 @@ public void testInvalidFork() {
expectError("FROM foo* | FORK ( FORK (WHERE x>1) (WHERE y>1)) (WHERE z>1)", "line 1:20: mismatched input 'FORK'");
expectError("FROM foo* | FORK ( x+1 ) ( WHERE y>2 )", "line 1:20: mismatched input 'x+1'");
expectError("FROM foo* | FORK ( LIMIT 10 ) ( y+2 )", "line 1:33: mismatched input 'y+2'");

var fromPatterns = randomIndexPatterns(CROSS_CLUSTER);
expectError(
"FROM " + fromPatterns + " | FORK (EVAL a = 1) (EVAL a = 2)",
"invalid index pattern [" + unquoteIndexPattern(fromPatterns) + "], remote clusters are not supported with FORK"
);
}

public void testFieldNamesAsCommands() throws Exception {
Expand Down Expand Up @@ -3429,6 +3435,12 @@ public void testInvalidRerank() {
assumeTrue("RERANK requires corresponding capability", EsqlCapabilities.Cap.RERANK.isEnabled());
expectError("FROM foo* | RERANK ON title WITH inferenceId", "line 1:20: mismatched input 'ON' expecting {QUOTED_STRING");
expectError("FROM foo* | RERANK \"query text\" WITH inferenceId", "line 1:33: mismatched input 'WITH' expecting 'on'");

var fromPatterns = randomIndexPatterns(CROSS_CLUSTER);
expectError(
"FROM " + fromPatterns + " | RERANK \"query text\" ON title WITH inferenceId",
"invalid index pattern [" + unquoteIndexPattern(fromPatterns) + "], remote clusters are not supported with RERANK"
);
}

public void testCompletionUsingFieldAsPrompt() {
Expand Down Expand Up @@ -3479,6 +3491,12 @@ public void testInvalidCompletion() {
expectError("FROM foo* | COMPLETION completion=prompt WITH", "line 1:46: mismatched input '<EOF>' expecting {");

expectError("FROM foo* | COMPLETION completion=prompt", "line 1:41: mismatched input '<EOF>' expecting {");

var fromPatterns = randomIndexPatterns(CROSS_CLUSTER);
expectError(
"FROM " + fromPatterns + " | COMPLETION prompt_field WITH inferenceId",
"invalid index pattern [" + unquoteIndexPattern(fromPatterns) + "], remote clusters are not supported with COMPLETION"
);
}

public void testSample() {
Expand Down