diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 4b90aa1a69c80..6546e86b7b70a 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -48,6 +48,7 @@ import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET_MAP; import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES; import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.ENABLE_FORK_FOR_REMOTE_INDICES; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V9; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS; @@ -116,7 +117,9 @@ public MultiClusterSpecIT( "LookupJoinOnTwoFieldsAfterTop", "LookupJoinOnTwoFieldsMultipleTimes", // Lookup join after LIMIT is not supported in CCS yet - "LookupJoinAfterLimitAndRemoteEnrich" + "LookupJoinAfterLimitAndRemoteEnrich", + // Lookup join after FORK is not support in CCS yet + "ForkBeforeLookupJoin" ); @Override @@ -146,7 +149,6 @@ protected void shouldSkipTest(String testName) throws IOException { } // Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented. assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName())); - assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V9.capabilityName())); // Tests that use capabilities not supported in CCS assumeFalse( "This syntax is not supported with remote LOOKUP JOIN", @@ -155,6 +157,12 @@ protected void shouldSkipTest(String testName) throws IOException { // Tests that do SORT before LOOKUP JOIN - not supported in CCS assumeFalse("LOOKUP JOIN after SORT not yet supported in CCS", testName.contains("OnTheCoordinator")); + if (testCase.requiredCapabilities.contains(FORK_V9.capabilityName())) { + assumeTrue( + "FORK not yet supported with CCS", + hasCapabilities(adminClient(), List.of(ENABLE_FORK_FOR_REMOTE_INDICES.capabilityName())) + ); + } } @Override diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec index 191d58a547c20..53c4343c96f26 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec @@ -384,133 +384,134 @@ a:long | b:keyword | _fork:keyword forkAfterDrop required_capability: fork_v9 -FROM languages -| DROP language_code -| FORK ( WHERE language_name == "English" | EVAL x = 1 ) - ( WHERE language_name != "English" ) -| SORT _fork, language_name +FROM employees +| DROP first_name +| FORK ( WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = 1 ) + ( WHERE emp_no == 10048 ) +| SORT _fork, emp_no, x +| KEEP emp_no, x, _fork ; -language_name:keyword | x:integer | _fork:keyword -English | 1 | fork1 -French | null | fork2 -German | null | fork2 -Spanish | null | fork2 +emp_no:integer | x:integer | _fork:keyword +10048 | 1 | fork1 +10081 | 1 | fork1 +10048 | null | fork2 ; forkBranchWithDrop required_capability: fork_v9 -FROM languages -| FORK ( EVAL x = 1 | DROP language_code | WHERE language_name == "English" | DROP x ) - ( WHERE language_name != "English" ) -| SORT _fork, language_name -| KEEP language_name, language_code, _fork +FROM employees +| FORK ( EVAL x = 1 | DROP first_name | WHERE emp_no == 10048 OR emp_no == 10081 | DROP x ) + ( WHERE emp_no == 10048 ) +| SORT _fork, emp_no +| KEEP emp_no, _fork ; -language_name:keyword | language_code:integer | _fork:keyword -English | null | fork1 -French | 2 | fork2 -German | 4 | fork2 -Spanish | 3 | fork2 +emp_no:integer | _fork:keyword +10048 | fork1 +10081 | fork1 +10048 | fork2 ; - forkBeforeDrop required_capability: fork_v9 -FROM languages -| FORK (WHERE language_code == 1 OR language_code == 2) - (WHERE language_code == 1) -| DROP language_code -| SORT _fork, language_name +FROM employees +| FORK ( WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = 1 ) + ( WHERE emp_no == 10048 ) +| DROP first_name, x +| SORT _fork, emp_no +| KEEP emp_no, _fork ; -language_name:keyword | _fork:keyword -English | fork1 -French | fork1 -English | fork2 +emp_no:integer | _fork:keyword +10048 | fork1 +10081 | fork1 +10048 | fork2 ; forkBranchWithKeep required_capability: fork_v9 -FROM languages -| FORK ( WHERE language_name == "English" | KEEP language_name, language_code ) - ( WHERE language_name != "English" ) -| SORT _fork, language_name +FROM employees +| FORK ( WHERE emp_no == 10048 OR emp_no == 10081 | KEEP emp_no ) + ( WHERE emp_no == 10048 ) +| SORT _fork, emp_no +| KEEP emp_no, _fork ; -language_name:keyword | language_code:integer | _fork:keyword -English | 1 | fork1 -French | 2 | fork2 -German | 4 | fork2 -Spanish | 3 | fork2 +emp_no:integer | _fork:keyword +10048 | fork1 +10081 | fork1 +10048 | fork2 ; forkBeforeRename required_capability: fork_v9 -FROM languages -| FORK (WHERE language_code == 1 OR language_code == 2) - (WHERE language_code == 1) -| RENAME language_code AS code -| SORT _fork, language_name +FROM employees +| FORK ( WHERE emp_no == 10048 OR emp_no == 10081 ) + ( WHERE emp_no == 10048 ) +| RENAME emp_no AS code +| SORT _fork, code +| KEEP code, _fork ; -code:integer | language_name:keyword | _fork:keyword -1 | English | fork1 -2 | French | fork1 -1 | English | fork2 +code:integer | _fork:keyword +10048 | fork1 +10081 | fork1 +10048 | fork2 ; forkBranchWithRenameAs required_capability: fork_v9 -FROM languages -| FORK (RENAME language_code AS code | WHERE code == 1 OR code == 2) - (WHERE language_code == 1 | RENAME language_code AS x) -| SORT _fork, language_name -| KEEP code, language_name, x, _fork +FROM employees +| FORK (RENAME emp_no AS emp_code | WHERE emp_code == 10048 OR emp_code == 10081) + (WHERE emp_no == 10048 | RENAME emp_no AS x) +| SORT _fork, emp_code, x +| KEEP emp_code, x, _fork ; -code:integer | language_name:keyword | x:integer | _fork:keyword -1 | English | null | fork1 -2 | French | null | fork1 -null | English | 1 | fork2 +emp_code:integer | x:integer | _fork:keyword +10048 | null | fork1 +10081 | null | fork1 +null | 10048 | fork2 ; forkBranchWithRenameEquals required_capability: fork_v9 -FROM languages -| FORK (RENAME code = language_code | WHERE code == 1 OR code == 2) - (WHERE language_code == 1 | RENAME x = language_code) -| SORT _fork, language_name -| KEEP code, language_name, x, _fork +FROM employees +| FORK (RENAME emp_code = emp_no | WHERE emp_code == 10048 OR emp_code == 10081) + (WHERE emp_no == 10048 | RENAME x = emp_no) +| SORT _fork, emp_code, x +| KEEP emp_code, x, _fork ; -code:integer | language_name:keyword | x:integer | _fork:keyword -1 | English | null | fork1 -2 | French | null | fork1 -null | English | 1 | fork2 +emp_code:integer | x:integer | _fork:keyword +10048 | null | fork1 +10081 | null | fork1 +null | 10048 | fork2 ; forkAfterRename required_capability: fork_v9 -FROM languages -| RENAME language_code AS code -| FORK (WHERE code == 1 OR code == 2) - (WHERE code == 1) -| SORT _fork, language_name +FROM employees +| RENAME emp_no AS emp_code +| FORK (WHERE emp_code == 10048 OR emp_code == 10081) + (WHERE emp_code == 10048) +| SORT _fork, emp_code +| KEEP emp_code, _fork ; -code:integer | language_name:keyword | _fork:keyword -1 | English | fork1 -2 | French | fork1 -1 | English | fork2 +emp_code:integer | _fork:keyword +10048 | fork1 +10081 | fork1 +10048 | fork2 ; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 478fb5af2676e..643f7e3027720 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1421,7 +1421,12 @@ public enum Cap { /** * URL decoding function. */ - URL_DECODE(Build.current().isSnapshot()); + URL_DECODE(Build.current().isSnapshot()), + + /** + * FORK with remote indices + */ + ENABLE_FORK_FOR_REMOTE_INDICES; private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index 974f5fa485e47..d698b64895e3d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -716,7 +716,6 @@ public PlanFactory visitForkCommand(EsqlBaseParser.ForkCommandContext ctx) { } return input -> { - checkForRemoteClusters(input, source(ctx), "FORK"); List subPlans = subQueries.stream().map(planFactory -> planFactory.apply(input)).toList(); return new Fork(source(ctx), subPlans, List.of()); }; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java index e1c52eb6845d9..d985937544436 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java @@ -3628,12 +3628,6 @@ public void testInvalidFork() { expectError("FROM foo* | FORK ( LIMIT 10 ) ( y+2 )", "line 1:33: mismatched input 'y+2'"); expectError("FROM foo* | FORK (where true) ()", "line 1:32: mismatched input ')'"); expectError("FROM foo* | FORK () (where true)", "line 1:19: mismatched input ')'"); - - var fromPatterns = randomIndexPatterns(CROSS_CLUSTER); - expectError( - "FROM " + fromPatterns + " | FORK (EVAL a = 1) (EVAL a = 2)", - "invalid index pattern [" + unquoteIndexPattern(fromPatterns) + "], remote clusters are not supported with FORK" - ); } public void testFieldNamesAsCommands() throws Exception {