Skip to content

Commit e4155ea

Browse files
authored
Remote Lookup Join implementation (#129013)
Remote lookup join implementation This patch enables using LOOKUP JOIN with cross-cluster queries. Example: FROM logs-*, remote:logs-* | LOOKUP JOIN clients on ip | SORT timestamp | LIMIT 100
1 parent d2fe7c9 commit e4155ea

File tree

33 files changed

+1487
-179
lines changed

33 files changed

+1487
-179
lines changed

docs/changelog/129013.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
pr: 129013
2+
summary: "Add remote index support to LOOKUP JOIN"
3+
area: ES|QL
4+
type: feature
5+
issues: [ ]
6+
highlight:
7+
title: Add remote index support to LOOKUP JOIN
8+
body: |-
9+
Queries containing LOOKUP JOIN now can be preformed on cross-cluster indices, for example:
10+
[source,yaml]
11+
----------------------------
12+
FROM logs-*, remote:logs-* | LOOKUP JOIN clients on ip | SORT timestamp | LIMIT 100
13+
----------------------------

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ static TransportVersion def(int id) {
336336
public static final TransportVersion ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS = def(9_117_0_00);
337337
public static final TransportVersion ML_INFERENCE_CUSTOM_SERVICE_EMBEDDING_TYPE = def(9_118_0_00);
338338
public static final TransportVersion ESQL_FIXED_INDEX_LIKE = def(9_119_0_00);
339+
public static final TransportVersion LOOKUP_JOIN_CCS = def(9_120_0_00);
339340

340341
/*
341342
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ tasks.named("yamlRestCompatTestTransform").configure({ task ->
136136
task.skipTest("esql/191_lookup_join_on_datastreams/data streams not supported in LOOKUP JOIN", "Added support for aliases in JOINs")
137137
task.skipTest("esql/190_lookup_join/non-lookup index", "Error message changed")
138138
task.skipTest("esql/192_lookup_join_on_aliases/alias-pattern-multiple", "Error message changed")
139+
task.skipTest("esql/190_lookup_join/fails with non-lookup index", "Error message changed")
140+
task.skipTest("esql/192_lookup_join_on_aliases/fails when alias or pattern resolves to multiple", "Error message changed")
139141
})
140142

141143
tasks.named('yamlRestCompatTest').configure {

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 69 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.test.rest.TestFeatureService;
2525
import org.elasticsearch.xpack.esql.CsvSpecReader;
2626
import org.elasticsearch.xpack.esql.CsvSpecReader.CsvTestCase;
27+
import org.elasticsearch.xpack.esql.CsvTestsDataLoader;
2728
import org.elasticsearch.xpack.esql.SpecReader;
2829
import org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase;
2930
import org.junit.AfterClass;
@@ -39,13 +40,16 @@
3940
import java.util.List;
4041
import java.util.Locale;
4142
import java.util.Optional;
43+
import java.util.Set;
4244
import java.util.regex.Pattern;
4345
import java.util.stream.Collectors;
4446

4547
import static org.elasticsearch.xpack.esql.CsvSpecReader.specParser;
4648
import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
49+
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET_MAP;
4750
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES;
4851
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
52+
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE;
4953
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V9;
5054
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
5155
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
@@ -109,6 +113,22 @@ public MultiClusterSpecIT(
109113
super(fileName, groupName, testName, lineNumber, convertToRemoteIndices(testCase), instructions, mode);
110114
}
111115

116+
// TODO: think how to handle this better
117+
public static final Set<String> NO_REMOTE_LOOKUP_JOIN_TESTS = Set.of(
118+
// Lookup join after STATS is not supported in CCS yet
119+
"StatsAndLookupIPAndMessageFromIndex",
120+
"JoinMaskingRegex",
121+
"StatsAndLookupIPFromIndex",
122+
"StatsAndLookupMessageFromIndex",
123+
"MvJoinKeyOnTheLookupIndexAfterStats",
124+
"MvJoinKeyOnFromAfterStats",
125+
// Lookup join after SORT is not supported in CCS yet
126+
"NullifiedJoinKeyToPurgeTheJoin",
127+
"SortBeforeAndAfterJoin",
128+
"SortEvalBeforeLookup",
129+
"SortBeforeAndAfterMultipleJoinAndMvExpand"
130+
);
131+
112132
@Override
113133
protected void shouldSkipTest(String testName) throws IOException {
114134
boolean remoteMetadata = testCase.requiredCapabilities.contains(METADATA_FIELDS_REMOTE_TEST.capabilityName());
@@ -129,10 +149,20 @@ protected void shouldSkipTest(String testName) throws IOException {
129149
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
130150
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
131151
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V8.capabilityName()));
132-
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName()));
152+
if (testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName())) {
153+
assumeTrue("LOOKUP JOIN not yet supported in CCS", hasCapabilities(List.of(ENABLE_LOOKUP_JOIN_ON_REMOTE.capabilityName())));
154+
}
133155
// Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented.
134156
assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName()));
135157
assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V9.capabilityName()));
158+
// Tests that use capabilities not supported in CCS
159+
assumeFalse(
160+
"This syntax is not supported with remote LOOKUP JOIN",
161+
NO_REMOTE_LOOKUP_JOIN_TESTS.stream().anyMatch(testName::contains)
162+
);
163+
// Tests that do SORT before LOOKUP JOIN - not supported in CCS
164+
assumeFalse("LOOKUP JOIN after SORT not yet supported in CCS", testName.contains("OnTheCoordinator"));
165+
136166
}
137167

138168
@Override
@@ -181,6 +211,19 @@ protected RestClient buildClient(Settings settings, HttpHost[] localHosts) throw
181211
// These indices are used in metadata tests so we want them on remote only for consistency
182212
public static final List<String> METADATA_INDICES = List.of("employees", "apps", "ul_logs");
183213

214+
// These are lookup indices, we want them on both remotes and locals
215+
public static final Set<String> LOOKUP_INDICES = CSV_DATASET_MAP.values()
216+
.stream()
217+
.filter(td -> td.settingFileName() != null && td.settingFileName().equals("lookup-settings.json"))
218+
.map(CsvTestsDataLoader.TestDataset::indexName)
219+
.collect(Collectors.toSet());
220+
221+
public static final Set<String> LOOKUP_ENDPOINTS = LOOKUP_INDICES.stream().map(i -> "/" + i + "/_bulk").collect(Collectors.toSet());
222+
223+
public static final Set<String> ENRICH_ENDPOINTS = ENRICH_SOURCE_INDICES.stream()
224+
.map(i -> "/" + i + "/_bulk")
225+
.collect(Collectors.toSet());
226+
184227
/**
185228
* Creates a new mock client that dispatches every request to both the local and remote clusters, excluding _bulk and _query requests.
186229
* - '_bulk' requests are randomly sent to either the local or remote cluster to populate data. Some spec tests, such as AVG,
@@ -199,15 +242,17 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th
199242
return localClient.performRequest(request);
200243
} else if (endpoint.endsWith("/_bulk") && METADATA_INDICES.stream().anyMatch(i -> endpoint.equals("/" + i + "/_bulk"))) {
201244
return remoteClient.performRequest(request);
202-
} else if (endpoint.endsWith("/_bulk") && ENRICH_SOURCE_INDICES.stream().noneMatch(i -> endpoint.equals("/" + i + "/_bulk"))) {
203-
return bulkClient.performRequest(request);
204-
} else {
205-
Request[] clones = cloneRequests(request, 2);
206-
Response resp1 = remoteClient.performRequest(clones[0]);
207-
Response resp2 = localClient.performRequest(clones[1]);
208-
assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode());
209-
return resp2;
210-
}
245+
} else if (endpoint.endsWith("/_bulk")
246+
&& ENRICH_ENDPOINTS.contains(endpoint) == false
247+
&& LOOKUP_ENDPOINTS.contains(endpoint) == false) {
248+
return bulkClient.performRequest(request);
249+
} else {
250+
Request[] clones = cloneRequests(request, 2);
251+
Response resp1 = remoteClient.performRequest(clones[0]);
252+
Response resp2 = localClient.performRequest(clones[1]);
253+
assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode());
254+
return resp2;
255+
}
211256
});
212257
doAnswer(invocation -> {
213258
IOUtils.close(localClient, remoteClient);
@@ -251,37 +296,32 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas
251296
String query = testCase.query;
252297
String[] commands = query.split("\\|");
253298
String first = commands[0].trim();
299+
// If true, we're using *:index, otherwise we're using *:index,index
300+
boolean onlyRemotes = canUseRemoteIndicesOnly() && randomBoolean();
254301
if (commands[0].toLowerCase(Locale.ROOT).startsWith("from")) {
255302
String[] parts = commands[0].split("(?i)metadata");
256303
assert parts.length >= 1 : parts;
257304
String fromStatement = parts[0];
258305
String[] localIndices = fromStatement.substring("FROM ".length()).split(",");
259-
final String remoteIndices;
260-
if (canUseRemoteIndicesOnly() && randomBoolean()) {
261-
remoteIndices = Arrays.stream(localIndices)
262-
.map(index -> unquoteAndRequoteAsRemote(index.trim(), true))
263-
.collect(Collectors.joining(","));
264-
} else {
265-
remoteIndices = Arrays.stream(localIndices)
266-
.map(index -> unquoteAndRequoteAsRemote(index.trim(), false))
267-
.collect(Collectors.joining(","));
306+
if (Arrays.stream(localIndices).anyMatch(i -> LOOKUP_INDICES.contains(i.trim().toLowerCase(Locale.ROOT)))) {
307+
// If the query contains lookup indices, use only remotes to avoid duplication
308+
onlyRemotes = true;
268309
}
310+
final boolean onlyRemotesFinal = onlyRemotes;
311+
final String remoteIndices = Arrays.stream(localIndices)
312+
.map(index -> unquoteAndRequoteAsRemote(index.trim(), onlyRemotesFinal))
313+
.collect(Collectors.joining(","));
269314
var newFrom = "FROM " + remoteIndices + " " + commands[0].substring(fromStatement.length());
270315
testCase.query = newFrom + query.substring(first.length());
271316
}
272317
if (commands[0].toLowerCase(Locale.ROOT).startsWith("ts ")) {
273318
String[] parts = commands[0].split("\\s+");
274319
assert parts.length >= 2 : commands[0];
275320
String[] indices = parts[1].split(",");
276-
if (canUseRemoteIndicesOnly() && randomBoolean()) {
277-
parts[1] = Arrays.stream(indices)
278-
.map(index -> unquoteAndRequoteAsRemote(index.trim(), true))
279-
.collect(Collectors.joining(","));
280-
} else {
281-
parts[1] = Arrays.stream(indices)
282-
.map(index -> unquoteAndRequoteAsRemote(index.trim(), false))
283-
.collect(Collectors.joining(","));
284-
}
321+
final boolean onlyRemotesFinal = onlyRemotes;
322+
parts[1] = Arrays.stream(indices)
323+
.map(index -> unquoteAndRequoteAsRemote(index.trim(), onlyRemotesFinal))
324+
.collect(Collectors.joining(","));
285325
String newNewMetrics = String.join(" ", parts);
286326
testCase.query = newNewMetrics + query.substring(first.length());
287327
}
@@ -359,9 +399,7 @@ protected boolean supportsInferenceTestService() {
359399

360400
@Override
361401
protected boolean supportsIndexModeLookup() throws IOException {
362-
// CCS does not yet support JOIN_LOOKUP_V10 and clusters falsely report they have this capability
363-
// return hasCapabilities(List.of(JOIN_LOOKUP_V10.capabilityName()));
364-
return false;
402+
return hasCapabilities(List.of(JOIN_LOOKUP_V12.capabilityName()));
365403
}
366404

367405
@Override

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ record Doc(int id, String color, long data) {
7272
List<Doc> localDocs = List.of();
7373
final String remoteIndex = "test-remote-index";
7474
List<Doc> remoteDocs = List.of();
75+
final String lookupIndexLocal = "test-lookup-index-local";
76+
final String lookupIndexRemote = "test-lookup-index-remote";
77+
final String lookupAlias = "test-lookup-index";
7578

7679
@Before
7780
public void setUpIndices() throws Exception {
@@ -109,6 +112,44 @@ public void setUpIndices() throws Exception {
109112
}
110113
}
111114

115+
private void setupLookupIndices() throws IOException {
116+
RestClient localClient = client();
117+
final String mapping = """
118+
"properties": {
119+
"data": { "type": "long" },
120+
"morecolor": { "type": "keyword" }
121+
}
122+
""";
123+
var lookupDocs = IntStream.range(0, between(1, 5))
124+
.mapToObj(n -> new Doc(n, randomFrom("red", "yellow", "green"), randomIntBetween(1, 1000)))
125+
.toList();
126+
createIndex(
127+
localClient,
128+
lookupIndexLocal,
129+
Settings.builder().put("index.number_of_shards", 1).put("index.mode", "lookup").build(),
130+
mapping,
131+
"\"" + lookupAlias + "\":{}"
132+
);
133+
indexDocs(localClient, lookupIndexLocal, lookupDocs);
134+
try (RestClient remoteClient = remoteClusterClient()) {
135+
createIndex(
136+
remoteClient,
137+
lookupIndexRemote,
138+
Settings.builder().put("index.number_of_shards", 1).put("index.mode", "lookup").build(),
139+
mapping,
140+
"\"" + lookupAlias + "\":{}"
141+
);
142+
indexDocs(remoteClient, lookupIndexRemote, lookupDocs);
143+
}
144+
}
145+
146+
public void wipeLookupIndices() throws IOException {
147+
try (RestClient remoteClient = remoteClusterClient()) {
148+
deleteIndex(remoteClient, lookupIndexRemote);
149+
}
150+
deleteIndex(client(), lookupIndexLocal);
151+
}
152+
112153
@After
113154
public void wipeIndices() throws Exception {
114155
try (RestClient remoteClient = remoteClusterClient()) {
@@ -416,6 +457,64 @@ public void testStats() throws IOException {
416457
assertThat(clusterData, hasKey("took"));
417458
}
418459

460+
public void testLookupJoinAliases() throws IOException {
461+
assumeTrue(
462+
"Local cluster does not support multiple LOOKUP JOIN aliases",
463+
supportsLookupJoinAliases(Clusters.localClusterVersion())
464+
);
465+
assumeTrue(
466+
"Remote cluster does not support multiple LOOKUP JOIN aliases",
467+
supportsLookupJoinAliases(Clusters.remoteClusterVersion())
468+
);
469+
try {
470+
setupLookupIndices();
471+
Map<String, Object> result = run(
472+
"FROM test-local-index,*:test-remote-index | LOOKUP JOIN test-lookup-index ON data | STATS c = COUNT(*)",
473+
true
474+
);
475+
var columns = List.of(Map.of("name", "c", "type", "long"));
476+
var values = List.of(List.of(localDocs.size() + remoteDocs.size()));
477+
assertResultMap(true, result, columns, values, false);
478+
} finally {
479+
wipeLookupIndices();
480+
}
481+
}
482+
483+
public void testLookupJoinAliasesSkipOld() throws IOException {
484+
assumeTrue(
485+
"Local cluster does not support multiple LOOKUP JOIN aliases",
486+
supportsLookupJoinAliases(Clusters.localClusterVersion())
487+
);
488+
assumeFalse(
489+
"Remote cluster should not support multiple LOOKUP JOIN aliases",
490+
supportsLookupJoinAliases(Clusters.remoteClusterVersion())
491+
);
492+
try {
493+
setupLookupIndices();
494+
Map<String, Object> result = run(
495+
"FROM test-local-index,*:test-remote-index | LOOKUP JOIN test-lookup-index ON data | STATS c = COUNT(*)",
496+
true
497+
);
498+
var columns = List.of(Map.of("name", "c", "type", "long"));
499+
var values = List.of(List.of(localDocs.size()));
500+
501+
MapMatcher mapMatcher = getResultMatcher(true, false, result.containsKey("documents_found")).extraOk();
502+
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
503+
mapMatcher = mapMatcher.entry("is_partial", true);
504+
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
505+
// check that the remote is skipped
506+
@SuppressWarnings("unchecked")
507+
Map<String, Object> clusters = (Map<String, Object>) result.get("_clusters");
508+
@SuppressWarnings("unchecked")
509+
Map<String, Object> details = (Map<String, Object>) clusters.get("details");
510+
@SuppressWarnings("unchecked")
511+
Map<String, Object> remoteCluster = (Map<String, Object>) details.get("remote_cluster");
512+
assertThat(remoteCluster.get("status"), equalTo("skipped"));
513+
} finally {
514+
wipeLookupIndices();
515+
}
516+
}
517+
419518
public void testLikeIndex() throws Exception {
420519

421520
boolean includeCCSMetadata = includeCCSMetadata();
@@ -576,6 +675,10 @@ private static boolean capabilitiesEndpointAvailable() {
576675
return Clusters.localClusterVersion().onOrAfter(Version.V_8_15_0);
577676
}
578677

678+
private static boolean supportsLookupJoinAliases(Version version) {
679+
return version.onOrAfter(Version.V_9_2_0);
680+
}
681+
579682
private static boolean includeCCSMetadata() {
580683
return ccsMetadataAvailable() && randomBoolean();
581684
}

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.test.MapMatcher;
1919
import org.elasticsearch.test.TestClustersThreadFilter;
2020
import org.elasticsearch.test.cluster.ElasticsearchCluster;
21+
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
2122
import org.elasticsearch.xpack.esql.qa.rest.RequestIndexFilteringTestCase;
2223
import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase;
2324
import org.hamcrest.Matcher;
@@ -35,6 +36,8 @@
3536
import static org.elasticsearch.test.ListMatcher.matchesList;
3637
import static org.elasticsearch.test.MapMatcher.assertMap;
3738
import static org.elasticsearch.test.MapMatcher.matchesMap;
39+
import static org.hamcrest.Matchers.allOf;
40+
import static org.hamcrest.Matchers.containsString;
3841
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3942
import static org.hamcrest.Matchers.hasSize;
4043
import static org.hamcrest.Matchers.instanceOf;
@@ -153,6 +156,35 @@ private static boolean checkVersion(org.elasticsearch.Version version) {
153156
|| (version.onOrAfter(Version.fromString("8.19.0")) && version.before(Version.fromString("9.0.0")));
154157
}
155158

159+
public void testIndicesDontExistWithRemoteLookupJoin() throws IOException {
160+
assumeTrue("Only works with remote LOOKUP JOIN support", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled());
161+
// This check is for "local" cluster - which is different from test runner actually, so it could be old
162+
assumeTrue(
163+
"Only works with remote LOOKUP JOIN support",
164+
clusterHasCapability(
165+
client(),
166+
"POST",
167+
"_query",
168+
List.of(),
169+
List.of(EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.capabilityName())
170+
).orElse(false)
171+
);
172+
173+
int docsTest1 = randomIntBetween(1, 5);
174+
indexTimestampData(docsTest1, "test1", "2024-11-26", "id1");
175+
176+
var pattern = "FROM test1,*:test1";
177+
ResponseException e = expectThrows(
178+
ResponseException.class,
179+
() -> runEsql(timestampFilter("gte", "2020-01-01").query(pattern + " | LOOKUP JOIN foo ON id1"))
180+
);
181+
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
182+
assertThat(
183+
e.getMessage(),
184+
allOf(containsString("verification_exception"), containsString("Unknown index [foo,remote_cluster:foo]"))
185+
);
186+
}
187+
156188
// We need a separate test since remote missing indices and local missing indices now work differently
157189
public void testIndicesDontExistRemote() throws IOException {
158190
// Exclude old versions

0 commit comments

Comments
 (0)