Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
3f614af
Remote lookup join implementation
smalyshev Jun 5, 2025
ba8a039
refactor more
smalyshev Jun 5, 2025
2b5aaef
Oops typo
smalyshev Jun 5, 2025
c3ae03d
test fixes
smalyshev Jun 6, 2025
622f4e6
fix tests
smalyshev Jun 6, 2025
d31cbfb
test fixes
smalyshev Jun 10, 2025
02cd72b
[CI] Auto commit changes from spotless
Jun 10, 2025
3044290
fix tests
smalyshev Jun 10, 2025
2c63a23
fix more tests
smalyshev Jun 10, 2025
4c29dc7
[CI] Auto commit changes from spotless
Jun 10, 2025
c466265
more tests
smalyshev Jun 11, 2025
b056662
some refactoring and debugging facilities
smalyshev Jun 13, 2025
82fdd35
more testing
smalyshev Jun 16, 2025
abcc586
[CI] Auto commit changes from spotless
Jun 17, 2025
ab5c4b7
More tests & fixes
smalyshev Jun 17, 2025
5eddf62
more fixes
smalyshev Jun 18, 2025
dfc4f5a
fix check
smalyshev Jun 18, 2025
582f6a5
more fixes
smalyshev Jun 18, 2025
d248627
refactor mapper
smalyshev Jun 18, 2025
783e196
more fixes
smalyshev Jun 19, 2025
e877f4e
more tests
smalyshev Jun 23, 2025
5071a9f
typo
smalyshev Jun 23, 2025
0e73a21
more cases
smalyshev Jun 23, 2025
2bf0504
failure tests
smalyshev Jun 23, 2025
df92903
Some test refactoring & unsupported syntax tests
smalyshev Jun 24, 2025
8a64896
Add more tests to exclusion list due to SORT
smalyshev Jun 24, 2025
381cbf1
Refactor mapper, allow SORT and ban LIMIT pre-remote join
smalyshev Jun 26, 2025
1f818b2
One of the ways to fix the limit issue
smalyshev Jun 27, 2025
704ebf2
fix condition
smalyshev Jun 27, 2025
a5973cf
Sketch out another approach to planning remote join
alex-spies Jun 27, 2025
698664f
cleanup
smalyshev Jun 27, 2025
8e8123c
simplify
smalyshev Jun 27, 2025
4039f67
fix test
smalyshev Jun 27, 2025
de786d4
fix test
smalyshev Jun 27, 2025
19a8a75
Cleanups and more tests
smalyshev Jun 27, 2025
da1aac7
cleanups
smalyshev Jun 27, 2025
dfba8a3
[CI] Auto commit changes from spotless
Jun 27, 2025
4132884
Add check for older versions
smalyshev Jun 28, 2025
913dbe3
[CI] Auto commit changes from spotless
Jun 28, 2025
05e8184
use newer capability for this test
smalyshev Jun 30, 2025
53cf4a8
Update docs/changelog/129013.yaml
smalyshev Jun 30, 2025
cdd05a1
Cleanup
smalyshev Jun 30, 2025
935bf0a
Cleanups
smalyshev Jun 30, 2025
72f68c1
fix docs, add test
smalyshev Jun 30, 2025
2cb10a7
fix message
smalyshev Jun 30, 2025
9a8e651
fix test
smalyshev Jul 1, 2025
0c0bca0
Update x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/…
smalyshev Jul 2, 2025
4440b1b
Merge branch 'main' into remote-lookup-join
smalyshev Jul 2, 2025
201813b
Review feedback
smalyshev Jul 2, 2025
4877657
Enable only in snapshot builds for now
smalyshev Jul 2, 2025
a9a19f8
test fix
smalyshev Jul 2, 2025
240351a
Merge branch 'main' into remote-lookup-join
smalyshev Jul 2, 2025
93761ee
fix versions
smalyshev Jul 2, 2025
657d682
checkstyle
smalyshev Jul 2, 2025
30715d1
Merge branch 'main' into remote-lookup-join
smalyshev Jul 2, 2025
c16fd5e
review feedback
smalyshev Jul 3, 2025
837640a
Merge branch 'main' into remote-lookup-join
smalyshev Jul 3, 2025
efe0de2
Change some to IllegalStateException since they really only can happe…
smalyshev Jul 3, 2025
4f061ae
this wasn't a correct optimization
smalyshev Jul 3, 2025
f4afc94
Improve test
smalyshev Jul 4, 2025
dc5a3c1
Update x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/j…
smalyshev Jul 7, 2025
71892fc
Remove double check
smalyshev Jul 7, 2025
5caebed
Merge branch 'main' into remote-lookup-join
smalyshev Jul 7, 2025
d899cdc
Actually we need both checks
smalyshev Jul 7, 2025
edb02b6
Use capability
smalyshev Jul 7, 2025
dc35fa9
Add datemath test
smalyshev Jul 7, 2025
4c6f202
Use source for reprting errors
smalyshev Jul 7, 2025
d463df2
[CI] Auto commit changes from spotless
Jul 7, 2025
f58231e
fix forbidden api
smalyshev Jul 7, 2025
6597067
Merge branch 'main' into remote-lookup-join
smalyshev Jul 8, 2025
03a02fb
Move unsupported tests to Verifier
smalyshev Jul 8, 2025
9858ef5
Improve lookup indices list
smalyshev Jul 8, 2025
a1b2bbf
test fixes
smalyshev Jul 8, 2025
bd86864
Merge branch 'main' into remote-lookup-join
smalyshev Jul 8, 2025
9c90f92
use capability
smalyshev Jul 8, 2025
10b080f
moar tests
smalyshev Jul 9, 2025
44a42ce
Merge branch 'main' into remote-lookup-join
smalyshev Jul 9, 2025
d6d0f91
Merge branch 'main' into remote-lookup-join
smalyshev Jul 9, 2025
b11ab0c
Clean up tests to ensure ENABLE_LOOKUP_JOIN_ON_REMOTE is respected
smalyshev Jul 9, 2025
b170550
Add one more test
smalyshev Jul 9, 2025
44e1bf3
Merge branch 'main' into remote-lookup-join
smalyshev Jul 9, 2025
288b7c4
Merge branch 'main' into remote-lookup-join
smalyshev Jul 9, 2025
7d0fc12
Merge branch 'main' into remote-lookup-join
smalyshev Jul 10, 2025
822b139
Merge branch 'main' into remote-lookup-join
smalyshev Jul 10, 2025
8ecf11f
Merge branch 'main' into remote-lookup-join
smalyshev Jul 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/changelog/129013.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
pr: 129013
summary: "Add remote index support to LOOKUP JOIN"
area: ES|QL
type: feature
issues: [ ]
highlight:
title: Add remote index support to LOOKUP JOIN
body: |-
Queries containing LOOKUP JOIN now can be preformed on cross-cluster indices, for example:
[source,yaml]
----------------------------
FROM logs-*, remote:logs-* | LOOKUP JOIN clients on ip | SORT timestamp | LIMIT 100
----------------------------
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS = def(9_117_0_00);
public static final TransportVersion ML_INFERENCE_CUSTOM_SERVICE_EMBEDDING_TYPE = def(9_118_0_00);
public static final TransportVersion ESQL_FIXED_INDEX_LIKE = def(9_119_0_00);
public static final TransportVersion LOOKUP_JOIN_CCS = def(9_120_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ tasks.named("yamlRestCompatTestTransform").configure({ task ->
task.skipTest("esql/191_lookup_join_on_datastreams/data streams not supported in LOOKUP JOIN", "Added support for aliases in JOINs")
task.skipTest("esql/190_lookup_join/non-lookup index", "Error message changed")
task.skipTest("esql/192_lookup_join_on_aliases/alias-pattern-multiple", "Error message changed")
task.skipTest("esql/190_lookup_join/fails with non-lookup index", "Error message changed")
task.skipTest("esql/192_lookup_join_on_aliases/fails when alias or pattern resolves to multiple", "Error message changed")
})

tasks.named('yamlRestCompatTest').configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.test.rest.TestFeatureService;
import org.elasticsearch.xpack.esql.CsvSpecReader;
import org.elasticsearch.xpack.esql.CsvSpecReader.CsvTestCase;
import org.elasticsearch.xpack.esql.CsvTestsDataLoader;
import org.elasticsearch.xpack.esql.SpecReader;
import org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase;
import org.junit.AfterClass;
Expand All @@ -39,13 +40,16 @@
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.esql.CsvSpecReader.specParser;
import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET_MAP;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V9;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
Expand Down Expand Up @@ -109,6 +113,22 @@ public MultiClusterSpecIT(
super(fileName, groupName, testName, lineNumber, convertToRemoteIndices(testCase), instructions, mode);
}

// TODO: think how to handle this better
public static final Set<String> NO_REMOTE_LOOKUP_JOIN_TESTS = Set.of(
// Lookup join after STATS is not supported in CCS yet
"StatsAndLookupIPAndMessageFromIndex",
"JoinMaskingRegex",
"StatsAndLookupIPFromIndex",
"StatsAndLookupMessageFromIndex",
"MvJoinKeyOnTheLookupIndexAfterStats",
"MvJoinKeyOnFromAfterStats",
// Lookup join after SORT is not supported in CCS yet
"NullifiedJoinKeyToPurgeTheJoin",
"SortBeforeAndAfterJoin",
"SortEvalBeforeLookup",
"SortBeforeAndAfterMultipleJoinAndMvExpand"
);

@Override
protected void shouldSkipTest(String testName) throws IOException {
boolean remoteMetadata = testCase.requiredCapabilities.contains(METADATA_FIELDS_REMOTE_TEST.capabilityName());
Expand All @@ -129,10 +149,20 @@ protected void shouldSkipTest(String testName) throws IOException {
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V8.capabilityName()));
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName()));
if (testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName())) {
assumeTrue("LOOKUP JOIN not yet supported in CCS", hasCapabilities(List.of(ENABLE_LOOKUP_JOIN_ON_REMOTE.capabilityName())));
}
// Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented.
assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName()));
assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V9.capabilityName()));
// Tests that use capabilities not supported in CCS
assumeFalse(
"This syntax is not supported with remote LOOKUP JOIN",
NO_REMOTE_LOOKUP_JOIN_TESTS.stream().anyMatch(testName::contains)
);
// Tests that do SORT before LOOKUP JOIN - not supported in CCS
assumeFalse("LOOKUP JOIN after SORT not yet supported in CCS", testName.contains("OnTheCoordinator"));

}

