diff --git a/docs/changelog/129013.yaml b/docs/changelog/129013.yaml new file mode 100644 index 0000000000000..a914267300cf4 --- /dev/null +++ b/docs/changelog/129013.yaml @@ -0,0 +1,13 @@ +pr: 129013 +summary: "Add remote index support to LOOKUP JOIN" +area: ES|QL +type: feature +issues: [ ] +highlight: + title: Add remote index support to LOOKUP JOIN + body: |- + Queries containing LOOKUP JOIN now can be preformed on cross-cluster indices, for example: + [source,yaml] + ---------------------------- + FROM logs-*, remote:logs-* | LOOKUP JOIN clients on ip | SORT timestamp | LIMIT 100 + ---------------------------- diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index d54c9cf9212f9..35f5423df0ffb 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -336,6 +336,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS = def(9_117_0_00); public static final TransportVersion ML_INFERENCE_CUSTOM_SERVICE_EMBEDDING_TYPE = def(9_118_0_00); public static final TransportVersion ESQL_FIXED_INDEX_LIKE = def(9_119_0_00); + public static final TransportVersion LOOKUP_JOIN_CCS = def(9_120_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/build.gradle b/x-pack/plugin/build.gradle index 34b4f412e849a..8839aadb13716 100644 --- a/x-pack/plugin/build.gradle +++ b/x-pack/plugin/build.gradle @@ -136,6 +136,8 @@ tasks.named("yamlRestCompatTestTransform").configure({ task -> task.skipTest("esql/191_lookup_join_on_datastreams/data streams not supported in LOOKUP JOIN", "Added support for aliases in JOINs") task.skipTest("esql/190_lookup_join/non-lookup index", "Error message changed") task.skipTest("esql/192_lookup_join_on_aliases/alias-pattern-multiple", "Error message changed") + task.skipTest("esql/190_lookup_join/fails with non-lookup index", "Error message changed") + task.skipTest("esql/192_lookup_join_on_aliases/fails when alias or pattern resolves to multiple", "Error message changed") }) tasks.named('yamlRestCompatTest').configure { diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index f244069aab57a..705bfca2e903e 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.test.rest.TestFeatureService; import org.elasticsearch.xpack.esql.CsvSpecReader; import org.elasticsearch.xpack.esql.CsvSpecReader.CsvTestCase; +import org.elasticsearch.xpack.esql.CsvTestsDataLoader; import org.elasticsearch.xpack.esql.SpecReader; import org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase; import org.junit.AfterClass; @@ -39,13 +40,16 @@ import java.util.List; import java.util.Locale; import java.util.Optional; +import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.elasticsearch.xpack.esql.CsvSpecReader.specParser; import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled; +import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET_MAP; import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES; import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V9; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2; @@ -109,6 +113,22 @@ public MultiClusterSpecIT( super(fileName, groupName, testName, lineNumber, convertToRemoteIndices(testCase), instructions, mode); } + // TODO: think how to handle this better + public static final Set NO_REMOTE_LOOKUP_JOIN_TESTS = Set.of( + // Lookup join after STATS is not supported in CCS yet + "StatsAndLookupIPAndMessageFromIndex", + "JoinMaskingRegex", + "StatsAndLookupIPFromIndex", + "StatsAndLookupMessageFromIndex", + "MvJoinKeyOnTheLookupIndexAfterStats", + "MvJoinKeyOnFromAfterStats", + // Lookup join after SORT is not supported in CCS yet + "NullifiedJoinKeyToPurgeTheJoin", + "SortBeforeAndAfterJoin", + "SortEvalBeforeLookup", + "SortBeforeAndAfterMultipleJoinAndMvExpand" + ); + @Override protected void shouldSkipTest(String testName) throws IOException { boolean remoteMetadata = testCase.requiredCapabilities.contains(METADATA_FIELDS_REMOTE_TEST.capabilityName()); @@ -129,10 +149,20 @@ protected void shouldSkipTest(String testName) throws IOException { assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName())); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName())); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V8.capabilityName())); - assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName())); + if (testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName())) { + assumeTrue("LOOKUP JOIN not yet supported in CCS", hasCapabilities(List.of(ENABLE_LOOKUP_JOIN_ON_REMOTE.capabilityName()))); + } // Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented. assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName())); assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V9.capabilityName())); + // Tests that use capabilities not supported in CCS + assumeFalse( + "This syntax is not supported with remote LOOKUP JOIN", + NO_REMOTE_LOOKUP_JOIN_TESTS.stream().anyMatch(testName::contains) + ); + // Tests that do SORT before LOOKUP JOIN - not supported in CCS + assumeFalse("LOOKUP JOIN after SORT not yet supported in CCS", testName.contains("OnTheCoordinator")); + } @Override @@ -181,6 +211,19 @@ protected RestClient buildClient(Settings settings, HttpHost[] localHosts) throw // These indices are used in metadata tests so we want them on remote only for consistency public static final List METADATA_INDICES = List.of("employees", "apps", "ul_logs"); + // These are lookup indices, we want them on both remotes and locals + public static final Set LOOKUP_INDICES = CSV_DATASET_MAP.values() + .stream() + .filter(td -> td.settingFileName() != null && td.settingFileName().equals("lookup-settings.json")) + .map(CsvTestsDataLoader.TestDataset::indexName) + .collect(Collectors.toSet()); + + public static final Set LOOKUP_ENDPOINTS = LOOKUP_INDICES.stream().map(i -> "/" + i + "/_bulk").collect(Collectors.toSet()); + + public static final Set ENRICH_ENDPOINTS = ENRICH_SOURCE_INDICES.stream() + .map(i -> "/" + i + "/_bulk") + .collect(Collectors.toSet()); + /** * Creates a new mock client that dispatches every request to both the local and remote clusters, excluding _bulk and _query requests. * - '_bulk' requests are randomly sent to either the local or remote cluster to populate data. Some spec tests, such as AVG, @@ -199,15 +242,17 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th return localClient.performRequest(request); } else if (endpoint.endsWith("/_bulk") && METADATA_INDICES.stream().anyMatch(i -> endpoint.equals("/" + i + "/_bulk"))) { return remoteClient.performRequest(request); - } else if (endpoint.endsWith("/_bulk") && ENRICH_SOURCE_INDICES.stream().noneMatch(i -> endpoint.equals("/" + i + "/_bulk"))) { - return bulkClient.performRequest(request); - } else { - Request[] clones = cloneRequests(request, 2); - Response resp1 = remoteClient.performRequest(clones[0]); - Response resp2 = localClient.performRequest(clones[1]); - assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode()); - return resp2; - } + } else if (endpoint.endsWith("/_bulk") + && ENRICH_ENDPOINTS.contains(endpoint) == false + && LOOKUP_ENDPOINTS.contains(endpoint) == false) { + return bulkClient.performRequest(request); + } else { + Request[] clones = cloneRequests(request, 2); + Response resp1 = remoteClient.performRequest(clones[0]); + Response resp2 = localClient.performRequest(clones[1]); + assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode()); + return resp2; + } }); doAnswer(invocation -> { IOUtils.close(localClient, remoteClient); @@ -251,21 +296,21 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas String query = testCase.query; String[] commands = query.split("\\|"); String first = commands[0].trim(); + // If true, we're using *:index, otherwise we're using *:index,index + boolean onlyRemotes = canUseRemoteIndicesOnly() && randomBoolean(); if (commands[0].toLowerCase(Locale.ROOT).startsWith("from")) { String[] parts = commands[0].split("(?i)metadata"); assert parts.length >= 1 : parts; String fromStatement = parts[0]; String[] localIndices = fromStatement.substring("FROM ".length()).split(","); - final String remoteIndices; - if (canUseRemoteIndicesOnly() && randomBoolean()) { - remoteIndices = Arrays.stream(localIndices) - .map(index -> unquoteAndRequoteAsRemote(index.trim(), true)) - .collect(Collectors.joining(",")); - } else { - remoteIndices = Arrays.stream(localIndices) - .map(index -> unquoteAndRequoteAsRemote(index.trim(), false)) - .collect(Collectors.joining(",")); + if (Arrays.stream(localIndices).anyMatch(i -> LOOKUP_INDICES.contains(i.trim().toLowerCase(Locale.ROOT)))) { + // If the query contains lookup indices, use only remotes to avoid duplication + onlyRemotes = true; } + final boolean onlyRemotesFinal = onlyRemotes; + final String remoteIndices = Arrays.stream(localIndices) + .map(index -> unquoteAndRequoteAsRemote(index.trim(), onlyRemotesFinal)) + .collect(Collectors.joining(",")); var newFrom = "FROM " + remoteIndices + " " + commands[0].substring(fromStatement.length()); testCase.query = newFrom + query.substring(first.length()); } @@ -273,15 +318,10 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas String[] parts = commands[0].split("\\s+"); assert parts.length >= 2 : commands[0]; String[] indices = parts[1].split(","); - if (canUseRemoteIndicesOnly() && randomBoolean()) { - parts[1] = Arrays.stream(indices) - .map(index -> unquoteAndRequoteAsRemote(index.trim(), true)) - .collect(Collectors.joining(",")); - } else { - parts[1] = Arrays.stream(indices) - .map(index -> unquoteAndRequoteAsRemote(index.trim(), false)) - .collect(Collectors.joining(",")); - } + final boolean onlyRemotesFinal = onlyRemotes; + parts[1] = Arrays.stream(indices) + .map(index -> unquoteAndRequoteAsRemote(index.trim(), onlyRemotesFinal)) + .collect(Collectors.joining(",")); String newNewMetrics = String.join(" ", parts); testCase.query = newNewMetrics + query.substring(first.length()); } @@ -359,9 +399,7 @@ protected boolean supportsInferenceTestService() { @Override protected boolean supportsIndexModeLookup() throws IOException { - // CCS does not yet support JOIN_LOOKUP_V10 and clusters falsely report they have this capability - // return hasCapabilities(List.of(JOIN_LOOKUP_V10.capabilityName())); - return false; + return hasCapabilities(List.of(JOIN_LOOKUP_V12.capabilityName())); } @Override diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java index 7dcf1c47567e0..7fa6d789bc8fc 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java @@ -72,6 +72,9 @@ record Doc(int id, String color, long data) { List localDocs = List.of(); final String remoteIndex = "test-remote-index"; List remoteDocs = List.of(); + final String lookupIndexLocal = "test-lookup-index-local"; + final String lookupIndexRemote = "test-lookup-index-remote"; + final String lookupAlias = "test-lookup-index"; @Before public void setUpIndices() throws Exception { @@ -109,6 +112,44 @@ public void setUpIndices() throws Exception { } } + private void setupLookupIndices() throws IOException { + RestClient localClient = client(); + final String mapping = """ + "properties": { + "data": { "type": "long" }, + "morecolor": { "type": "keyword" } + } + """; + var lookupDocs = IntStream.range(0, between(1, 5)) + .mapToObj(n -> new Doc(n, randomFrom("red", "yellow", "green"), randomIntBetween(1, 1000))) + .toList(); + createIndex( + localClient, + lookupIndexLocal, + Settings.builder().put("index.number_of_shards", 1).put("index.mode", "lookup").build(), + mapping, + "\"" + lookupAlias + "\":{}" + ); + indexDocs(localClient, lookupIndexLocal, lookupDocs); + try (RestClient remoteClient = remoteClusterClient()) { + createIndex( + remoteClient, + lookupIndexRemote, + Settings.builder().put("index.number_of_shards", 1).put("index.mode", "lookup").build(), + mapping, + "\"" + lookupAlias + "\":{}" + ); + indexDocs(remoteClient, lookupIndexRemote, lookupDocs); + } + } + + public void wipeLookupIndices() throws IOException { + try (RestClient remoteClient = remoteClusterClient()) { + deleteIndex(remoteClient, lookupIndexRemote); + } + deleteIndex(client(), lookupIndexLocal); + } + @After public void wipeIndices() throws Exception { try (RestClient remoteClient = remoteClusterClient()) { @@ -416,6 +457,64 @@ public void testStats() throws IOException { assertThat(clusterData, hasKey("took")); } + public void testLookupJoinAliases() throws IOException { + assumeTrue( + "Local cluster does not support multiple LOOKUP JOIN aliases", + supportsLookupJoinAliases(Clusters.localClusterVersion()) + ); + assumeTrue( + "Remote cluster does not support multiple LOOKUP JOIN aliases", + supportsLookupJoinAliases(Clusters.remoteClusterVersion()) + ); + try { + setupLookupIndices(); + Map result = run( + "FROM test-local-index,*:test-remote-index | LOOKUP JOIN test-lookup-index ON data | STATS c = COUNT(*)", + true + ); + var columns = List.of(Map.of("name", "c", "type", "long")); + var values = List.of(List.of(localDocs.size() + remoteDocs.size())); + assertResultMap(true, result, columns, values, false); + } finally { + wipeLookupIndices(); + } + } + + public void testLookupJoinAliasesSkipOld() throws IOException { + assumeTrue( + "Local cluster does not support multiple LOOKUP JOIN aliases", + supportsLookupJoinAliases(Clusters.localClusterVersion()) + ); + assumeFalse( + "Remote cluster should not support multiple LOOKUP JOIN aliases", + supportsLookupJoinAliases(Clusters.remoteClusterVersion()) + ); + try { + setupLookupIndices(); + Map result = run( + "FROM test-local-index,*:test-remote-index | LOOKUP JOIN test-lookup-index ON data | STATS c = COUNT(*)", + true + ); + var columns = List.of(Map.of("name", "c", "type", "long")); + var values = List.of(List.of(localDocs.size())); + + MapMatcher mapMatcher = getResultMatcher(true, false, result.containsKey("documents_found")).extraOk(); + mapMatcher = mapMatcher.entry("_clusters", any(Map.class)); + mapMatcher = mapMatcher.entry("is_partial", true); + assertMap(result, mapMatcher.entry("columns", columns).entry("values", values)); + // check that the remote is skipped + @SuppressWarnings("unchecked") + Map clusters = (Map) result.get("_clusters"); + @SuppressWarnings("unchecked") + Map details = (Map) clusters.get("details"); + @SuppressWarnings("unchecked") + Map remoteCluster = (Map) details.get("remote_cluster"); + assertThat(remoteCluster.get("status"), equalTo("skipped")); + } finally { + wipeLookupIndices(); + } + } + public void testLikeIndex() throws Exception { boolean includeCCSMetadata = includeCCSMetadata(); @@ -576,6 +675,10 @@ private static boolean capabilitiesEndpointAvailable() { return Clusters.localClusterVersion().onOrAfter(Version.V_8_15_0); } + private static boolean supportsLookupJoinAliases(Version version) { + return version.onOrAfter(Version.V_9_2_0); + } + private static boolean includeCCSMetadata() { return ccsMetadataAvailable() && randomBoolean(); } diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java index c3e29602a8b8c..976e18bb40232 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java @@ -18,6 +18,7 @@ import org.elasticsearch.test.MapMatcher; import org.elasticsearch.test.TestClustersThreadFilter; import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.xpack.esql.action.EsqlCapabilities; import org.elasticsearch.xpack.esql.qa.rest.RequestIndexFilteringTestCase; import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; import org.hamcrest.Matcher; @@ -35,6 +36,8 @@ import static org.elasticsearch.test.ListMatcher.matchesList; import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; @@ -153,6 +156,35 @@ private static boolean checkVersion(org.elasticsearch.Version version) { || (version.onOrAfter(Version.fromString("8.19.0")) && version.before(Version.fromString("9.0.0"))); } + public void testIndicesDontExistWithRemoteLookupJoin() throws IOException { + assumeTrue("Only works with remote LOOKUP JOIN support", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled()); + // This check is for "local" cluster - which is different from test runner actually, so it could be old + assumeTrue( + "Only works with remote LOOKUP JOIN support", + clusterHasCapability( + client(), + "POST", + "_query", + List.of(), + List.of(EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.capabilityName()) + ).orElse(false) + ); + + int docsTest1 = randomIntBetween(1, 5); + indexTimestampData(docsTest1, "test1", "2024-11-26", "id1"); + + var pattern = "FROM test1,*:test1"; + ResponseException e = expectThrows( + ResponseException.class, + () -> runEsql(timestampFilter("gte", "2020-01-01").query(pattern + " | LOOKUP JOIN foo ON id1")) + ); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat( + e.getMessage(), + allOf(containsString("verification_exception"), containsString("Unknown index [foo,remote_cluster:foo]")) + ); + } + // We need a separate test since remote missing indices and local missing indices now work differently public void testIndicesDontExistRemote() throws IOException { // Exclude old versions diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java index 1ba4365ea3e92..3354fb1413e06 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java @@ -13,7 +13,6 @@ import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.test.rest.ESRestTestCase; -import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.esql.AssertWarnings; import org.elasticsearch.xpack.esql.action.EsqlCapabilities; @@ -211,23 +210,16 @@ public void testIndicesDontExist() throws IOException { e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM foo, test1"))); assertEquals(404, e.getResponse().getStatusLine().getStatusCode()); assertThat(e.getMessage(), containsString("index_not_found_exception")); - assertThat(e.getMessage(), anyOf(containsString("no such index [foo]"), containsString("no such index [remote_cluster:foo]"))); + assertThat(e.getMessage(), containsString("no such index [foo]")); + // Don't test remote patterns here, we'll test them in the multi-cluster tests if (EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled()) { - var pattern = from("test1"); e = expectThrows( ResponseException.class, - () -> runEsql(timestampFilter("gte", "2020-01-01").query(pattern + " | LOOKUP JOIN foo ON id1")) + () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM test1 | LOOKUP JOIN foo ON id1")) ); assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); - assertThat( - e.getMessage(), - // currently we don't support remote clusters in LOOKUP JOIN - // this check happens before resolving actual indices and results in a different error message - RemoteClusterAware.isRemoteIndexName(pattern) - ? allOf(containsString("parsing_exception"), containsString("remote clusters are not supported")) - : allOf(containsString("verification_exception"), containsString("Unknown index [foo]")) - ); + assertThat(e.getMessage(), allOf(containsString("verification_exception"), containsString("Unknown index [foo]"))); } } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index e82c7be4cede2..bdf0413a03d02 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -1689,7 +1689,7 @@ salary_change.long:double|foo:long joinMaskingEval required_capability: join_lookup_v12 required_capability: fix_join_masking_eval -from languag*, -languages_mixed_numerics +from languages,languages_lookup,languages_lookup_non_unique_key,languages_nested_fields | eval type = null | rename language_name as message | lookup join message_types_lookup on message diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java index 992572fc3220d..a724483ecee51 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java @@ -7,13 +7,17 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; @@ -255,7 +259,7 @@ protected void populateRuntimeIndex(String clusterAlias, String langName, String bulk.get(); } - protected void populateRemoteIndices(String clusterAlias, String indexName, int numShards) throws IOException { + protected void populateRemoteIndices(String clusterAlias, String indexName, int numShards) { Client remoteClient = client(clusterAlias); assertAcked( remoteClient.admin() @@ -270,6 +274,36 @@ protected void populateRemoteIndices(String clusterAlias, String indexName, int remoteClient.admin().indices().prepareRefresh(indexName).get(); } + protected void populateLookupIndex(String clusterAlias, String indexName, int numDocs, String keyType) { + Client client = client(clusterAlias); + String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias; + String field_tag = Strings.isEmpty(clusterAlias) ? "local_tag" : "remote_tag"; + assertAcked( + client.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.mode", "lookup")) + .setMapping( + "lookup_key", + "type=" + keyType, + "lookup_name", + "type=keyword", + "lookup_tag", + "type=keyword", + field_tag, + "type=keyword" + ) + ); + for (int i = 0; i < numDocs; i++) { + client.prepareIndex(indexName).setSource("lookup_key", i, "lookup_name", "lookup_" + i, "lookup_tag", tag, field_tag, i).get(); + } + client.admin().indices().prepareRefresh(indexName).get(); + } + + protected void populateLookupIndex(String clusterAlias, String indexName, int numDocs) { + populateLookupIndex(clusterAlias, indexName, numDocs, "long"); + } + protected void setSkipUnavailable(String clusterAlias, boolean skip) { client(LOCAL_CLUSTER).admin() .cluster() @@ -314,4 +348,14 @@ protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse static List getDriverTasks(Client client) { return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks(); } + + protected static Exception randomFailure() { + return randomFrom( + new IllegalStateException("driver was closed already"), + new CircuitBreakingException("low memory", CircuitBreaker.Durability.PERMANENT), + new IOException("broken disk"), + new ResourceNotFoundException("exchange sink was not found"), + new EsRejectedExecutionException("node is shutting down") + ); + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinFailuresIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinFailuresIT.java new file mode 100644 index 0000000000000..2643926bf1637 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinFailuresIT.java @@ -0,0 +1,241 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.esql.VerificationException; +import org.junit.Before; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; + +public class CrossClusterLookupJoinFailuresIT extends AbstractCrossClusterTestCase { + protected boolean reuseClusters() { + return false; + } + + @Before + public void checkEnabled() { + assumeTrue("Remote LOOKUP JOIN not enabled", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled()); + } + + public void testLookupFail() throws IOException { + setupClusters(3); + populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10); + populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup", 25); + populateLookupIndex(REMOTE_CLUSTER_2, "values_lookup", 25); + + setSkipUnavailable(REMOTE_CLUSTER_1, true); + Exception simulatedFailure = randomFailure(); + // fail when trying to do the lookup on remote cluster + for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.addRequestHandlingBehavior( + EsqlResolveFieldsAction.RESOLVE_REMOTE_TYPE.name(), + (handler, request, channel, task) -> handler.messageReceived(request, new TransportChannel() { + @Override + public String getProfileName() { + return channel.getProfileName(); + } + + @Override + public void sendResponse(TransportResponse response) { + sendResponse(simulatedFailure); + } + + @Override + public void sendResponse(Exception exception) { + channel.sendResponse(exception); + } + }, task) + ); + } + + try { + // FIXME: this should catch the error but fails instead + /* + try ( + EsqlQueryResponse resp = runQuery( + "FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag")); + + List> values = getValuesList(resp); + assertThat(values, hasSize(0)); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remoteCluster.getFailures(), not(empty())); + var failure = remoteCluster.getFailures().get(0); + assertThat(failure.reason(), containsString(simulatedFailure.getMessage())); + } */ + + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag")); + + List> values = getValuesList(resp); + assertThat(values, hasSize(10)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + + var localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remoteCluster.getFailures(), not(empty())); + var failure = remoteCluster.getFailures().get(0); + // FIXME: this produces a wrong message currently + // assertThat(failure.reason(), containsString(simulatedFailure.getMessage())); + assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + } + + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag")); + + List> values = getValuesList(resp); + assertThat(values, hasSize(20)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + + var localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + var remoteCluster2 = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(remoteCluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(remoteCluster.getFailures(), not(empty())); + var failure = remoteCluster.getFailures().get(0); + // FIXME: this produces a wrong message currently + // assertThat(failure.reason(), containsString(simulatedFailure.getMessage())); + assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + } + + // now fail + setSkipUnavailable(REMOTE_CLUSTER_1, false); + Exception ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) + ); + assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + + ex = expectThrows( + Exception.class, + () -> runQuery("FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) + ); + String message = ex.getCause() == null ? ex.getMessage() : ex.getCause().getMessage(); + assertThat(message, containsString(simulatedFailure.getMessage())); + } finally { + for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.clearAllRules(); + } + } + } + + public void testLookupDisconnect() throws IOException { + setupClusters(2); + populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10); + populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup", 10); + + setSkipUnavailable(REMOTE_CLUSTER_1, true); + try { + // close the remote cluster so that it is unavailable + cluster(REMOTE_CLUSTER_1).close(); + + try ( + EsqlQueryResponse resp = runQuery( + "FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + // this is a bit weird but that's how it works + assertThat(columns, hasSize(1)); + assertThat(columns, hasItems("")); + List> values = getValuesList(resp); + assertThat(values, hasSize(0)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remoteCluster.getFailures(), not(empty())); + var failure = remoteCluster.getFailures().get(0); + assertThat(failure.reason(), containsString("unable to connect to remote cluster")); + } + + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag")); + + List> values = getValuesList(resp); + assertThat(values, hasSize(10)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + + var localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remoteCluster.getFailures(), not(empty())); + var failure = remoteCluster.getFailures().get(0); + assertThat( + failure.reason(), + containsString("Remote cluster [cluster-a] (with setting skip_unavailable=true) is not available") + ); + } + + setSkipUnavailable(REMOTE_CLUSTER_1, false); + Exception ex = expectThrows( + ElasticsearchException.class, + () -> runQuery("FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) + ); + assertTrue(ExceptionsHelper.isRemoteUnavailableException(ex)); + + ex = expectThrows( + ElasticsearchException.class, + () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) + ); + assertTrue(ExceptionsHelper.isRemoteUnavailableException(ex)); + } finally { + clearSkipUnavailable(2); + } + } + +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java new file mode 100644 index 0000000000000..03a7bf4546d05 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java @@ -0,0 +1,531 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; +import org.elasticsearch.xpack.esql.VerificationException; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.junit.Before; + +import java.io.IOException; +import java.time.Duration; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; + +// @TestLogging(value = "org.elasticsearch.xpack.esql.session:DEBUG", reason = "to better understand planning") +public class CrossClusterLookupJoinIT extends AbstractCrossClusterTestCase { + + @Before + public void checkEnabled() { + assumeTrue("Remote LOOKUP JOIN not enabled", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled()); + } + + public void testLookupJoinAcrossClusters() throws IOException { + setupClustersAndLookups(); + + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag")); + int vIndex = columns.indexOf("v"); + int lookupNameIndex = columns.indexOf("lookup_name"); + int tagIndex = columns.indexOf("tag"); + int lookupTagIndex = columns.indexOf("lookup_tag"); + + List> values = getValuesList(resp); + assertThat(values, hasSize(20)); + for (var row : values) { + assertThat(row, hasSize(9)); + Long v = (Long) row.get(vIndex); + assertThat(v, greaterThanOrEqualTo(0L)); + if (v < 25) { + assertThat((String) row.get(lookupNameIndex), equalTo("lookup_" + v)); + String tag = (String) row.get(tagIndex); + if (tag.equals("local")) { + assertThat(row.get(lookupTagIndex), equalTo("local")); + } else { + assertThat(row.get(lookupTagIndex), equalTo(REMOTE_CLUSTER_1)); + } + } else { + assertNull(row.get(lookupNameIndex)); + assertNull(row.get(lookupTagIndex)); + } + } + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertCCSExecutionInfoDetails(executionInfo); + } + + populateLookupIndex(LOCAL_CLUSTER, "values_lookup2", 5); + populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup2", 5); + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key " + + "| LOOKUP JOIN values_lookup2 ON lookup_key", + randomBoolean() + ) + ) { + List> values = getValuesList(resp); + assertThat(values, hasSize(20)); + } + + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key " + + "| STATS c = count(*) BY lookup_name | SORT c", + randomBoolean() + ) + ) { + List> values = getValuesList(resp); + // 0-9 + null + 16 + assertThat(values, hasSize(12)); + for (var row : values) { + if (row.get(1) == null) { + assertThat((Long) row.get(0), equalTo(5L)); // null + } else { + assertThat((String) row.get(1), containsString("lookup_")); + if (row.get(1).equals("lookup_0") + || row.get(1).equals("lookup_1") + || row.get(1).equals("lookup_4") + || row.get(1).equals("lookup_9")) { + // squares + assertThat((Long) row.get(0), equalTo(2L)); + } else { + assertThat((Long) row.get(0), equalTo(1L)); + } + } + } + } + } + + public void testLookupJoinWithAliases() throws IOException { + setupClusters(2); + populateLookupIndex(LOCAL_CLUSTER, "values_lookup_local", 10); + populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup_remote", 10); + + setupAlias(LOCAL_CLUSTER, "values_lookup_local", "values_lookup"); + setupAlias(REMOTE_CLUSTER_1, "values_lookup_remote", "values_lookup"); + + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag")); + + List> values = getValuesList(resp); + assertThat(values, hasSize(20)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertCCSExecutionInfoDetails(executionInfo); + } + } + + public void testLookupJoinWithDatemath() throws IOException { + setupClusters(2); + + ZonedDateTime nowUtc = ZonedDateTime.now(ZoneOffset.UTC); + ZonedDateTime nextMidnight = nowUtc.plusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0); + // If we're too close to midnight, we could create index with one day and query with another, and it'd fail. + assumeTrue("Skip if too close to midnight", Duration.between(nowUtc, nextMidnight).toMinutes() >= 5); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd", Locale.ROOT); + String lookupIndexName = "values_lookup_" + nowUtc.format(formatter); + + populateLookupIndex(LOCAL_CLUSTER, lookupIndexName, 10); + populateLookupIndex(REMOTE_CLUSTER_1, lookupIndexName, 10); + + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN \"\" ON lookup_key", + randomBoolean() + ) + ) { + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag")); + + List> values = getValuesList(resp); + assertThat(values, hasSize(20)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertCCSExecutionInfoDetails(executionInfo); + } + } + + public void testLookupJoinMissingRemoteIndex() throws IOException { + setupClusters(2); + populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10); + + setSkipUnavailable(REMOTE_CLUSTER_1, true); + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag")); + + List> values = getValuesList(resp); + assertThat(values, hasSize(10)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + + var localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remoteCluster.getFailures(), not(empty())); + var failure = remoteCluster.getFailures().get(0); + assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + } + // Without local + // FIXME: this is inconsistent due to how field-caps works - if there's no index at all, it fails, but if there's one but not + // another, it succeeds. Ideally, this would be empty result with remote1 skipped, but field-caps fails. + var ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) + ); + assertThat(ex.getMessage(), containsString("Unknown index [cluster-a:values_lookup]")); + + setSkipUnavailable(REMOTE_CLUSTER_1, false); + // then missing index is an error + ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) + ); + assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + } + + public void testLookupJoinMissingRemoteIndexTwoRemotes() throws IOException { + setupClusters(3); + populateLookupIndex(REMOTE_CLUSTER_2, "values_lookup", 10); + + setSkipUnavailable(REMOTE_CLUSTER_1, true); + setSkipUnavailable(REMOTE_CLUSTER_2, false); + + // FIXME: inconsistent with the previous test, remote1:values_lookup still missing, but now it succeeds with remote1 skipped + try ( + EsqlQueryResponse resp = runQuery( + "FROM *:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + List> values = getValuesList(resp); + assertThat(values, hasSize(10)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.getClusters().size(), equalTo(2)); + + var remoteCluster1 = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remoteCluster1.getFailures(), not(empty())); + var failure = remoteCluster1.getFailures().get(0); + assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + var remoteCluster2 = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(remoteCluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + } + } + + public void testLookupJoinMissingLocalIndex() throws IOException { + setupClusters(2); + populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup", 10); + + var ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) + ); + assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in local cluster")); + + // Without local in the query it's ok + try ( + EsqlQueryResponse resp = runQuery( + "FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + List> values = getValuesList(resp); + assertThat(values, hasSize(10)); + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag", "remote_tag")); + assertThat(columns, not(hasItems("local_tag"))); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.getClusters().size(), equalTo(1)); + + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + } + } + + public void testLookupJoinMissingKey() throws IOException { + setupClusters(2); + populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10); + populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup", 10); + + setSkipUnavailable(REMOTE_CLUSTER_1, true); + try ( + // Using local_tag as key which is not present in remote index + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL local_tag = to_string(v) | LOOKUP JOIN values_lookup ON local_tag", + randomBoolean() + ) + ) { + List> values = getValuesList(resp); + assertThat(values, hasSize(20)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.getClusters().size(), equalTo(2)); + + var localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + // FIXME: verify whether we need to skip or succeed here + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + var remoteTagIndex = columns.indexOf("remote_tag"); + var lookupNameIndex = columns.indexOf("lookup_name"); + var tagIndex = columns.indexOf("tag"); + assertThat(remoteTagIndex, greaterThanOrEqualTo(0)); + for (var row : values) { + // remote tag column should be null + assertNull(row.get(remoteTagIndex)); + if (row.get(tagIndex).equals("local")) { + // local value should be present + assertThat((String) row.get(lookupNameIndex), containsString("lookup_")); + } else { + // remote value should be null + assertNull(row.get(lookupNameIndex)); + } + } + } + + try ( + // Using remote_tag as key which is not present in local index + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL remote_tag = to_string(v) | LOOKUP JOIN values_lookup ON remote_tag", + randomBoolean() + ) + ) { + List> values = getValuesList(resp); + assertThat(values, hasSize(20)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.getClusters().size(), equalTo(2)); + + var localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + var localTagIndex = columns.indexOf("local_tag"); + var remoteTagIndex = columns.indexOf("remote_tag"); + var lookupNameIndex = columns.indexOf("lookup_name"); + var tagIndex = columns.indexOf("tag"); + assertThat(localTagIndex, greaterThanOrEqualTo(0)); + for (var row : values) { + // remote tag column should be null + assertNull(row.get(localTagIndex)); + if (row.get(tagIndex).equals("remote") && List.of("0", "1", "4", "9").contains((String) row.get(remoteTagIndex))) { + // remote value should be present + assertThat((String) row.get(lookupNameIndex), containsString("lookup_")); + } else { + // local value should be null + assertNull(row.get(lookupNameIndex)); + } + } + } + + // TODO: verify whether this should be an error or not when the key field is missing + Exception ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM c*:logs-* | LOOKUP JOIN values_lookup ON v", randomBoolean()) + ); + assertThat(ex.getMessage(), containsString("Unknown column [v] in right side of join")); + + ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM c*:logs-* | EVAL local_tag = to_string(v) | LOOKUP JOIN values_lookup ON local_tag", randomBoolean()) + ); + assertThat(ex.getMessage(), containsString("Unknown column [local_tag] in right side of join")); + + setSkipUnavailable(REMOTE_CLUSTER_1, false); + try ( + // Using local_tag as key which is not present in remote index + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL local_tag = to_string(v) | LOOKUP JOIN values_lookup ON local_tag", + randomBoolean() + ) + ) { + List> values = getValuesList(resp); + assertThat(values, hasSize(20)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.getClusters().size(), equalTo(2)); + + var localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + // FIXME: verify whether we need to succeed or fail here + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + } + } + + public void testLookupJoinIndexMode() throws IOException { + setupClusters(2); + populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10); + populateIndex(REMOTE_CLUSTER_1, "values_lookup", randomIntBetween(1, 3), 10); + setSkipUnavailable(REMOTE_CLUSTER_1, true); + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag")); + + List> values = getValuesList(resp); + assertThat(values, hasSize(10)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + + var localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remoteCluster.getFailures(), not(empty())); + var failure = remoteCluster.getFailures().get(0); + assertThat( + failure.reason(), + containsString( + "Lookup Join requires a single lookup mode index; " + + "[values_lookup] resolves to [cluster-a:values_lookup] in [standard] mode" + ) + ); + } + + setSkipUnavailable(REMOTE_CLUSTER_1, false); + // then missing index is an error + var ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) + ); + assertThat( + ex.getMessage(), + containsString( + "Lookup Join requires a single lookup mode index; [values_lookup] resolves to [cluster-a:values_lookup] in [standard] mode" + ) + ); + } + + public void testLookupJoinFieldTypes() throws IOException { + setupClusters(2); + populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10); + populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup", 10, "keyword"); + + setSkipUnavailable(REMOTE_CLUSTER_1, true); + var ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) + ); + assertThat( + ex.getMessage(), + containsString( + "Cannot use field [lookup_key] due to ambiguities being mapped as [2] incompatible types:" + + " [keyword] in [cluster-a:values_lookup], [long] in [values_lookup]" + ) + ); + + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL lookup_name = v::keyword | LOOKUP JOIN values_lookup ON lookup_name", + randomBoolean() + ) + ) { + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasSize(9)); + var lookupKeyIndex = columns.indexOf("lookup_key"); + assertThat(lookupKeyIndex, greaterThanOrEqualTo(0)); + var keyColumn = resp.columns().get(lookupKeyIndex); + assertThat(keyColumn.type(), equalTo(DataType.UNSUPPORTED)); + assertThat(keyColumn.originalTypes(), hasItems("keyword", "long")); + List> values = getValuesList(resp); + for (var row : values) { + assertNull(row.get(lookupKeyIndex)); + } + } + } + + protected Map setupClustersAndLookups() throws IOException { + var setupData = setupClusters(2); + populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10); + populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup", 25); + return setupData; + } + + public void setupHostsEnrich() { + // the hosts policy are identical on every node + Map allHosts = Map.of("192.168.1.2", "Windows"); + Client client = client(LOCAL_CLUSTER); + client.admin().indices().prepareCreate("hosts").setMapping("ip", "type=ip", "os", "type=keyword").get(); + for (Map.Entry h : allHosts.entrySet()) { + client.prepareIndex("hosts").setSource("ip", h.getKey(), "os", h.getValue()).get(); + } + client.admin().indices().prepareRefresh("hosts").get(); + EnrichPolicy hostPolicy = new EnrichPolicy("match", null, List.of("hosts"), "ip", List.of("ip", "os")); + client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts", hostPolicy)) + .actionGet(); + client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts")) + .actionGet(); + assertAcked(client.admin().indices().prepareDelete("hosts")); + } + + private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) { + assertNotNull(executionInfo); + assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertTrue(executionInfo.isCrossClusterSearch()); + List clusters = executionInfo.clusterAliases().stream().map(executionInfo::getCluster).toList(); + + for (EsqlExecutionInfo.Cluster cluster : clusters) { + assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(cluster.getSkippedShards(), equalTo(0)); + assertThat(cluster.getFailedShards(), equalTo(0)); + } + } + + protected void setupAlias(String clusterAlias, String indexName, String aliasName) { + Client client = client(clusterAlias); + IndicesAliasesRequestBuilder indicesAliasesRequestBuilder = client.admin() + .indices() + .prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .addAliasAction(IndicesAliasesRequest.AliasActions.add().index(indexName).alias(aliasName)); + assertAcked(client.admin().indices().aliases(indicesAliasesRequestBuilder.request())); + } + +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java index 62d5904b58b86..2c6b92655ba75 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java @@ -8,13 +8,9 @@ package org.elasticsearch.xpack.esql.action; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.common.breaker.CircuitBreaker; -import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.test.FailingFieldPlugin; @@ -346,16 +342,6 @@ public void testFailSearchShardsOnLocalCluster() throws Exception { } } - private static Exception randomFailure() { - return randomFrom( - new IllegalStateException("driver was closed already"), - new CircuitBreakingException("low memory", CircuitBreaker.Durability.PERMANENT), - new IOException("broken disk"), - new ResourceNotFoundException("exchange sink was not found"), - new EsRejectedExecutionException("node is shutting down") - ); - } - private Set populateIndexWithFailingFields(String clusterAlias, String indexName, int numShards) throws IOException { Client client = client(clusterAlias); XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 979683d28324e..7ee953b9d1d9a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1190,6 +1190,11 @@ public enum Cap { */ RLIKE_WITH_EMPTY_LANGUAGE_PATTERN, + /** + * Enable support for cross-cluster lookup joins. + */ + ENABLE_LOOKUP_JOIN_ON_REMOTE(Build.current().isSnapshot()), + /** * MATCH PHRASE function */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 1b48ffd22b491..e4b8949af5bdb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -743,7 +743,7 @@ private Join resolveLookupJoin(LookupJoin join) { List rightKeys = resolveUsingColumns(cols, join.right().output(), "right"); config = new JoinConfig(coreJoin, leftKeys, leftKeys, rightKeys); - join = new LookupJoin(join.source(), join.left(), join.right(), config); + join = new LookupJoin(join.source(), join.left(), join.right(), config, join.isRemote()); } else if (type != JoinTypes.LEFT) { // everything else is unsupported for now // LEFT can only happen by being mapped from a USING above. So we need to exclude this as well because this rule can be run diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index 12ad0a16ca12e..74309fa0bdb85 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -639,8 +639,18 @@ public PlanFactory visitJoinCommand(EsqlBaseParser.JoinCommandContext ctx) { } return p -> { - checkForRemoteClusters(p, source(target), "LOOKUP JOIN"); - return new LookupJoin(source, p, right, joinFields); + boolean hasRemotes = p.anyMatch(node -> { + if (node instanceof UnresolvedRelation r) { + return Arrays.stream(Strings.splitStringByCommaToArray(r.indexPattern().indexPattern())) + .anyMatch(RemoteClusterAware::isRemoteIndexName); + } else { + return false; + } + }); + if (hasRemotes && EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled() == false) { + throw new ParsingException(source, "remote clusters are not supported with LOOKUP JOIN"); + } + return new LookupJoin(source, p, right, joinFields, hasRemotes); }; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java index 78503785ea7f2..794957dc473eb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java @@ -45,7 +45,7 @@ import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; import static org.elasticsearch.xpack.esql.plan.logical.Filter.checkFilterConditionDataType; -public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware, SortAgnostic { +public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware, SortAgnostic, PipelineBreaker { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( LogicalPlan.class, "Aggregate", diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java index a59433e94f965..cbc4ee7da5be9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java @@ -18,7 +18,7 @@ import java.io.IOException; import java.util.Objects; -public class Limit extends UnaryPlan implements TelemetryAware { +public class Limit extends UnaryPlan implements TelemetryAware, PipelineBreaker { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Limit", Limit::new); private final Expression limit; @@ -57,17 +57,15 @@ private Limit(StreamInput in) throws IOException { } /** - * Omits serializing {@link Limit#duplicated} because when sent to a data node, this should always be {@code false}. - * That's because if it's true, this means a copy of this limit was pushed down below an MvExpand or Join, and thus there's - * another pipeline breaker further upstream - we're already on the coordinator node. + * Omits serializing {@link Limit#duplicated} because this is only required to avoid duplicating a limit past + * {@link org.elasticsearch.xpack.esql.plan.logical.join.Join} or {@link MvExpand} in an infinite loop, see + * {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineLimits}. */ @Override public void writeTo(StreamOutput out) throws IOException { Source.EMPTY.writeTo(out); out.writeNamedWriteable(limit()); out.writeNamedWriteable(child()); - // Let's make sure we notice during tests if we ever serialize a duplicated Limit. - assert duplicated == false; } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/OrderBy.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/OrderBy.java index ddb07e0490db3..bed01060c2110 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/OrderBy.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/OrderBy.java @@ -31,7 +31,8 @@ public class OrderBy extends UnaryPlan PostAnalysisVerificationAware, PostOptimizationVerificationAware, TelemetryAware, - SortAgnostic { + SortAgnostic, + PipelineBreaker { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "OrderBy", OrderBy::new); private final List order; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/PipelineBreaker.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/PipelineBreaker.java new file mode 100644 index 0000000000000..362ce2bad7227 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/PipelineBreaker.java @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical; + +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; + +/** + * A {@link LogicalPlan} that cannot be run only on the data nodes, resp. requires to be at least partially run on the coordinator. + * When mapping to a physical plan, the first pipeline breaker will give rise to a {@link FragmentExec} + * that contains the {@link LogicalPlan} that data nodes will execute. + */ +public interface PipelineBreaker {} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TopN.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TopN.java index a9a5dbddc544f..063b209a13ca4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TopN.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TopN.java @@ -21,7 +21,7 @@ import java.util.List; import java.util.Objects; -public class TopN extends UnaryPlan { +public class TopN extends UnaryPlan implements PipelineBreaker { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "TopN", TopN::new); private final List order; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java index 59fdc902273b9..2c7b1a399b3f2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java @@ -83,10 +83,17 @@ public class Join extends BinaryPlan implements PostAnalysisVerificationAware, S private final JoinConfig config; private List lazyOutput; + // Does this join involve remote indices? This is relevant only on the coordinating node, thus transient. + private transient boolean isRemote = false; public Join(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config) { + this(source, left, right, config, false); + } + + public Join(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config, boolean isRemote) { super(source, left, right); this.config = config; + this.isRemote = isRemote; } public Join( @@ -234,17 +241,17 @@ public boolean resolved() { } public Join withConfig(JoinConfig config) { - return new Join(source(), left(), right(), config); + return new Join(source(), left(), right(), config, isRemote); } @Override public Join replaceChildren(LogicalPlan left, LogicalPlan right) { - return new Join(source(), left, right, config); + return new Join(source(), left, right, config, isRemote); } @Override public int hashCode() { - return Objects.hash(config, left(), right()); + return Objects.hash(config, left(), right(), isRemote); } @Override @@ -257,7 +264,10 @@ public boolean equals(Object obj) { } Join other = (Join) obj; - return config.equals(other.config) && Objects.equals(left(), other.left()) && Objects.equals(right(), other.right()); + return config.equals(other.config) + && Objects.equals(left(), other.left()) + && Objects.equals(right(), other.right()) + && isRemote == other.isRemote; } @Override @@ -295,4 +305,8 @@ private static boolean comparableTypes(Attribute left, Attribute right) { } return leftType.noText() == rightType.noText(); } + + public boolean isRemote() { + return isRemote; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java index 8672da7bce786..16ff5fba7bbd8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java @@ -7,18 +7,20 @@ package org.elasticsearch.xpack.esql.plan.logical.join; -import org.elasticsearch.index.IndexMode; import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware; import org.elasticsearch.xpack.esql.capabilities.TelemetryAware; import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; -import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker; import org.elasticsearch.xpack.esql.plan.logical.SurrogateLogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.UsingJoinType; +import java.util.LinkedList; import java.util.List; import static java.util.Collections.emptyList; @@ -30,8 +32,12 @@ */ public class LookupJoin extends Join implements SurrogateLogicalPlan, PostAnalysisVerificationAware, TelemetryAware { + public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, List joinFields, boolean isRemote) { + this(source, left, right, new UsingJoinType(LEFT, joinFields), emptyList(), emptyList(), emptyList(), isRemote); + } + public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, List joinFields) { - this(source, left, right, new UsingJoinType(LEFT, joinFields), emptyList(), emptyList(), emptyList()); + this(source, left, right, new UsingJoinType(LEFT, joinFields), emptyList(), emptyList(), emptyList(), false); } public LookupJoin( @@ -41,13 +47,18 @@ public LookupJoin( JoinType type, List joinFields, List leftFields, - List rightFields + List rightFields, + boolean isRemote ) { - this(source, left, right, new JoinConfig(type, joinFields, leftFields, rightFields)); + this(source, left, right, new JoinConfig(type, joinFields, leftFields, rightFields), isRemote); } public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig joinConfig) { - super(source, left, right, joinConfig); + this(source, left, right, joinConfig, false); + } + + public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig joinConfig, boolean isRemote) { + super(source, left, right, joinConfig, isRemote); } /** @@ -56,12 +67,12 @@ public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig @Override public LogicalPlan surrogate() { // TODO: decide whether to introduce USING or just basic ON semantics - keep the ordering out for now - return new Join(source(), left(), right(), config()); + return new Join(source(), left(), right(), config(), isRemote()); } @Override public Join replaceChildren(LogicalPlan left, LogicalPlan right) { - return new LookupJoin(source(), left, right, config()); + return new LookupJoin(source(), left, right, config(), isRemote()); } @Override @@ -74,7 +85,8 @@ protected NodeInfo info() { config().type(), config().matchFields(), config().leftFields(), - config().rightFields() + config().rightFields(), + isRemote() ); } @@ -86,31 +98,27 @@ public String telemetryLabel() { @Override public void postAnalysisVerification(Failures failures) { super.postAnalysisVerification(failures); - right().forEachDown(EsRelation.class, esr -> { - var indexNameWithModes = esr.indexNameWithModes(); - if (indexNameWithModes.size() != 1) { - failures.add( - fail( - esr, - "Lookup Join requires a single lookup mode index; [{}] resolves to [{}] indices", - esr.indexPattern(), - indexNameWithModes.size() - ) - ); - return; + if (isRemote()) { + checkRemoteJoin(failures); + } + } + + private void checkRemoteJoin(Failures failures) { + List fails = new LinkedList<>(); + + this.forEachUp(UnaryPlan.class, u -> { + if (u instanceof PipelineBreaker) { + fails.add(u.source()); } - var indexAndMode = indexNameWithModes.entrySet().iterator().next(); - if (indexAndMode.getValue() != IndexMode.LOOKUP) { - failures.add( - fail( - esr, - "Lookup Join requires a single lookup mode index; [{}] resolves to [{}] in [{}] mode", - esr.indexPattern(), - indexAndMode.getKey(), - indexAndMode.getValue() - ) - ); + if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) { + fails.add(u.source()); } }); + + fails.forEach( + f -> failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after [" + f.text() + "]" + f.source())) + ); + } + } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index a92d2f439a0ea..ad6cb42f7f835 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -61,6 +61,7 @@ import org.elasticsearch.logging.Logger; import org.elasticsearch.node.Node; import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.action.ColumnInfoImpl; import org.elasticsearch.xpack.esql.core.expression.Alias; @@ -123,6 +124,7 @@ import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.elasticsearch.xpack.esql.score.ScoreMapper; import org.elasticsearch.xpack.esql.session.Configuration; +import org.elasticsearch.xpack.esql.session.EsqlCCSUtils; import java.util.ArrayList; import java.util.List; @@ -733,15 +735,37 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan if (localSourceExec.indexMode() != IndexMode.LOOKUP) { throw new IllegalArgumentException("can't plan [" + join + "]"); } - Map indicesWithModes = localSourceExec.indexNameWithModes(); - if (indicesWithModes.size() != 1) { - throw new IllegalArgumentException("can't plan [" + join + "], found more than 1 index"); + + // After enabling remote joins, we can have one of the two situations here: + // 1. We've just got one entry - this should be the one relevant to the join, and it should be for this cluster + // 2. We have got multiple entries - this means each cluster has its own one, and we should extract one relevant for this cluster + Map.Entry entry; + if (localSourceExec.indexNameWithModes().size() == 1) { + entry = localSourceExec.indexNameWithModes().entrySet().iterator().next(); + } else { + var maybeEntry = localSourceExec.indexNameWithModes() + .entrySet() + .stream() + .filter(e -> RemoteClusterAware.parseClusterAlias(e.getKey()).equals(clusterAlias)) + .findFirst(); + entry = maybeEntry.orElseThrow( + () -> new IllegalStateException( + "can't plan [" + join + "]: no matching index found " + EsqlCCSUtils.inClusterName(clusterAlias) + ) + ); } - var entry = indicesWithModes.entrySet().iterator().next(); + if (entry.getValue() != IndexMode.LOOKUP) { - throw new IllegalArgumentException("can't plan [" + join + "], found index with mode [" + entry.getValue() + "]"); + throw new IllegalStateException("can't plan [" + join + "], found index with mode [" + entry.getValue() + "]"); + } + String[] indexSplit = RemoteClusterAware.splitIndexName(entry.getKey()); + // No prefix is ok, prefix with this cluster is ok, something else is not + if (indexSplit[0] != null && clusterAlias.equals(indexSplit[0]) == false) { + throw new IllegalStateException( + "can't plan [" + join + "]: no matching index found " + EsqlCCSUtils.inClusterName(clusterAlias) + ); } - String indexName = entry.getKey(); + String indexName = indexSplit[1]; if (join.leftFields().size() != join.rightFields().size()) { throw new IllegalArgumentException("can't plan [" + join + "]: mismatching left and right field count"); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index da0ea1c0adff6..e9d8c93511106 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -41,6 +41,7 @@ import org.elasticsearch.xpack.esql.plan.QueryPlan; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Filter; +import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec; import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize; @@ -51,7 +52,6 @@ import org.elasticsearch.xpack.esql.plan.physical.MergeExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper; -import org.elasticsearch.xpack.esql.planner.mapper.Mapper; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.session.Configuration; import org.elasticsearch.xpack.esql.stats.SearchContextStats; @@ -120,7 +120,7 @@ public static PhysicalPlan reductionPlan(PhysicalPlan plan) { } final FragmentExec fragment = (FragmentExec) fragments.getFirst(); - final var pipelineBreakers = fragment.fragment().collectFirstChildren(Mapper::isPipelineBreaker); + final var pipelineBreakers = fragment.fragment().collectFirstChildren(p -> p instanceof PipelineBreaker); if (pipelineBreakers.isEmpty()) { return null; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index 4bdb90af10316..4d1d65d63932d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -20,7 +20,7 @@ import org.elasticsearch.xpack.esql.plan.logical.LeafPlan; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; -import org.elasticsearch.xpack.esql.plan.logical.OrderBy; +import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker; import org.elasticsearch.xpack.esql.plan.logical.Sample; import org.elasticsearch.xpack.esql.plan.logical.TopN; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; @@ -136,7 +136,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { return MapperUtils.mapUnary(unary, mappedChild); } // in case of a fragment, push to it any current streaming operator - if (isPipelineBreaker(unary) == false) { + if (unary instanceof PipelineBreaker == false) { return new FragmentExec(unary); } } @@ -206,6 +206,14 @@ private PhysicalPlan mapBinary(BinaryPlan bp) { throw new EsqlIllegalArgumentException("unsupported join type [" + config.type() + "]"); } + if (join.isRemote()) { + // This is generally wrong in case of pipeline breakers upstream from the join, but we validate against these. + // The only potential pipeline breakers upstream should be limits duplicated past the join from PushdownAndCombineLimits, + // but they are okay to perform on the data nodes because they only serve to reduce the number of rows processed and + // don't affect correctness due to another limit being downstream. + return new FragmentExec(bp); + } + PhysicalPlan left = map(bp.left()); // only broadcast joins supported for now - hence push down as a streaming operator @@ -240,10 +248,6 @@ private PhysicalPlan mapFork(Fork fork) { return new MergeExec(fork.source(), fork.children().stream().map(child -> map(child)).toList(), fork.output()); } - public static boolean isPipelineBreaker(LogicalPlan p) { - return p instanceof Aggregate || p instanceof TopN || p instanceof Limit || p instanceof OrderBy; - } - private PhysicalPlan addExchangeForFragment(LogicalPlan logical, PhysicalPlan child) { // in case of fragment, preserve the streaming operator (order-by, limit or topN) for local replanning // no need to do it for an aggregate since it gets split diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index d507b8275178d..901057f4db61c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -384,4 +384,12 @@ public static boolean canAllowPartial(Exception e) { } return true; } + + public static String inClusterName(String clusterAlias) { + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + return "in local cluster"; + } else { + return "in remote cluster [" + clusterAlias + "]"; + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index b3bdf7de32464..40a859e3f5b58 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.session; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.IndicesOptions; @@ -28,6 +29,8 @@ import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; @@ -102,6 +105,7 @@ import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -109,6 +113,7 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; @@ -144,6 +149,7 @@ public interface PlanRunner { private final IndicesExpressionGrouper indicesExpressionGrouper; private Set configuredClusters; private final InferenceRunner inferenceRunner; + private final RemoteClusterService remoteClusterService; private boolean explainMode; private String parsedPlanString; @@ -177,6 +183,7 @@ public EsqlSession( this.indicesExpressionGrouper = indicesExpressionGrouper; this.inferenceRunner = services.inferenceRunner(); this.preMapper = new PreMapper(services); + this.remoteClusterService = services.transportService().getRemoteClusterService(); } public String sessionId() { @@ -368,7 +375,7 @@ public void analyzedPlan( .andThen((l, preAnalysisResult) -> resolveInferences(preAnalysis.inferencePlans, preAnalysisResult, l)); // first resolve the lookup indices, then the main indices for (var index : preAnalysis.lookupIndices) { - listener = listener.andThen((l, preAnalysisResult) -> { preAnalyzeLookupIndex(index, preAnalysisResult, l); }); + listener = listener.andThen((l, preAnalysisResult) -> preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l)); } listener.andThen((l, result) -> { // resolve the main indices @@ -407,16 +414,196 @@ public void analyzedPlan( }).addListener(logicalPlanListener); } - private void preAnalyzeLookupIndex(IndexPattern table, PreAnalysisResult result, ActionListener listener) { - Set fieldNames = result.wildcardJoinIndices().contains(table.indexPattern()) ? IndexResolver.ALL_FIELDS : result.fieldNames; + private void preAnalyzeLookupIndex( + IndexPattern lookupIndexPattern, + PreAnalysisResult result, + EsqlExecutionInfo executionInfo, + ActionListener listener + ) { + String localPattern = lookupIndexPattern.indexPattern(); + assert RemoteClusterAware.isRemoteIndexName(localPattern) == false + : "Lookup index name should not include remote, but got: " + localPattern; + Set fieldNames = result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames; + + String patternWithRemotes; + + if (executionInfo.getClusters().isEmpty()) { + patternWithRemotes = localPattern; + } else { + // convert index -> cluster1:index,cluster2:index, etc.for each running cluster + patternWithRemotes = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING) + .map(c -> RemoteClusterAware.buildRemoteIndexName(c.getClusterAlias(), localPattern)) + .collect(Collectors.joining(",")); + } + if (patternWithRemotes.isEmpty()) { + return; + } // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types indexResolver.resolveAsMergedMapping( - table.indexPattern(), + patternWithRemotes, fieldNames, null, - listener.map(indexResolution -> result.addLookupIndexResolution(table.indexPattern(), indexResolution)) + listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution)) ); - // TODO: Verify that the resolved index actually has indexMode: "lookup" + } + + private void skipClusterOrError(String clusterAlias, EsqlExecutionInfo executionInfo, String message) { + VerificationException error = new VerificationException(message); + // If we can, skip the cluster and mark it as such + if (executionInfo.isSkipUnavailable(clusterAlias)) { + EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, error); + } else { + throw error; + } + } + + /** + * Receive and process lookup index resolutions from resolveAsMergedMapping. + * This processes the lookup index data for a single index, updates and returns the {@link PreAnalysisResult} result + */ + private PreAnalysisResult receiveLookupIndexResolution( + PreAnalysisResult result, + String index, + EsqlExecutionInfo executionInfo, + IndexResolution lookupIndexResolution + ) { + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, lookupIndexResolution.unavailableClusters()); + if (lookupIndexResolution.isValid() == false) { + // If the index resolution is invalid, don't bother with the rest of the analysis + return result.addLookupIndexResolution(index, lookupIndexResolution); + } + if (executionInfo.getClusters().isEmpty() || executionInfo.isCrossClusterSearch() == false) { + // Local only case, still do some checks, since we moved analysis checks here + if (lookupIndexResolution.get().indexNameWithModes().size() > 1) { + throw new VerificationException( + "Lookup Join requires a single lookup mode index; [" + index + "] resolves to multiple indices" + ); + } + var indexModeEntry = lookupIndexResolution.get().indexNameWithModes().entrySet().iterator().next(); + if (indexModeEntry.getValue() != IndexMode.LOOKUP) { + throw new VerificationException( + "Lookup Join requires a single lookup mode index; [" + + index + + "] resolves to [" + + indexModeEntry.getKey() + + "] in [" + + indexModeEntry.getValue() + + "] mode" + ); + } + return result.addLookupIndexResolution(index, lookupIndexResolution); + } + // Collect resolved clusters from the index resolution, verify that each cluster has a single resolution for the lookup index + Map clustersWithResolvedIndices = new HashMap<>(lookupIndexResolution.resolvedIndices().size()); + lookupIndexResolution.get().indexNameWithModes().forEach((indexName, indexMode) -> { + String clusterAlias = RemoteClusterAware.parseClusterAlias(indexName); + // Check that all indices are in lookup mode + if (indexMode != IndexMode.LOOKUP) { + skipClusterOrError( + clusterAlias, + executionInfo, + "Lookup Join requires a single lookup mode index; [" + + index + + "] resolves to [" + + indexName + + "] in [" + + indexMode + + "] mode" + ); + } + // Each cluster should have only one resolution for the lookup index + if (clustersWithResolvedIndices.containsKey(clusterAlias)) { + skipClusterOrError( + clusterAlias, + executionInfo, + "Lookup Join requires a single lookup mode index; [" + + index + + "] resolves to multiple indices " + + EsqlCCSUtils.inClusterName(clusterAlias) + ); + } else { + clustersWithResolvedIndices.put(clusterAlias, indexName); + } + }); + + // These are clusters that are still in the running, we need to have the index on all of them + Stream clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING); + // Verify that all active clusters have the lookup index resolved + clusters.forEach(cluster -> { + String clusterAlias = cluster.getClusterAlias(); + if (clustersWithResolvedIndices.containsKey(clusterAlias) == false) { + // Missing cluster resolution + skipClusterOrError( + clusterAlias, + executionInfo, + "lookup index [" + index + "] is not available " + EsqlCCSUtils.inClusterName(clusterAlias) + ); + } + }); + + return result.addLookupIndexResolution( + index, + checkSingleIndex(index, executionInfo, lookupIndexResolution, clustersWithResolvedIndices.values()) + ); + } + + /** + * Check whether the lookup index resolves to a single concrete index on all clusters or not. + * If it's a single index, we are compatible with old pre-9.2 LOOKUP JOIN code and just need to send the same resolution as we did. + * If there are multiple index names (e.g. due to aliases) then pre-9.2 clusters won't be able to handle it so we need to skip them. + * @return An updated `IndexResolution` object if the index resolves to a single concrete index, + * or the original `lookupIndexResolution` if no changes are needed. + */ + private IndexResolution checkSingleIndex( + String index, + EsqlExecutionInfo executionInfo, + IndexResolution lookupIndexResolution, + Collection indexNames + ) { + // If all indices resolve to the same name, we can use that for BWC + // Older clusters only can handle one name in LOOKUP JOIN + var localIndexNames = indexNames.stream().map(n -> RemoteClusterAware.splitIndexName(n)[1]).collect(Collectors.toSet()); + if (localIndexNames.size() == 1) { + String indexName = localIndexNames.iterator().next(); + EsIndex newIndex = new EsIndex(index, lookupIndexResolution.get().mapping(), Map.of(indexName, IndexMode.LOOKUP)); + return IndexResolution.valid( + newIndex, + newIndex.concreteIndices(), + lookupIndexResolution.getUnavailableShards(), + lookupIndexResolution.unavailableClusters() + ); + } + // validate remotes to be able to handle multiple indices in LOOKUP JOIN + validateRemoteVersions(executionInfo); + return lookupIndexResolution; + } + + /** + * Older clusters can only handle one name in LOCAL JOIN - verify that all the remotes involved + * are recent enough to be able to handle multiple indices. + * This is only checked if there are actually multiple indices, which happens when remotes have a different + * concrete indices aliased to the same index name. + */ + private void validateRemoteVersions(EsqlExecutionInfo executionInfo) { + Stream clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING); + clusters.forEach(cluster -> { + String clusterAlias = cluster.getClusterAlias(); + if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) { + // No need to check local, obviously + var connection = remoteClusterService.getConnection(clusterAlias); + if (connection != null && connection.getTransportVersion().before(TransportVersions.LOOKUP_JOIN_CCS)) { + skipClusterOrError( + clusterAlias, + executionInfo, + "remote cluster [" + + clusterAlias + + "] has version [" + + connection.getTransportVersion() + + "] that does not support multiple indices in LOOKUP JOIN, skipping" + ); + } + } + }); } private void initializeClusterData(List indices, EsqlExecutionInfo executionInfo) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java index c806fec61dc64..cbb825ca9581b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java @@ -184,6 +184,15 @@ public static EnrichResolution defaultEnrichResolution() { "airport_city_boundaries", "mapping-airport_city_boundaries.json" ); + loadEnrichPolicyResolution( + enrichResolution, + Enrich.Mode.COORDINATOR, + MATCH_TYPE, + "languages_coord", + "language_code", + "languages_idx", + "mapping-languages.json" + ); return enrichResolution; } @@ -213,6 +222,25 @@ public static void loadEnrichPolicyResolution( ); } + public static void loadEnrichPolicyResolution( + EnrichResolution enrich, + Enrich.Mode mode, + String policyType, + String policy, + String field, + String index, + String mapping + ) { + IndexResolution indexResolution = loadMapping(mapping, index); + List enrichFields = new ArrayList<>(indexResolution.get().mapping().keySet()); + enrichFields.remove(field); + enrich.addResolvedPolicy( + policy, + mode, + new ResolvedEnrichPolicy(field, policyType, enrichFields, Map.of("", index), indexResolution.get().mapping()) + ); + } + public static void loadEnrichPolicyResolution(EnrichResolution enrich, String policy, String field, String index, String mapping) { loadEnrichPolicyResolution(enrich, EnrichPolicy.MATCH_TYPE, policy, field, index, mapping); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java index 603c0eaa6e632..b2521bddfb47b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java @@ -2285,40 +2285,7 @@ public void testMultipleLookupJoinsGiveDifferentAttributes() { assertEquals(AttributeSet.EMPTY, intersection); } - public void testLookupJoinIndexMode() { - assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled()); - - var indexResolution = AnalyzerTestUtils.expandedDefaultIndexResolution(); - var lookupResolution = AnalyzerTestUtils.defaultLookupResolution(); - var indexResolutionAsLookup = Map.of("test", indexResolution); - var lookupResolutionAsIndex = lookupResolution.get("languages_lookup"); - - analyze("FROM test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code"); - analyze( - "FROM languages_lookup | LOOKUP JOIN languages_lookup ON language_code", - AnalyzerTestUtils.analyzer(lookupResolutionAsIndex, lookupResolution) - ); - - VerificationException e = expectThrows( - VerificationException.class, - () -> analyze( - "FROM languages_lookup | EVAL languages = language_code | LOOKUP JOIN test ON languages", - AnalyzerTestUtils.analyzer(lookupResolutionAsIndex, indexResolutionAsLookup) - ) - ); - assertThat( - e.getMessage(), - containsString("1:70: Lookup Join requires a single lookup mode index; [test] resolves to [test] in [standard] mode") - ); - e = expectThrows( - VerificationException.class, - () -> analyze("FROM test | LOOKUP JOIN test ON languages", AnalyzerTestUtils.analyzer(indexResolution, indexResolutionAsLookup)) - ); - assertThat( - e.getMessage(), - containsString("1:25: Lookup Join requires a single lookup mode index; [test] resolves to [test] in [standard] mode") - ); - } + // Lookup modes are now tested on index resulution public void testImplicitCasting() { var e = expectThrows(VerificationException.class, () -> analyze(""" diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java index fe900fbc211eb..fc38af5569b98 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.index.IndexResolution; import org.elasticsearch.xpack.esql.parser.EsqlParser; +import org.elasticsearch.xpack.esql.parser.ParsingException; import org.elasticsearch.xpack.esql.parser.QueryParam; import org.elasticsearch.xpack.esql.parser.QueryParams; @@ -2222,6 +2223,60 @@ public void testFullTextFunctionsInStats() { } } + public void testRemoteLookupJoinWithPipelineBreaker() { + assumeTrue("Remote LOOKUP JOIN not enabled", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled()); + var analyzer = AnalyzerTestUtils.analyzer(loadMapping("mapping-default.json", "test,remote:test")); + assertEquals( + "1:92: LOOKUP JOIN with remote indices can't be executed after [STATS c = COUNT(*) by languages]@1:25", + error( + "FROM test,remote:test | STATS c = COUNT(*) by languages " + + "| EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code", + analyzer + ) + ); + + assertEquals( + "1:72: LOOKUP JOIN with remote indices can't be executed after [SORT emp_no]@1:25", + error( + "FROM test,remote:test | SORT emp_no | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code", + analyzer + ) + ); + + assertEquals( + "1:68: LOOKUP JOIN with remote indices can't be executed after [LIMIT 2]@1:25", + error( + "FROM test,remote:test | LIMIT 2 | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code", + analyzer + ) + ); + assertEquals( + "1:96: LOOKUP JOIN with remote indices can't be executed after [ENRICH _coordinator:languages_coord]@1:58", + error( + "FROM test,remote:test | EVAL language_code = languages | ENRICH _coordinator:languages_coord " + + "| LOOKUP JOIN languages_lookup ON language_code", + analyzer + ) + ); + } + + public void testRemoteLookupJoinIsSnapshot() { + // TODO: remove when we allow remote joins in release builds + assumeTrue("Remote LOOKUP JOIN not enabled", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled()); + assertTrue(Build.current().isSnapshot()); + } + + public void testRemoteLookupJoinIsDisabled() { + // TODO: remove when we allow remote joins in release builds + assumeFalse("Remote LOOKUP JOIN enabled", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled()); + ParsingException e = expectThrows( + ParsingException.class, + () -> query("FROM test,remote:test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code") + ); + assertThat(e.getMessage(), containsString("remote clusters are not supported with LOOKUP JOIN")); + + } + private void checkFullTextFunctionsInStats(String functionInvocation) { query("from test | stats c = max(id) where " + functionInvocation, fullTextAnalyzer); query("from test | stats c = max(id) where " + functionInvocation + " or length(title) > 10", fullTextAnalyzer); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java index 1d6730eb40ff5..1cd112114e027 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java @@ -3298,6 +3298,17 @@ public void testInvalidPatternsWithIntermittentQuotes() { } } + public void testValidJoinPatternWithRemote() { + assumeTrue("LOOKUP JOIN requires corresponding capability", EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled()); + var fromPatterns = randomIndexPatterns(CROSS_CLUSTER); + var joinPattern = randomIndexPattern(without(CROSS_CLUSTER), without(WILDCARD_PATTERN), without(INDEX_SELECTOR)); + var plan = statement("FROM " + fromPatterns + " | LOOKUP JOIN " + joinPattern + " ON " + randomIdentifier()); + + var join = as(plan, LookupJoin.class); + assertThat(as(join.left(), UnresolvedRelation.class).indexPattern().indexPattern(), equalTo(unquoteIndexPattern(fromPatterns))); + assertThat(as(join.right(), UnresolvedRelation.class).indexPattern().indexPattern(), equalTo(unquoteIndexPattern(joinPattern))); + } + public void testInvalidJoinPatterns() { assumeTrue("LOOKUP JOIN requires corresponding capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled()); @@ -3318,18 +3329,6 @@ public void testInvalidJoinPatterns() { "invalid index pattern [" + unquoteIndexPattern(joinPattern) + "], remote clusters are not supported with LOOKUP JOIN" ); } - { - // remote cluster on the left - var fromPatterns = randomIndexPatterns(CROSS_CLUSTER); - var joinPattern = randomIndexPattern(without(CROSS_CLUSTER), without(WILDCARD_PATTERN), without(INDEX_SELECTOR)); - expectError( - "FROM " + fromPatterns + " | LOOKUP JOIN " + joinPattern + " ON " + randomIdentifier(), - "invalid index pattern [" + unquoteIndexPattern(fromPatterns) + "], remote clusters are not supported with LOOKUP JOIN" - ); - } - - // If one or more patterns participating in LOOKUP JOINs are partially quoted, we expect the partial quoting - // error messages to take precedence over any LOOKUP JOIN error messages. { // Generate a syntactically invalid (partial quoted) pattern. diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml index 85f568415eb4e..65d3a750e8a41 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml @@ -145,13 +145,13 @@ basic: - match: {values.1: [2, "yellow"]} --- -fails with non-lookup index: +fails with non-lookup index v2: - requires: capabilities: - method: POST path: /_query parameters: [] - capabilities: [update_lookup_join_error_messages] + capabilities: [enable_lookup_join_on_remote] reason: "checks updated error messages" - do: esql.query: @@ -160,7 +160,7 @@ fails with non-lookup index: catch: "bad_request" - match: { error.type: "verification_exception" } - - contains: { error.reason: "Found 1 problem\nline 1:45: Lookup Join requires a single lookup mode index; [test] resolves to [test] in [standard] mode" } + - contains: { error.reason: "Lookup Join requires a single lookup mode index; [test] resolves to [test] in [standard] mode" } --- pattern-multiple: diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/192_lookup_join_on_aliases.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/192_lookup_join_on_aliases.yml index 43af8293e9899..82f4de8038007 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/192_lookup_join_on_aliases.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/192_lookup_join_on_aliases.yml @@ -186,13 +186,13 @@ alias-repeated-index: - match: {values.1: [2, "yellow"]} --- -fails when alias or pattern resolves to multiple: +fails when alias or pattern resolves to multiple v2: - requires: capabilities: - method: POST path: /_query parameters: [] - capabilities: [update_lookup_join_error_messages] + capabilities: [enable_lookup_join_on_remote] reason: "checks updated error messages" - do: esql.query: @@ -201,7 +201,7 @@ fails when alias or pattern resolves to multiple: catch: "bad_request" - match: { error.type: "verification_exception" } - - contains: { error.reason: "Found 1 problem\nline 1:34: Lookup Join requires a single lookup mode index; [test-lookup-alias-pattern-multiple] resolves to [4] indices" } + - contains: { error.reason: "Lookup Join requires a single lookup mode index; [test-lookup-alias-pattern-multiple] resolves to multiple indices" } --- alias-pattern-single: