Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET_MAP;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.ENABLE_FORK_FOR_REMOTE_INDICES;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V9;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
Expand Down Expand Up @@ -116,7 +117,9 @@ public MultiClusterSpecIT(
"LookupJoinOnTwoFieldsAfterTop",
"LookupJoinOnTwoFieldsMultipleTimes",
// Lookup join after LIMIT is not supported in CCS yet
"LookupJoinAfterLimitAndRemoteEnrich"
"LookupJoinAfterLimitAndRemoteEnrich",
// Lookup join after FORK is not supported in CCS yet
"ForkBeforeLookupJoin"
);

@Override
Expand Down Expand Up @@ -146,7 +149,6 @@ protected void shouldSkipTest(String testName) throws IOException {
}
// Unmapped fields require a correct capability response from every cluster, which isn't currently implemented.
assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName()));
assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V9.capabilityName()));
// Tests that use capabilities not supported in CCS
assumeFalse(
"This syntax is not supported with remote LOOKUP JOIN",
Expand All @@ -155,6 +157,12 @@ protected void shouldSkipTest(String testName) throws IOException {
// Tests that do SORT before LOOKUP JOIN - not supported in CCS
assumeFalse("LOOKUP JOIN after SORT not yet supported in CCS", testName.contains("OnTheCoordinator"));

if (testCase.requiredCapabilities.contains(FORK_V9.capabilityName())) {
assumeTrue(
"FORK not yet supported with CCS",
hasCapabilities(adminClient(), List.of(ENABLE_FORK_FOR_REMOTE_INDICES.capabilityName()))
);
}
}

@Override
Expand Down
151 changes: 76 additions & 75 deletions x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec
Original file line number Diff line number Diff line change
Expand Up @@ -384,133 +384,134 @@ a:long | b:keyword | _fork:keyword
forkAfterDrop
required_capability: fork_v9

FROM languages
| DROP language_code
| FORK ( WHERE language_name == "English" | EVAL x = 1 )
( WHERE language_name != "English" )
| SORT _fork, language_name
FROM employees
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all the tests that were querying the languages index failed - with rows being duplicated:

Actual: |  
-- | --
language_name:keyword \| x:integer \| _fork:keyword |  
English               \| 1         \| fork1 |  
English               \| 1         \| fork1 |  
French                \| null      \| fork2 |  
French                \| null      \| fork2 |  
German                \| null      \| fork2 |  
German                \| null      \| fork2 |  
Spanish               \| null      \| fork2 |  
Spanish               \| null      \| fork2 |  
  |  
Expected: |  
language_name:keyword \| x:integer \| _fork:keyword |  
English               \| 1         \| fork1 |  
French                \| null      \| fork2 |  
German                \| null      \| fork2 |  
Spanish               \| null      \| fork2

Debugging these tests showed that we were querying both the local and the remote index, which is why we had duplicate results.

The languages index was primarily used for testing lookup join, and for lookup indices we have some special handling that avoids duplicates:

if (Arrays.stream(localIndices).anyMatch(i -> LOOKUP_INDICES.contains(i.trim().toLowerCase(Locale.ROOT)))) {
// If the query contains lookup indices, use only remotes to avoid duplication
onlyRemotes = true;
}
final boolean onlyRemotesFinal = onlyRemotes;

languages is not part of LOOKUP_INDICES (but perhaps it should be?).
Anyway, I thought it would be better to rewrite these tests against an index that has no special handling, so I used employees here instead of languages.

| DROP first_name
| FORK ( WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = 1 )
( WHERE emp_no == 10048 )
| SORT _fork, emp_no, x
| KEEP emp_no, x, _fork
;

language_name:keyword | x:integer | _fork:keyword
English | 1 | fork1
French | null | fork2
German | null | fork2
Spanish | null | fork2
emp_no:integer | x:integer | _fork:keyword
10048 | 1 | fork1
10081 | 1 | fork1
10048 | null | fork2
;

forkBranchWithDrop
required_capability: fork_v9

FROM languages
| FORK ( EVAL x = 1 | DROP language_code | WHERE language_name == "English" | DROP x )
( WHERE language_name != "English" )
| SORT _fork, language_name
| KEEP language_name, language_code, _fork
FROM employees
| FORK ( EVAL x = 1 | DROP first_name | WHERE emp_no == 10048 OR emp_no == 10081 | DROP x )
( WHERE emp_no == 10048 )
| SORT _fork, emp_no
| KEEP emp_no, _fork
;

language_name:keyword | language_code:integer | _fork:keyword
English | null | fork1
French | 2 | fork2
German | 4 | fork2
Spanish | 3 | fork2
emp_no:integer | _fork:keyword
10048 | fork1
10081 | fork1
10048 | fork2
;


forkBeforeDrop
required_capability: fork_v9

FROM languages
| FORK (WHERE language_code == 1 OR language_code == 2)
(WHERE language_code == 1)
| DROP language_code
| SORT _fork, language_name
FROM employees
| FORK ( WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = 1 )
( WHERE emp_no == 10048 )
| DROP first_name, x
| SORT _fork, emp_no
| KEEP emp_no, _fork
;

language_name:keyword | _fork:keyword
English | fork1
French | fork1
English | fork2
emp_no:integer | _fork:keyword
10048 | fork1
10081 | fork1
10048 | fork2
;

forkBranchWithKeep
required_capability: fork_v9

FROM languages
| FORK ( WHERE language_name == "English" | KEEP language_name, language_code )
( WHERE language_name != "English" )
| SORT _fork, language_name
FROM employees
| FORK ( WHERE emp_no == 10048 OR emp_no == 10081 | KEEP emp_no )
( WHERE emp_no == 10048 )
| SORT _fork, emp_no
| KEEP emp_no, _fork
;

language_name:keyword | language_code:integer | _fork:keyword
English | 1 | fork1
French | 2 | fork2
German | 4 | fork2
Spanish | 3 | fork2
emp_no:integer | _fork:keyword
10048 | fork1
10081 | fork1
10048 | fork2
;

forkBeforeRename
required_capability: fork_v9

FROM languages
| FORK (WHERE language_code == 1 OR language_code == 2)
(WHERE language_code == 1)
| RENAME language_code AS code
| SORT _fork, language_name
FROM employees
| FORK ( WHERE emp_no == 10048 OR emp_no == 10081 )
( WHERE emp_no == 10048 )
| RENAME emp_no AS code
| SORT _fork, code
| KEEP code, _fork
;

code:integer | language_name:keyword | _fork:keyword
1 | English | fork1
2 | French | fork1
1 | English | fork2
code:integer | _fork:keyword
10048 | fork1
10081 | fork1
10048 | fork2
;

forkBranchWithRenameAs
required_capability: fork_v9

FROM languages
| FORK (RENAME language_code AS code | WHERE code == 1 OR code == 2)
(WHERE language_code == 1 | RENAME language_code AS x)
| SORT _fork, language_name
| KEEP code, language_name, x, _fork
FROM employees
| FORK (RENAME emp_no AS emp_code | WHERE emp_code == 10048 OR emp_code == 10081)
(WHERE emp_no == 10048 | RENAME emp_no AS x)
| SORT _fork, emp_code, x
| KEEP emp_code, x, _fork
;

code:integer | language_name:keyword | x:integer | _fork:keyword
1 | English | null | fork1
2 | French | null | fork1
null | English | 1 | fork2
emp_code:integer | x:integer | _fork:keyword
10048 | null | fork1
10081 | null | fork1
null | 10048 | fork2
;

forkBranchWithRenameEquals
required_capability: fork_v9

FROM languages
| FORK (RENAME code = language_code | WHERE code == 1 OR code == 2)
(WHERE language_code == 1 | RENAME x = language_code)
| SORT _fork, language_name
| KEEP code, language_name, x, _fork
FROM employees
| FORK (RENAME emp_code = emp_no | WHERE emp_code == 10048 OR emp_code == 10081)
(WHERE emp_no == 10048 | RENAME x = emp_no)
| SORT _fork, emp_code, x
| KEEP emp_code, x, _fork
;

code:integer | language_name:keyword | x:integer | _fork:keyword
1 | English | null | fork1
2 | French | null | fork1
null | English | 1 | fork2
emp_code:integer | x:integer | _fork:keyword
10048 | null | fork1
10081 | null | fork1
null | 10048 | fork2
;


forkAfterRename
required_capability: fork_v9

FROM languages
| RENAME language_code AS code
| FORK (WHERE code == 1 OR code == 2)
(WHERE code == 1)
| SORT _fork, language_name
FROM employees
| RENAME emp_no AS emp_code
| FORK (WHERE emp_code == 10048 OR emp_code == 10081)
(WHERE emp_code == 10048)
| SORT _fork, emp_code
| KEEP emp_code, _fork
;

code:integer | language_name:keyword | _fork:keyword
1 | English | fork1
2 | French | fork1
1 | English | fork2
emp_code:integer | _fork:keyword
10048 | fork1
10081 | fork1
10048 | fork2
;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1421,7 +1421,12 @@ public enum Cap {
/**
* URL decoding function.
*/
URL_DECODE(Build.current().isSnapshot());
URL_DECODE(Build.current().isSnapshot()),

/**
* FORK with remote indices
*/
ENABLE_FORK_FOR_REMOTE_INDICES;

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,6 @@ public PlanFactory visitForkCommand(EsqlBaseParser.ForkCommandContext ctx) {
}

return input -> {
checkForRemoteClusters(input, source(ctx), "FORK");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have made the ENABLE_FORK_FOR_REMOTE_INDICES capability available only in snapshots and checked it here, so that CCS support would be available only in snapshot builds, but I don't think we need to 🤔

List<LogicalPlan> subPlans = subQueries.stream().map(planFactory -> planFactory.apply(input)).toList();
return new Fork(source(ctx), subPlans, List.of());
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3628,12 +3628,6 @@ public void testInvalidFork() {
expectError("FROM foo* | FORK ( LIMIT 10 ) ( y+2 )", "line 1:33: mismatched input 'y+2'");
expectError("FROM foo* | FORK (where true) ()", "line 1:32: mismatched input ')'");
expectError("FROM foo* | FORK () (where true)", "line 1:19: mismatched input ')'");

var fromPatterns = randomIndexPatterns(CROSS_CLUSTER);
expectError(
"FROM " + fromPatterns + " | FORK (EVAL a = 1) (EVAL a = 2)",
"invalid index pattern [" + unquoteIndexPattern(fromPatterns) + "], remote clusters are not supported with FORK"
);
}

public void testFieldNamesAsCommands() throws Exception {
Expand Down