@Override
Expand Down Expand Up @@ -181,6 +211,19 @@ protected RestClient buildClient(Settings settings, HttpHost[] localHosts) throw
// These indices are used in metadata tests so we want them on remote only for consistency
public static final List<String> METADATA_INDICES = List.of("employees", "apps", "ul_logs");

// These are lookup indices, we want them on both remotes and locals
public static final Set<String> LOOKUP_INDICES = CSV_DATASET_MAP.values()
.stream()
.filter(td -> td.settingFileName() != null && td.settingFileName().equals("lookup-settings.json"))
.map(CsvTestsDataLoader.TestDataset::indexName)
.collect(Collectors.toSet());

public static final Set<String> LOOKUP_ENDPOINTS = LOOKUP_INDICES.stream().map(i -> "/" + i + "/_bulk").collect(Collectors.toSet());

public static final Set<String> ENRICH_ENDPOINTS = ENRICH_SOURCE_INDICES.stream()
.map(i -> "/" + i + "/_bulk")
.collect(Collectors.toSet());

/**
* Creates a new mock client that dispatches every request to both the local and remote clusters, excluding _bulk and _query requests.
* - '_bulk' requests are randomly sent to either the local or remote cluster to populate data. Some spec tests, such as AVG,
Expand All @@ -199,15 +242,17 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th
return localClient.performRequest(request);
} else if (endpoint.endsWith("/_bulk") && METADATA_INDICES.stream().anyMatch(i -> endpoint.equals("/" + i + "/_bulk"))) {
return remoteClient.performRequest(request);
} else if (endpoint.endsWith("/_bulk") && ENRICH_SOURCE_INDICES.stream().noneMatch(i -> endpoint.equals("/" + i + "/_bulk"))) {
return bulkClient.performRequest(request);
} else {
Request[] clones = cloneRequests(request, 2);
Response resp1 = remoteClient.performRequest(clones[0]);
Response resp2 = localClient.performRequest(clones[1]);
assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode());
return resp2;
}
} else if (endpoint.endsWith("/_bulk")
&& ENRICH_ENDPOINTS.contains(endpoint) == false
&& LOOKUP_ENDPOINTS.contains(endpoint) == false) {
return bulkClient.performRequest(request);
} else {
Request[] clones = cloneRequests(request, 2);
Response resp1 = remoteClient.performRequest(clones[0]);
Response resp2 = localClient.performRequest(clones[1]);
assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode());
return resp2;
}
});
doAnswer(invocation -> {
IOUtils.close(localClient, remoteClient);
Expand Down Expand Up @@ -251,37 +296,32 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas
String query = testCase.query;
String[] commands = query.split("\\|");
String first = commands[0].trim();
// If true, we're using *:index, otherwise we're using *:index,index
boolean onlyRemotes = canUseRemoteIndicesOnly() && randomBoolean();
if (commands[0].toLowerCase(Locale.ROOT).startsWith("from")) {
String[] parts = commands[0].split("(?i)metadata");
assert parts.length >= 1 : parts;
String fromStatement = parts[0];
String[] localIndices = fromStatement.substring("FROM ".length()).split(",");
final String remoteIndices;
if (canUseRemoteIndicesOnly() && randomBoolean()) {
remoteIndices = Arrays.stream(localIndices)
.map(index -> unquoteAndRequoteAsRemote(index.trim(), true))
.collect(Collectors.joining(","));
} else {
remoteIndices = Arrays.stream(localIndices)
.map(index -> unquoteAndRequoteAsRemote(index.trim(), false))
.collect(Collectors.joining(","));
if (Arrays.stream(localIndices).anyMatch(i -> LOOKUP_INDICES.contains(i.trim().toLowerCase(Locale.ROOT)))) {
// If the query contains lookup indices, use only remotes to avoid duplication
onlyRemotes = true;
Comment on lines +307 to +308
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Err, doesn't this mean this suite doesn't at all test the case where a lookup happens both on remote and the local cluster?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one doesn't because it wouldn't match the results - we'd have more rows than the local case. We have IT tests for that case. I haven't found any easy way to make these tests work in "both" case without substantially changing them.

}
final boolean onlyRemotesFinal = onlyRemotes;
final String remoteIndices = Arrays.stream(localIndices)
.map(index -> unquoteAndRequoteAsRemote(index.trim(), onlyRemotesFinal))
.collect(Collectors.joining(","));
var newFrom = "FROM " + remoteIndices + " " + commands[0].substring(fromStatement.length());
testCase.query = newFrom + query.substring(first.length());
}
if (commands[0].toLowerCase(Locale.ROOT).startsWith("ts ")) {
String[] parts = commands[0].split("\\s+");
assert parts.length >= 2 : commands[0];
String[] indices = parts[1].split(",");
if (canUseRemoteIndicesOnly() && randomBoolean()) {
parts[1] = Arrays.stream(indices)
.map(index -> unquoteAndRequoteAsRemote(index.trim(), true))
.collect(Collectors.joining(","));
} else {
parts[1] = Arrays.stream(indices)
.map(index -> unquoteAndRequoteAsRemote(index.trim(), false))
.collect(Collectors.joining(","));
}
final boolean onlyRemotesFinal = onlyRemotes;
parts[1] = Arrays.stream(indices)
.map(index -> unquoteAndRequoteAsRemote(index.trim(), onlyRemotesFinal))
.collect(Collectors.joining(","));
String newNewMetrics = String.join(" ", parts);
testCase.query = newNewMetrics + query.substring(first.length());
}
Expand Down Expand Up @@ -359,9 +399,7 @@ protected boolean supportsInferenceTestService() {

@Override
protected boolean supportsIndexModeLookup() throws IOException {
// CCS does not yet support JOIN_LOOKUP_V10 and clusters falsely report they have this capability
// return hasCapabilities(List.of(JOIN_LOOKUP_V10.capabilityName()));
return false;
return hasCapabilities(List.of(JOIN_LOOKUP_V12.capabilityName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ record Doc(int id, String color, long data) {
List<Doc> localDocs = List.of();
final String remoteIndex = "test-remote-index";
List<Doc> remoteDocs = List.of();
final String lookupIndexLocal = "test-lookup-index-local";
final String lookupIndexRemote = "test-lookup-index-remote";
final String lookupAlias = "test-lookup-index";

@Before
public void setUpIndices() throws Exception {
Expand Down Expand Up @@ -109,6 +112,44 @@ public void setUpIndices() throws Exception {
}
}

private void setupLookupIndices() throws IOException {
RestClient localClient = client();
final String mapping = """
"properties": {
"data": { "type": "long" },
"morecolor": { "type": "keyword" }
}
""";
var lookupDocs = IntStream.range(0, between(1, 5))
.mapToObj(n -> new Doc(n, randomFrom("red", "yellow", "green"), randomIntBetween(1, 1000)))
.toList();
createIndex(
localClient,
lookupIndexLocal,
Settings.builder().put("index.number_of_shards", 1).put("index.mode", "lookup").build(),
mapping,
"\"" + lookupAlias + "\":{}"
);
indexDocs(localClient, lookupIndexLocal, lookupDocs);
try (RestClient remoteClient = remoteClusterClient()) {
createIndex(
remoteClient,
lookupIndexRemote,
Settings.builder().put("index.number_of_shards", 1).put("index.mode", "lookup").build(),
mapping,
"\"" + lookupAlias + "\":{}"
);
indexDocs(remoteClient, lookupIndexRemote, lookupDocs);
}
}

public void wipeLookupIndices() throws IOException {
try (RestClient remoteClient = remoteClusterClient()) {
deleteIndex(remoteClient, lookupIndexRemote);
}
deleteIndex(client(), lookupIndexLocal);
}

@After
public void wipeIndices() throws Exception {
try (RestClient remoteClient = remoteClusterClient()) {
Expand Down Expand Up @@ -416,6 +457,64 @@ public void testStats() throws IOException {
assertThat(clusterData, hasKey("took"));
}

public void testLookupJoinAliases() throws IOException {
assumeTrue(
"Local cluster does not support multiple LOOKUP JOIN aliases",
supportsLookupJoinAliases(Clusters.localClusterVersion())
);
assumeTrue(
"Remote cluster does not support multiple LOOKUP JOIN aliases",
supportsLookupJoinAliases(Clusters.remoteClusterVersion())
);
try {
setupLookupIndices();
Map<String, Object> result = run(
"FROM test-local-index,*:test-remote-index | LOOKUP JOIN test-lookup-index ON data | STATS c = COUNT(*)",
true
);
var columns = List.of(Map.of("name", "c", "type", "long"));
var values = List.of(List.of(localDocs.size() + remoteDocs.size()));
assertResultMap(true, result, columns, values, false);
} finally {
wipeLookupIndices();
}
}

public void testLookupJoinAliasesSkipOld() throws IOException {
assumeTrue(
"Local cluster does not support multiple LOOKUP JOIN aliases",
supportsLookupJoinAliases(Clusters.localClusterVersion())
);
assumeFalse(
"Remote cluster should not support multiple LOOKUP JOIN aliases",
supportsLookupJoinAliases(Clusters.remoteClusterVersion())
);
try {
setupLookupIndices();
Map<String, Object> result = run(
"FROM test-local-index,*:test-remote-index | LOOKUP JOIN test-lookup-index ON data | STATS c = COUNT(*)",
true
);
var columns = List.of(Map.of("name", "c", "type", "long"));
var values = List.of(List.of(localDocs.size()));

MapMatcher mapMatcher = getResultMatcher(true, false, result.containsKey("documents_found")).extraOk();
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
mapMatcher = mapMatcher.entry("is_partial", true);
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
// check that the remote is skipped
@SuppressWarnings("unchecked")
Map<String, Object> clusters = (Map<String, Object>) result.get("_clusters");
@SuppressWarnings("unchecked")
Map<String, Object> details = (Map<String, Object>) clusters.get("details");
@SuppressWarnings("unchecked")
Map<String, Object> remoteCluster = (Map<String, Object>) details.get("remote_cluster");
assertThat(remoteCluster.get("status"), equalTo("skipped"));
} finally {
wipeLookupIndices();
}
}

public void testLikeIndex() throws Exception {

boolean includeCCSMetadata = includeCCSMetadata();
Expand Down Expand Up @@ -576,6 +675,10 @@ private static boolean capabilitiesEndpointAvailable() {
return Clusters.localClusterVersion().onOrAfter(Version.V_8_15_0);
}

private static boolean supportsLookupJoinAliases(Version version) {
return version.onOrAfter(Version.V_9_2_0);
}

private static boolean includeCCSMetadata() {
return ccsMetadataAvailable() && randomBoolean();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.test.MapMatcher;
import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
import org.elasticsearch.xpack.esql.qa.rest.RequestIndexFilteringTestCase;
import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase;
import org.hamcrest.Matcher;
Expand All @@ -35,6 +36,8 @@
import static org.elasticsearch.test.ListMatcher.matchesList;
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -153,6 +156,35 @@ private static boolean checkVersion(org.elasticsearch.Version version) {
|| (version.onOrAfter(Version.fromString("8.19.0")) && version.before(Version.fromString("9.0.0")));
}

public void testIndicesDontExistWithRemoteLookupJoin() throws IOException {
assumeTrue("Only works with remote LOOKUP JOIN support", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled());
// This check is for "local" cluster - which is different from test runner actually, so it could be old
assumeTrue(
"Only works with remote LOOKUP JOIN support",
clusterHasCapability(
client(),
"POST",
"_query",
List.of(),
List.of(EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.capabilityName())
).orElse(false)
);

int docsTest1 = randomIntBetween(1, 5);
indexTimestampData(docsTest1, "test1", "2024-11-26", "id1");

var pattern = "FROM test1,*:test1";
ResponseException e = expectThrows(
ResponseException.class,
() -> runEsql(timestampFilter("gte", "2020-01-01").query(pattern + " | LOOKUP JOIN foo ON id1"))
);
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(
e.getMessage(),
allOf(containsString("verification_exception"), containsString("Unknown index [foo,remote_cluster:foo]"))
);
}

// We need a separate test since remote missing indices and local missing indices now work differently
public void testIndicesDontExistRemote() throws IOException {
// Exclude old versions
Expand Down
Loading
Loading