-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Remote Lookup Join implementation #129013
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 69 commits
3f614af
ba8a039
2b5aaef
c3ae03d
622f4e6
d31cbfb
02cd72b
3044290
2c63a23
4c29dc7
c466265
b056662
82fdd35
abcc586
ab5c4b7
5eddf62
dfc4f5a
582f6a5
d248627
783e196
e877f4e
5071a9f
0e73a21
2bf0504
df92903
8a64896
381cbf1
1f818b2
704ebf2
a5973cf
698664f
8e8123c
4039f67
de786d4
19a8a75
da1aac7
dfba8a3
4132884
913dbe3
05e8184
53cf4a8
cdd05a1
935bf0a
72f68c1
2cb10a7
9a8e651
0c0bca0
4440b1b
201813b
4877657
a9a19f8
240351a
93761ee
657d682
30715d1
c16fd5e
837640a
efe0de2
4f061ae
f4afc94
dc5a3c1
71892fc
5caebed
d899cdc
edb02b6
dc35fa9
4c6f202
d463df2
f58231e
6597067
03a02fb
9858ef5
a1b2bbf
bd86864
9c90f92
10b080f
44a42ce
d6d0f91
b11ab0c
b170550
44e1bf3
288b7c4
7d0fc12
822b139
8ecf11f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| pr: 129013 | ||
| summary: "Add remote index support to LOOKUP JOIN" | ||
| area: ES|QL | ||
| type: feature | ||
| issues: [ ] | ||
| highlight: | ||
| title: Add remote index support to LOOKUP JOIN | ||
| body: |- | ||
| Queries containing LOOKUP JOIN can now be performed on cross-cluster indices, for example: | ||
| [source,yaml] | ||
| ---------------------------- | ||
| FROM logs-*, remote:logs-* | LOOKUP JOIN clients on ip | SORT timestamp | LIMIT 100 | ||
| ---------------------------- |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,6 +39,7 @@ | |
| import java.util.List; | ||
| import java.util.Locale; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.regex.Pattern; | ||
| import java.util.stream.Collectors; | ||
|
|
||
|
|
@@ -109,6 +110,22 @@ public MultiClusterSpecIT( | |
| super(fileName, groupName, testName, lineNumber, convertToRemoteIndices(testCase), instructions, mode); | ||
| } | ||
|
|
||
| // TODO: think how to handle this better | ||
| public static final Set<String> NO_REMOTE_LOOKUP_JOIN_TESTS = Set.of( | ||
| // Lookup join after STATS is not supported in CCS yet | ||
| "StatsAndLookupIPAndMessageFromIndex", | ||
| "JoinMaskingRegex", | ||
| "StatsAndLookupIPFromIndex", | ||
| "StatsAndLookupMessageFromIndex", | ||
| "MvJoinKeyOnTheLookupIndexAfterStats", | ||
| "MvJoinKeyOnFromAfterStats", | ||
| // Lookup join after SORT is not supported in CCS yet | ||
| "NullifiedJoinKeyToPurgeTheJoin", | ||
| "SortBeforeAndAfterJoin", | ||
| "SortEvalBeforeLookup", | ||
| "SortBeforeAndAfterMultipleJoinAndMvExpand" | ||
| ); | ||
|
|
||
| @Override | ||
| protected void shouldSkipTest(String testName) throws IOException { | ||
| boolean remoteMetadata = testCase.requiredCapabilities.contains(METADATA_FIELDS_REMOTE_TEST.capabilityName()); | ||
|
|
@@ -129,10 +146,21 @@ protected void shouldSkipTest(String testName) throws IOException { | |
| assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName())); | ||
| assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName())); | ||
| assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V7.capabilityName())); | ||
| assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName())); | ||
| if (testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName())) { | ||
| assumeTrue("LOKUP JOIN not supported", supportsIndexModeLookup()); | ||
| assumeTrue("LOOKUP JOIN not yet supported in CCS", Clusters.localClusterVersion().onOrAfter(Version.fromString("9.2.0"))); | ||
smalyshev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| // Unmapped fields require a correct capability response from every cluster, which isn't currently implemented. | ||
| assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName())); | ||
| assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V9.capabilityName())); | ||
| // Tests that use capabilities not supported in CCS | ||
| assumeFalse( | ||
| "This syntax is not supported with remote LOOKUP JOIN", | ||
| NO_REMOTE_LOOKUP_JOIN_TESTS.stream().anyMatch(testName::contains) | ||
| ); | ||
| // Tests that do SORT before LOOKUP JOIN - not supported in CCS | ||
| assumeFalse("LOOKUP JOIN after SORT not yet supported in CCS", testName.contains("OnTheCoordinator")); | ||
|
|
||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -181,6 +209,28 @@ protected RestClient buildClient(Settings settings, HttpHost[] localHosts) throw | |
| // These indices are used in metadata tests so we want them on remote only for consistency | ||
| public static final List<String> METADATA_INDICES = List.of("employees", "apps", "ul_logs"); | ||
|
|
||
| // These are lookup indices, we want them on both remotes and locals | ||
| // TODO: can we somehow find it from the data loader? | ||
smalyshev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| public static final Set<String> LOOKUP_INDICES = Set.of( | ||
| "languages_nested_fields", | ||
| "languages_lookup", | ||
| "clientips_lookup", | ||
| "languages_mixed_numerics", | ||
| "threat_list", | ||
| "message_types_lookup", | ||
| "host_inventory", | ||
| "ownerships", | ||
| "languages_lookup_non_unique_key", | ||
| "lookup_sample_data_ts_nanos", | ||
| "service_owners" | ||
| ); | ||
|
|
||
| public static final Set<String> LOOKUP_ENDPOINTS = LOOKUP_INDICES.stream().map(i -> "/" + i + "/_bulk").collect(Collectors.toSet()); | ||
|
|
||
| public static final Set<String> ENRICH_ENDPOINTS = ENRICH_SOURCE_INDICES.stream() | ||
| .map(i -> "/" + i + "/_bulk") | ||
| .collect(Collectors.toSet()); | ||
|
|
||
| /** | ||
| * Creates a new mock client that dispatches every request to both the local and remote clusters, excluding _bulk and _query requests. | ||
| * - '_bulk' requests are randomly sent to either the local or remote cluster to populate data. Some spec tests, such as AVG, | ||
|
|
@@ -199,15 +249,17 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th | |
| return localClient.performRequest(request); | ||
| } else if (endpoint.endsWith("/_bulk") && METADATA_INDICES.stream().anyMatch(i -> endpoint.equals("/" + i + "/_bulk"))) { | ||
| return remoteClient.performRequest(request); | ||
| } else if (endpoint.endsWith("/_bulk") && ENRICH_SOURCE_INDICES.stream().noneMatch(i -> endpoint.equals("/" + i + "/_bulk"))) { | ||
| return bulkClient.performRequest(request); | ||
| } else { | ||
| Request[] clones = cloneRequests(request, 2); | ||
| Response resp1 = remoteClient.performRequest(clones[0]); | ||
| Response resp2 = localClient.performRequest(clones[1]); | ||
| assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode()); | ||
| return resp2; | ||
| } | ||
| } else if (endpoint.endsWith("/_bulk") | ||
| && ENRICH_ENDPOINTS.contains(endpoint) == false | ||
| && LOOKUP_ENDPOINTS.contains(endpoint) == false) { | ||
| return bulkClient.performRequest(request); | ||
| } else { | ||
| Request[] clones = cloneRequests(request, 2); | ||
| Response resp1 = remoteClient.performRequest(clones[0]); | ||
| Response resp2 = localClient.performRequest(clones[1]); | ||
| assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode()); | ||
| return resp2; | ||
| } | ||
| }); | ||
| doAnswer(invocation -> { | ||
| IOUtils.close(localClient, remoteClient); | ||
|
|
@@ -251,37 +303,32 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas | |
| String query = testCase.query; | ||
| String[] commands = query.split("\\|"); | ||
| String first = commands[0].trim(); | ||
| // If true, we're using *:index, otherwise we're using *:index,index | ||
| boolean onlyRemotes = canUseRemoteIndicesOnly() && randomBoolean(); | ||
| if (commands[0].toLowerCase(Locale.ROOT).startsWith("from")) { | ||
| String[] parts = commands[0].split("(?i)metadata"); | ||
| assert parts.length >= 1 : parts; | ||
| String fromStatement = parts[0]; | ||
| String[] localIndices = fromStatement.substring("FROM ".length()).split(","); | ||
| final String remoteIndices; | ||
| if (canUseRemoteIndicesOnly() && randomBoolean()) { | ||
| remoteIndices = Arrays.stream(localIndices) | ||
| .map(index -> unquoteAndRequoteAsRemote(index.trim(), true)) | ||
| .collect(Collectors.joining(",")); | ||
| } else { | ||
| remoteIndices = Arrays.stream(localIndices) | ||
| .map(index -> unquoteAndRequoteAsRemote(index.trim(), false)) | ||
| .collect(Collectors.joining(",")); | ||
| if (Arrays.stream(localIndices).anyMatch(i -> LOOKUP_INDICES.contains(i.trim().toLowerCase(Locale.ROOT)))) { | ||
| // If the query contains lookup indices, use only remotes to avoid duplication | ||
| onlyRemotes = true; | ||
|
Comment on lines
+307
to
+308
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Err, doesn't this mean this suite doesn't at all test the case where a lookup happens both on remote and the local cluster? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one doesn't because it wouldn't match the results - we'd have more rows than the local case. We have IT tests for that case. I haven't found any easy way to make these tests work in "both" case without substantially changing them. |
||
| } | ||
| final boolean onlyRemotesFinal = onlyRemotes; | ||
alex-spies marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| final String remoteIndices = Arrays.stream(localIndices) | ||
| .map(index -> unquoteAndRequoteAsRemote(index.trim(), onlyRemotesFinal)) | ||
| .collect(Collectors.joining(",")); | ||
| var newFrom = "FROM " + remoteIndices + " " + commands[0].substring(fromStatement.length()); | ||
| testCase.query = newFrom + query.substring(first.length()); | ||
| } | ||
| if (commands[0].toLowerCase(Locale.ROOT).startsWith("ts ")) { | ||
| String[] parts = commands[0].split("\\s+"); | ||
| assert parts.length >= 2 : commands[0]; | ||
| String[] indices = parts[1].split(","); | ||
| if (canUseRemoteIndicesOnly() && randomBoolean()) { | ||
| parts[1] = Arrays.stream(indices) | ||
| .map(index -> unquoteAndRequoteAsRemote(index.trim(), true)) | ||
| .collect(Collectors.joining(",")); | ||
| } else { | ||
| parts[1] = Arrays.stream(indices) | ||
| .map(index -> unquoteAndRequoteAsRemote(index.trim(), false)) | ||
| .collect(Collectors.joining(",")); | ||
| } | ||
| final boolean onlyRemotesFinal = onlyRemotes; | ||
alex-spies marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| parts[1] = Arrays.stream(indices) | ||
| .map(index -> unquoteAndRequoteAsRemote(index.trim(), onlyRemotesFinal)) | ||
| .collect(Collectors.joining(",")); | ||
| String newNewMetrics = String.join(" ", parts); | ||
| testCase.query = newNewMetrics + query.substring(first.length()); | ||
| } | ||
|
|
@@ -359,9 +406,7 @@ protected boolean supportsInferenceTestService() { | |
|
|
||
| @Override | ||
| protected boolean supportsIndexModeLookup() throws IOException { | ||
| // CCS does not yet support JOIN_LOOKUP_V10 and clusters falsely report they have this capability | ||
| // return hasCapabilities(List.of(JOIN_LOOKUP_V10.capabilityName())); | ||
| return false; | ||
| return hasCapabilities(List.of(JOIN_LOOKUP_V12.capabilityName())); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.