From 3f614af8c269e029adad249dd3e3fe1b873fb618 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 5 Jun 2025 13:54:29 -0600 Subject: [PATCH 01/71] Remote lookup join implementation --- .../xpack/esql/parser/LogicalPlanBuilder.java | 3 +- .../xpack/esql/session/EsqlCCSUtils.java | 8 ++ .../xpack/esql/session/EsqlSession.java | 108 +++++++++++++++++- 3 files changed, 112 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index a8ea1ba95dc15..51d905574f0b6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -70,6 +70,7 @@ import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval; import org.elasticsearch.xpack.esql.plan.logical.Sample; import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate; +import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation; import org.elasticsearch.xpack.esql.plan.logical.inference.Completion; import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; @@ -588,7 +589,7 @@ public PlanFactory visitJoinCommand(EsqlBaseParser.JoinCommandContext ctx) { var target = ctx.joinTarget(); var rightPattern = visitIndexPattern(List.of(target.index)); if (rightPattern.contains(WILDCARD)) { - throw new ParsingException(source(target), "invalid index pattern [{}], * is not allowed in LOOKUP JOIN", rightPattern); + throw new ParsingException(source(target), "invalid index pattern [{}], * is no t allowed in LOOKUP JOIN", rightPattern); } if (RemoteClusterAware.isRemoteIndexName(rightPattern)) { throw new ParsingException( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index d507b8275178d..901057f4db61c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -384,4 +384,12 @@ public static boolean canAllowPartial(Exception e) { } return true; } + + public static String inClusterName(String clusterAlias) { + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + return "in local cluster"; + } else { + return "in remote cluster [" + clusterAlias + "]"; + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 4ff65f59bbd72..0097e3ea064ff 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -28,6 +28,7 @@ import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; @@ -110,6 +111,7 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; @@ -375,7 +377,7 @@ public void analyzedPlan( .andThen((l, preAnalysisResult) -> resolveInferences(preAnalysis.inferencePlans, preAnalysisResult, l)); // first resolve the lookup indices, then the main indices for (var index : preAnalysis.lookupIndices) { - listener = listener.andThen((l, preAnalysisResult) -> { preAnalyzeLookupIndex(index, preAnalysisResult, l); }); + listener = listener.andThen((l, preAnalysisResult) -> { preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l); }); } listener.andThen((l, result) -> { // resolve the main indices @@ -414,16 +416,110 @@ public void analyzedPlan( }).addListener(logicalPlanListener); } - private void preAnalyzeLookupIndex(IndexPattern table, PreAnalysisResult result, ActionListener listener) { - Set fieldNames = result.wildcardJoinIndices().contains(table.indexPattern()) ? IndexResolver.ALL_FIELDS : result.fieldNames; + private void preAnalyzeLookupIndex( + IndexPattern table, + PreAnalysisResult result, + EsqlExecutionInfo executionInfo, + ActionListener listener + ) { + String localPattern = table.indexPattern(); + assert RemoteClusterAware.isRemoteIndexName(localPattern) == false + : "Lookup index name should not include remote, but got: " + localPattern; + Set fieldNames = result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames; + // Get the list of active clusters for the lookup index + Stream clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING); + StringBuilder patternWithRemotes = new StringBuilder(localPattern); + // Create a pattern with all active remote clusters + clusters.forEach(cluster -> { + String clusterAlias = cluster.getClusterAlias(); + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + // Skip the local cluster, as it is already included in the localPattern + return; + } + patternWithRemotes.append(",").append(clusterAlias).append(":").append(localPattern); + }); // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types indexResolver.resolveAsMergedMapping( - table.indexPattern(), + patternWithRemotes.toString(), fieldNames, null, - listener.map(indexResolution -> result.addLookupIndexResolution(table.indexPattern(), indexResolution)) + listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution)) ); - // TODO: Verify that the resolved index actually has indexMode: "lookup" + } + + private PreAnalysisResult receiveLookupIndexResolution( + PreAnalysisResult result, + String index, + EsqlExecutionInfo executionInfo, + IndexResolution newIndexResolution + ) { + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, newIndexResolution.unavailableClusters()); + if (newIndexResolution.isValid() == false) { + // If the index resolution is invalid, don't bother with the rest of the analysis + return result.addLookupIndexResolution(index, newIndexResolution); + } + // Collect resolved clusters from the index resolution, verify that each cluster has a single resolution for the lookup index + Map clustersWithResolvedIndices = new HashMap<>(newIndexResolution.resolvedIndices().size()); + newIndexResolution.get().indexNameWithModes().forEach((indexName, indexMode) -> { + if (indexMode != IndexMode.LOOKUP) { + throw new VerificationException( + "Lookup index [" + indexName + "] has index mode [" + indexMode + "], expected [" + IndexMode.LOOKUP + "]" + ); + } + String clusterAlias = RemoteClusterAware.parseClusterAlias(indexName); + // Each cluster should have only one resolution for the lookup index + if (clustersWithResolvedIndices.containsKey(clusterAlias)) { + throw new VerificationException( + "Multiple resolutions for lookup index [" + index + "] " + EsqlCCSUtils.inClusterName(clusterAlias) + ); + } else { + clustersWithResolvedIndices.put(clusterAlias, indexName); + } + }); + + // These are clusters that are still in the running, we need to have the index on all of them + Stream clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING); + // Verify that all active clusters have the lookup index resolved + clusters.forEach(cluster -> { + String clusterAlias = cluster.getClusterAlias(); + if (clustersWithResolvedIndices.containsKey(clusterAlias) == false) { + // Missing cluster resolution + VerificationException error = new VerificationException( + "Lookup index [" + index + "] is not available " + EsqlCCSUtils.inClusterName(clusterAlias) + ); + // For now, local cluster can not be skipped, so we throw an error + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) + || executionInfo.isSkipUnavailable(clusterAlias) == false) { + throw error; + } else { + // If we can, skip the cluster and mark it as such + EsqlCCSUtils.markClusterWithFinalStateAndNoShards( + executionInfo, + clusterAlias, + EsqlExecutionInfo.Cluster.Status.SKIPPED, + error + ); + } + } + }); + + if (clustersWithResolvedIndices.size() > 1) { + // If we have multiple resolutions for the lookup index, we need to only leave the local resolution + String localIndexName = clustersWithResolvedIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + if (localIndexName == null) { + // Get the first index name instead + localIndexName = clustersWithResolvedIndices.values().iterator().next(); + } + var localIndex = new EsIndex(index, newIndexResolution.get().mapping(), Map.of(localIndexName, IndexMode.LOOKUP)); + newIndexResolution = IndexResolution.valid( + localIndex, + localIndex.concreteIndices(), + newIndexResolution.getUnavailableShards(), + newIndexResolution.unavailableClusters() + ); + } + + return result.addLookupIndexResolution(index, newIndexResolution); } private void initializeClusterData(List indices, EsqlExecutionInfo executionInfo) { From ba8a0398234db0e311ff0d32fc9a7d0003369e59 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 5 Jun 2025 14:55:21 -0600 Subject: [PATCH 02/71] refactor more --- .../xpack/esql/session/EsqlSession.java | 70 ++++++++++--------- 1 file changed, 38 insertions(+), 32 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 0097e3ea064ff..8d71dca75502f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -377,7 +377,7 @@ public void analyzedPlan( .andThen((l, preAnalysisResult) -> resolveInferences(preAnalysis.inferencePlans, preAnalysisResult, l)); // first resolve the lookup indices, then the main indices for (var index : preAnalysis.lookupIndices) { - listener = listener.andThen((l, preAnalysisResult) -> { preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l); }); + listener = listener.andThen((l, preAnalysisResult) -> preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l)); } listener.andThen((l, result) -> { // resolve the main indices @@ -426,18 +426,20 @@ private void preAnalyzeLookupIndex( assert RemoteClusterAware.isRemoteIndexName(localPattern) == false : "Lookup index name should not include remote, but got: " + localPattern; Set fieldNames = result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames; - // Get the list of active clusters for the lookup index - Stream clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING); StringBuilder patternWithRemotes = new StringBuilder(localPattern); - // Create a pattern with all active remote clusters - clusters.forEach(cluster -> { - String clusterAlias = cluster.getClusterAlias(); - if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { - // Skip the local cluster, as it is already included in the localPattern - return; - } - patternWithRemotes.append(",").append(clusterAlias).append(":").append(localPattern); - }); + if (executionInfo.getClusters().isEmpty() == false) { + // Get the list of active clusters for the lookup index + Stream clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING); + // Create a pattern with all active remote clusters + clusters.forEach(cluster -> { + String clusterAlias = cluster.getClusterAlias(); + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + // Skip the local cluster, as it is already included in the localPattern + return; + } + patternWithRemotes.append(",").append(clusterAlias).append(":").append(localPattern); + }); + } // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types indexResolver.resolveAsMergedMapping( patternWithRemotes.toString(), @@ -447,6 +449,16 @@ private void preAnalyzeLookupIndex( ); } + private void skipClusterOrError(String clusterAlias, EsqlExecutionInfo executionInfo, String message) { + VerificationException error = new VerificationException(message); + // If we can, skip the cluster and mark it as such + if (executionInfo.isSkipUnavailable(clusterAlias)) { + EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, error); + } else { + throw error; + } + } + private PreAnalysisResult receiveLookupIndexResolution( PreAnalysisResult result, String index, @@ -454,22 +466,27 @@ private PreAnalysisResult receiveLookupIndexResolution( IndexResolution newIndexResolution ) { EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, newIndexResolution.unavailableClusters()); - if (newIndexResolution.isValid() == false) { + if (newIndexResolution.isValid() == false || executionInfo.getClusters().isEmpty()) { // If the index resolution is invalid, don't bother with the rest of the analysis return result.addLookupIndexResolution(index, newIndexResolution); } // Collect resolved clusters from the index resolution, verify that each cluster has a single resolution for the lookup index Map clustersWithResolvedIndices = new HashMap<>(newIndexResolution.resolvedIndices().size()); newIndexResolution.get().indexNameWithModes().forEach((indexName, indexMode) -> { + String clusterAlias = RemoteClusterAware.parseClusterAlias(indexName); + // Check that all indices are in lookup mode if (indexMode != IndexMode.LOOKUP) { - throw new VerificationException( + skipClusterOrError( + clusterAlias, + executionInfo, "Lookup index [" + indexName + "] has index mode [" + indexMode + "], expected [" + IndexMode.LOOKUP + "]" ); } - String clusterAlias = RemoteClusterAware.parseClusterAlias(indexName); // Each cluster should have only one resolution for the lookup index if (clustersWithResolvedIndices.containsKey(clusterAlias)) { - throw new VerificationException( + skipClusterOrError( + clusterAlias, + executionInfo, "Multiple resolutions for lookup index [" + index + "] " + EsqlCCSUtils.inClusterName(clusterAlias) ); } else { @@ -484,22 +501,11 @@ private PreAnalysisResult receiveLookupIndexResolution( String clusterAlias = cluster.getClusterAlias(); if (clustersWithResolvedIndices.containsKey(clusterAlias) == false) { // Missing cluster resolution - VerificationException error = new VerificationException( - "Lookup index [" + index + "] is not available " + EsqlCCSUtils.inClusterName(clusterAlias) + skipClusterOrError( + clusterAlias, + executionInfo, + "Lookup index [" + index + "] is not available in cluster " + EsqlCCSUtils.inClusterName(clusterAlias) ); - // For now, local cluster can not be skipped, so we throw an error - if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) - || executionInfo.isSkipUnavailable(clusterAlias) == false) { - throw error; - } else { - // If we can, skip the cluster and mark it as such - EsqlCCSUtils.markClusterWithFinalStateAndNoShards( - executionInfo, - clusterAlias, - EsqlExecutionInfo.Cluster.Status.SKIPPED, - error - ); - } } }); @@ -508,7 +514,7 @@ private PreAnalysisResult receiveLookupIndexResolution( String localIndexName = clustersWithResolvedIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); if (localIndexName == null) { // Get the first index name instead - localIndexName = clustersWithResolvedIndices.values().iterator().next(); + localIndexName = RemoteClusterAware.splitIndexName(clustersWithResolvedIndices.values().iterator().next())[1]; } var localIndex = new EsIndex(index, newIndexResolution.get().mapping(), Map.of(localIndexName, IndexMode.LOOKUP)); newIndexResolution = IndexResolution.valid( From 2b5aaef3a2ef08efd69fe9ffecb9d6c5f4b8a7ce Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 5 Jun 2025 16:37:22 -0600 Subject: [PATCH 03/71] Oops typo --- .../org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index 51d905574f0b6..1c6a8e8155abd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -589,7 +589,7 @@ public PlanFactory visitJoinCommand(EsqlBaseParser.JoinCommandContext ctx) { var target = ctx.joinTarget(); var rightPattern = visitIndexPattern(List.of(target.index)); if (rightPattern.contains(WILDCARD)) { - throw new ParsingException(source(target), "invalid index pattern [{}], * is no t allowed in LOOKUP JOIN", rightPattern); + throw new ParsingException(source(target), "invalid index pattern [{}], * is not allowed in LOOKUP JOIN", rightPattern); } if (RemoteClusterAware.isRemoteIndexName(rightPattern)) { throw new ParsingException( From c3ae03d571d7be4f94f0db2be80f3df752f6cfba Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 6 Jun 2025 13:07:37 -0600 Subject: [PATCH 04/71] test fixes --- .../rest/RequestIndexFilteringTestCase.java | 4 +- .../xpack/esql/action/EsqlCapabilities.java | 5 ++ .../xpack/esql/session/EsqlSession.java | 4 +- .../esql/parser/StatementParserTests.java | 69 +++---------------- .../test/esql/190_lookup_join.yml | 2 +- .../test/esql/192_lookup_join_on_aliases.yml | 2 +- 6 files changed, 22 insertions(+), 64 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java index 1ba4365ea3e92..2f984e9f98997 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java @@ -213,7 +213,7 @@ public void testIndicesDontExist() throws IOException { assertThat(e.getMessage(), containsString("index_not_found_exception")); assertThat(e.getMessage(), anyOf(containsString("no such index [foo]"), containsString("no such index [remote_cluster:foo]"))); - if (EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled()) { + if (EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled()) { var pattern = from("test1"); e = expectThrows( ResponseException.class, @@ -225,7 +225,7 @@ public void testIndicesDontExist() throws IOException { // currently we don't support remote clusters in LOOKUP JOIN // this check happens before resolving actual indices and results in a different error message RemoteClusterAware.isRemoteIndexName(pattern) - ? allOf(containsString("parsing_exception"), containsString("remote clusters are not supported")) + ? allOf(containsString("verification_exception"), containsString("Unknown index [foo,remote_cluster:foo]")) : allOf(containsString("verification_exception"), containsString("Unknown index [foo]")) ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 9a8b71e8e5eea..c698b83ca21d0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1187,6 +1187,11 @@ public enum Cap { */ RLIKE_WITH_EMPTY_LANGUAGE_PATTERN, + /** + * Enable support for cross-cluster lookup joins. + */ + ENABLE_LOOKUP_JOIN_ON_REMOTE, + /** * MATCH PHRASE function */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 8d71dca75502f..365c9b666820b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -479,7 +479,7 @@ private PreAnalysisResult receiveLookupIndexResolution( skipClusterOrError( clusterAlias, executionInfo, - "Lookup index [" + indexName + "] has index mode [" + indexMode + "], expected [" + IndexMode.LOOKUP + "]" + "invalid [" + indexName + "] resolution in lookup mode to an index in [" + indexMode + "] mode" ); } // Each cluster should have only one resolution for the lookup index @@ -487,7 +487,7 @@ private PreAnalysisResult receiveLookupIndexResolution( skipClusterOrError( clusterAlias, executionInfo, - "Multiple resolutions for lookup index [" + index + "] " + EsqlCCSUtils.inClusterName(clusterAlias) + "multiple resolutions for lookup index [" + index + "] " + EsqlCCSUtils.inClusterName(clusterAlias) ); } else { clustersWithResolvedIndices.put(clusterAlias, indexName); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java index dc7256e3c4521..cd91a7193317d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java @@ -3297,6 +3297,17 @@ public void testInvalidPatternsWithIntermittentQuotes() { } } + public void testValidJoinPatternWithRemote() { + assumeTrue("LOOKUP JOIN requires corresponding capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled()); + var fromPatterns = randomIndexPatterns(CROSS_CLUSTER); + var joinPattern = randomIndexPattern(without(CROSS_CLUSTER), without(WILDCARD_PATTERN), without(INDEX_SELECTOR)); + var plan = statement("FROM " + fromPatterns + " | LOOKUP JOIN " + joinPattern + " ON " + randomIdentifier()); + + var join = as(plan, LookupJoin.class); + assertThat(as(join.left(), UnresolvedRelation.class).indexPattern().indexPattern(), equalTo(unquoteIndexPattern(fromPatterns))); + assertThat(as(join.right(), UnresolvedRelation.class).indexPattern().indexPattern(), equalTo(unquoteIndexPattern(joinPattern))); + } + public void testInvalidJoinPatterns() { assumeTrue("LOOKUP JOIN requires corresponding capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled()); @@ -3317,64 +3328,6 @@ public void testInvalidJoinPatterns() { "invalid index pattern [" + unquoteIndexPattern(joinPattern) + "], remote clusters are not supported with LOOKUP JOIN" ); } - { - // remote cluster on the left - var fromPatterns = randomIndexPatterns(CROSS_CLUSTER); - var joinPattern = randomIndexPattern(without(CROSS_CLUSTER), without(WILDCARD_PATTERN), without(INDEX_SELECTOR)); - expectError( - "FROM " + fromPatterns + " | LOOKUP JOIN " + joinPattern + " ON " + randomIdentifier(), - "invalid index pattern [" + unquoteIndexPattern(fromPatterns) + "], remote clusters are not supported with LOOKUP JOIN" - ); - } - - // If one or more patterns participating in LOOKUP JOINs are partially quoted, we expect the partial quoting - // error messages to take precedence over any LOOKUP JOIN error messages. - - { - // Generate a syntactically invalid (partial quoted) pattern. - var fromPatterns = quote(randomIdentifier()) + ":" + unquoteIndexPattern(randomIndexPattern(without(CROSS_CLUSTER))); - var joinPattern = randomIndexPattern(); - expectError( - "FROM " + fromPatterns + " | LOOKUP JOIN " + joinPattern + " ON " + randomIdentifier(), - // Since the from pattern is partially quoted, we get an error at the end of the partially quoted string. - " mismatched input ':'" - ); - } - - { - // Generate a syntactically invalid (partial quoted) pattern. - var fromPatterns = randomIdentifier() + ":" + quote(randomIndexPatterns(without(CROSS_CLUSTER))); - var joinPattern = randomIndexPattern(); - expectError( - "FROM " + fromPatterns + " | LOOKUP JOIN " + joinPattern + " ON " + randomIdentifier(), - // Since the from pattern is partially quoted, we get an error at the beginning of the partially quoted - // index name that we're expecting an unquoted string. - "expecting UNQUOTED_SOURCE" - ); - } - - { - var fromPatterns = randomIndexPattern(); - // Generate a syntactically invalid (partial quoted) pattern. - var joinPattern = quote(randomIdentifier()) + ":" + unquoteIndexPattern(randomIndexPattern(without(CROSS_CLUSTER))); - expectError( - "FROM " + fromPatterns + " | LOOKUP JOIN " + joinPattern + " ON " + randomIdentifier(), - // Since the join pattern is partially quoted, we get an error at the end of the partially quoted string. - "mismatched input ':'" - ); - } - - { - var fromPatterns = randomIndexPattern(); - // Generate a syntactically invalid (partial quoted) pattern. - var joinPattern = randomIdentifier() + ":" + quote(randomIndexPattern(without(CROSS_CLUSTER))); - expectError( - "FROM " + fromPatterns + " | LOOKUP JOIN " + joinPattern + " ON " + randomIdentifier(), - // Since the from pattern is partially quoted, we get an error at the beginning of the partially quoted - // index name that we're expecting an unquoted string. - "expecting UNQUOTED_SOURCE" - ); - } if (EsqlCapabilities.Cap.INDEX_COMPONENT_SELECTORS.isEnabled()) { { diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml index 85f568415eb4e..9b0e2d393e8c5 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml @@ -160,7 +160,7 @@ fails with non-lookup index: catch: "bad_request" - match: { error.type: "verification_exception" } - - contains: { error.reason: "Found 1 problem\nline 1:45: Lookup Join requires a single lookup mode index; [test] resolves to [test] in [standard] mode" } + - contains: { error.reason: "invalid [test] resolution in lookup mode to an index in [standard] mode" } --- pattern-multiple: diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/192_lookup_join_on_aliases.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/192_lookup_join_on_aliases.yml index 43af8293e9899..c2974b2ee3aa9 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/192_lookup_join_on_aliases.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/192_lookup_join_on_aliases.yml @@ -201,7 +201,7 @@ fails when alias or pattern resolves to multiple: catch: "bad_request" - match: { error.type: "verification_exception" } - - contains: { error.reason: "Found 1 problem\nline 1:34: Lookup Join requires a single lookup mode index; [test-lookup-alias-pattern-multiple] resolves to [4] indices" } + - contains: { error.reason: "multiple resolutions for lookup index [test-lookup-alias-pattern-multiple] in local cluster" } --- alias-pattern-single: From 622f4e65f313ec2465e67f21f8e11556748ed31d Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 6 Jun 2025 14:48:58 -0600 Subject: [PATCH 05/71] fix tests --- x-pack/plugin/build.gradle | 5 ----- 1 file changed, 5 deletions(-) diff --git a/x-pack/plugin/build.gradle b/x-pack/plugin/build.gradle index 34b4f412e849a..2000db31d74de 100644 --- a/x-pack/plugin/build.gradle +++ b/x-pack/plugin/build.gradle @@ -97,11 +97,6 @@ tasks.named("yamlRestCompatTestTransform").configure({ task -> task.skipTest("esql/180_match_operator/match with functions", "Error message changed") task.skipTest("esql/180_match_operator/match within eval", "Error message changed") task.skipTest("esql/40_unsupported_types/semantic_text declared in mapping", "The semantic text field format changed") - task.skipTest("esql/190_lookup_join/Alias as lookup index", "LOOKUP JOIN does not support index aliases for now") - task.skipTest("esql/190_lookup_join/alias-repeated-alias", "LOOKUP JOIN does not support index aliases for now") - task.skipTest("esql/190_lookup_join/alias-repeated-index", "LOOKUP JOIN does not support index aliases for now") - task.skipTest("esql/190_lookup_join/alias-pattern-multiple", "LOOKUP JOIN does not support index aliases for now") - task.skipTest("esql/190_lookup_join/alias-pattern-single", "LOOKUP JOIN does not support index aliases for now") task.skipTest("esql/180_match_operator/match with disjunctions", "Disjunctions in full text functions work now") task.skipTest("esql/130_spatial/values unsupported for geo_point", "Spatial types are now supported in VALUES aggregation") task.skipTest("esql/130_spatial/values unsupported for geo_point status code", "Spatial types are now supported in VALUES aggregation") From d31cbfb8a28d416300dbe055cdac66fa8bdb0e92 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 9 Jun 2025 18:03:37 -0600 Subject: [PATCH 06/71] test fixes --- x-pack/plugin/build.gradle | 1 - .../elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java | 1 - .../xpack/esql/ccq/RequestIndexFilteringIT.java | 8 ++++++++ .../esql/qa/rest/RequestIndexFilteringTestCase.java | 9 +++++++-- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/build.gradle b/x-pack/plugin/build.gradle index 2000db31d74de..f40cf2d292ca3 100644 --- a/x-pack/plugin/build.gradle +++ b/x-pack/plugin/build.gradle @@ -128,7 +128,6 @@ tasks.named("yamlRestCompatTestTransform").configure({ task -> task.replaceValueInMatch("Size", 49, "Test flamegraph from test-events") task.skipTest("esql/90_non_indexed/fetch", "Temporary until backported") task.skipTest("esql/63_enrich_int_range/Invalid age as double", "TODO: require disable allow_partial_results") - task.skipTest("esql/191_lookup_join_on_datastreams/data streams not supported in LOOKUP JOIN", "Added support for aliases in JOINs") task.skipTest("esql/190_lookup_join/non-lookup index", "Error message changed") task.skipTest("esql/192_lookup_join_on_aliases/alias-pattern-multiple", "Error message changed") }) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index be0feade7f8af..1fc9c892f0407 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -129,7 +129,6 @@ protected void shouldSkipTest(String testName) throws IOException { assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName())); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName())); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V7.capabilityName())); - assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName())); // Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented. assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName())); assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V9.capabilityName())); diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java index c3e29602a8b8c..eb9ceefb3ed49 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java @@ -18,6 +18,7 @@ import org.elasticsearch.test.MapMatcher; import org.elasticsearch.test.TestClustersThreadFilter; import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.xpack.esql.action.EsqlCapabilities; import org.elasticsearch.xpack.esql.qa.rest.RequestIndexFilteringTestCase; import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; import org.hamcrest.Matcher; @@ -153,6 +154,13 @@ private static boolean checkVersion(org.elasticsearch.Version version) { || (version.onOrAfter(Version.fromString("8.19.0")) && version.before(Version.fromString("9.0.0"))); } + @Override + protected boolean canDoRemoteTest() { + return EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled() + && Clusters.localClusterVersion().onOrAfter(Version.fromString("9.1.0")); + // TODO: add 8.19 if this is merged to 8.x + } + // We need a separate test since remote missing indices and local missing indices now work differently public void testIndicesDontExistRemote() throws IOException { // Exclude old versions diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java index 2f984e9f98997..b9f0232b58cdc 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java @@ -194,6 +194,10 @@ public void testFieldNameTypo() throws IOException { assertThat(e.getMessage(), containsString("Unknown column [idx]")); } + protected boolean canDoRemoteTest() { + return false; + } + public void testIndicesDontExist() throws IOException { int docsTest1 = randomIntBetween(1, 5); indexTimestampData(docsTest1, "test1", "2024-11-26", "id1"); @@ -213,8 +217,9 @@ public void testIndicesDontExist() throws IOException { assertThat(e.getMessage(), containsString("index_not_found_exception")); assertThat(e.getMessage(), anyOf(containsString("no such index [foo]"), containsString("no such index [remote_cluster:foo]"))); - if (EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled()) { - var pattern = from("test1"); + var pattern = from("test1"); + if (EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled() + && (RemoteClusterAware.isRemoteIndexName(pattern) == false || canDoRemoteTest())) { e = expectThrows( ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query(pattern + " | LOOKUP JOIN foo ON id1")) From 02cd72be5ac9c5183457c62b9c26e8b11142cbc5 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 10 Jun 2025 00:14:05 +0000 Subject: [PATCH 07/71] [CI] Auto commit changes from spotless --- .../org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 1fc9c892f0407..7d6795b157193 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -50,7 +50,6 @@ import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V7; -import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V12; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.UNMAPPED_FIELDS; From 3044290a282476fecc80cde9f86a15cc29771c8b Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 9 Jun 2025 22:49:54 -0600 Subject: [PATCH 08/71] fix tests --- .../xpack/esql/ccq/MultiClusterSpecIT.java | 50 ++++++++++++++----- .../esql/ccq/RequestIndexFilteringIT.java | 24 +++++++-- .../rest/RequestIndexFilteringTestCase.java | 23 ++------- 3 files changed, 63 insertions(+), 34 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 7d6795b157193..c2c4495144919 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -39,8 +39,10 @@ import java.util.List; import java.util.Locale; import java.util.Optional; +import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.xpack.esql.CsvSpecReader.specParser; import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled; @@ -50,6 +52,7 @@ import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V7; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V12; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.UNMAPPED_FIELDS; @@ -128,6 +131,10 @@ protected void shouldSkipTest(String testName) throws IOException { assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName())); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName())); assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V7.capabilityName())); + if (testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName())) { + assumeTrue("LOKUP JOIN not supported", supportsIndexModeLookup()); + assumeTrue("LOOKUP JOIN not yet supported in CCS", Clusters.localClusterVersion().onOrAfter(Version.fromString("9.1.0"))); + } // Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented. assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName())); assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V9.capabilityName())); @@ -179,6 +186,25 @@ protected RestClient buildClient(Settings settings, HttpHost[] localHosts) throw // These indices are used in metadata tests so we want them on remote only for consistency public static final List METADATA_INDICES = List.of("employees", "apps", "ul_logs"); + // These are loopkup indices, we want them on remotes and locals + public static final Set LOOKUP_INDICES = Stream.of( + "languages_nested_fields", + "languages_lookup", + "clientips_lookup", + "languages_mixed_numerics", + "threat_list", + "message_types_lookup", + "host_inventory", + "ownerships", + "languages_lookup_non_unique_key", + "lookup_sample_data_ts_nanos", + "service_owners" + ).map(i -> "/" + i + "/_bulk").collect(Collectors.toSet()); + + public static final Set ENRICH_ENDPOINTS = ENRICH_SOURCE_INDICES.stream() + .map(i -> "/" + i + "/_bulk") + .collect(Collectors.toSet()); + /** * Creates a new mock client that dispatches every request to both the local and remote clusters, excluding _bulk and _query requests. * - '_bulk' requests are randomly sent to either the local or remote cluster to populate data. Some spec tests, such as AVG, @@ -197,15 +223,17 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th return localClient.performRequest(request); } else if (endpoint.endsWith("/_bulk") && METADATA_INDICES.stream().anyMatch(i -> endpoint.equals("/" + i + "/_bulk"))) { return remoteClient.performRequest(request); - } else if (endpoint.endsWith("/_bulk") && ENRICH_SOURCE_INDICES.stream().noneMatch(i -> endpoint.equals("/" + i + "/_bulk"))) { - return bulkClient.performRequest(request); - } else { - Request[] clones = cloneRequests(request, 2); - Response resp1 = remoteClient.performRequest(clones[0]); - Response resp2 = localClient.performRequest(clones[1]); - assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode()); - return resp2; - } + } else if (endpoint.endsWith("/_bulk") + && ENRICH_ENDPOINTS.contains(endpoint) == false + && LOOKUP_INDICES.contains(endpoint) == false) { + return bulkClient.performRequest(request); + } else { + Request[] clones = cloneRequests(request, 2); + Response resp1 = remoteClient.performRequest(clones[0]); + Response resp2 = localClient.performRequest(clones[1]); + assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode()); + return resp2; + } }); doAnswer(invocation -> { IOUtils.close(localClient, remoteClient); @@ -357,9 +385,7 @@ protected boolean supportsInferenceTestService() { @Override protected boolean supportsIndexModeLookup() throws IOException { - // CCS does not yet support JOIN_LOOKUP_V10 and clusters falsely report they have this capability - // return hasCapabilities(List.of(JOIN_LOOKUP_V10.capabilityName())); - return false; + return hasCapabilities(List.of(JOIN_LOOKUP_V12.capabilityName())); } @Override diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java index eb9ceefb3ed49..392a34ebfa4da 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java @@ -36,6 +36,8 @@ import static org.elasticsearch.test.ListMatcher.matchesList; import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; @@ -154,11 +156,25 @@ private static boolean checkVersion(org.elasticsearch.Version version) { || (version.onOrAfter(Version.fromString("8.19.0")) && version.before(Version.fromString("9.0.0"))); } - @Override - protected boolean canDoRemoteTest() { - return EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled() - && Clusters.localClusterVersion().onOrAfter(Version.fromString("9.1.0")); + public void testIndicesDontExistWithRemotePattern() throws IOException { // TODO: add 8.19 if this is merged to 8.x + assumeTrue("Only works with remote LOOKUP JOIN support", Clusters.localClusterVersion().onOrAfter(Version.fromString("9.1.0"))); + + int docsTest1 = randomIntBetween(1, 5); + indexTimestampData(docsTest1, "test1", "2024-11-26", "id1"); + + if (EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE.isEnabled()) { + var pattern = "FROM test1,*:test1"; + ResponseException e = expectThrows( + ResponseException.class, + () -> runEsql(timestampFilter("gte", "2020-01-01").query(pattern + " | LOOKUP JOIN foo ON id1")) + ); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat( + e.getMessage(), + allOf(containsString("verification_exception"), containsString("Unknown index [foo,remote_cluster:foo]")) + ); + } } // We need a separate test since remote missing indices and local missing indices now work differently diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java index b9f0232b58cdc..3354fb1413e06 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java @@ -13,7 +13,6 @@ import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.test.rest.ESRestTestCase; -import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.esql.AssertWarnings; import org.elasticsearch.xpack.esql.action.EsqlCapabilities; @@ -194,10 +193,6 @@ public void testFieldNameTypo() throws IOException { assertThat(e.getMessage(), containsString("Unknown column [idx]")); } - protected boolean canDoRemoteTest() { - return false; - } - public void testIndicesDontExist() throws IOException { int docsTest1 = randomIntBetween(1, 5); indexTimestampData(docsTest1, "test1", "2024-11-26", "id1"); @@ -215,24 +210,16 @@ public void testIndicesDontExist() throws IOException { e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM foo, test1"))); assertEquals(404, e.getResponse().getStatusLine().getStatusCode()); assertThat(e.getMessage(), containsString("index_not_found_exception")); - assertThat(e.getMessage(), anyOf(containsString("no such index [foo]"), containsString("no such index [remote_cluster:foo]"))); + assertThat(e.getMessage(), containsString("no such index [foo]")); - var pattern = from("test1"); - if (EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled() - && (RemoteClusterAware.isRemoteIndexName(pattern) == false || canDoRemoteTest())) { + // Don't test remote patterns here, we'll test them in the multi-cluster tests + if (EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled()) { e = expectThrows( ResponseException.class, - () -> runEsql(timestampFilter("gte", "2020-01-01").query(pattern + " | LOOKUP JOIN foo ON id1")) + () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM test1 | LOOKUP JOIN foo ON id1")) ); assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); - assertThat( - e.getMessage(), - // currently we don't support remote clusters in LOOKUP JOIN - // this check happens before resolving actual indices and results in a different error message - RemoteClusterAware.isRemoteIndexName(pattern) - ? allOf(containsString("verification_exception"), containsString("Unknown index [foo,remote_cluster:foo]")) - : allOf(containsString("verification_exception"), containsString("Unknown index [foo]")) - ); + assertThat(e.getMessage(), allOf(containsString("verification_exception"), containsString("Unknown index [foo]"))); } } From 2c63a2313ed72b82db4152880911a876a3d8454d Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 10 Jun 2025 11:18:32 -0600 Subject: [PATCH 09/71] fix more tests --- .../xpack/esql/ccq/MultiClusterSpecIT.java | 39 +++++++++++++------ .../src/main/resources/lookup-join.csv-spec | 2 +- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index c2c4495144919..747b748424b6d 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -111,6 +111,14 @@ public MultiClusterSpecIT( super(fileName, groupName, testName, lineNumber, convertToRemoteIndices(testCase), instructions, mode); } + // TODO: think how to handle this better + public static final Set LOOKUP_JOIN_AFTER_STATS_TESTS = Set.of( + "StatsAndLookupIPAndMessageFromIndex", + "JoinMaskingRegex", + "StatsAndLookupIPFromIndex", + "StatsAndLookupMessageFromIndex" + ); + @Override protected void shouldSkipTest(String testName) throws IOException { boolean remoteMetadata = testCase.requiredCapabilities.contains(METADATA_FIELDS_REMOTE_TEST.capabilityName()); @@ -138,6 +146,8 @@ protected void shouldSkipTest(String testName) throws IOException { // Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented. assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName())); assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V9.capabilityName())); + // Tests that use capabilities not supported in CCS + assumeFalse("LOOKUP JOIN after stats not yet supported in CCS", LOOKUP_JOIN_AFTER_STATS_TESTS.contains(testName)); } @Override @@ -186,8 +196,9 @@ protected RestClient buildClient(Settings settings, HttpHost[] localHosts) throw // These indices are used in metadata tests so we want them on remote only for consistency public static final List METADATA_INDICES = List.of("employees", "apps", "ul_logs"); - // These are loopkup indices, we want them on remotes and locals - public static final Set LOOKUP_INDICES = Stream.of( + // These are lookup indices, we want them on both remotes and locals + // TODO: can we somehow find it from the data loader? + public static final Set LOOKUP_INDICES = Set.of( "languages_nested_fields", "languages_lookup", "clientips_lookup", @@ -199,7 +210,9 @@ protected RestClient buildClient(Settings settings, HttpHost[] localHosts) throw "languages_lookup_non_unique_key", "lookup_sample_data_ts_nanos", "service_owners" - ).map(i -> "/" + i + "/_bulk").collect(Collectors.toSet()); + ); + + public static final Set LOOKUP_ENDPOINTS = LOOKUP_INDICES.stream().map(i -> "/" + i + "/_bulk").collect(Collectors.toSet()); public static final Set ENRICH_ENDPOINTS = ENRICH_SOURCE_INDICES.stream() .map(i -> "/" + i + "/_bulk") @@ -225,7 +238,7 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th return remoteClient.performRequest(request); } else if (endpoint.endsWith("/_bulk") && ENRICH_ENDPOINTS.contains(endpoint) == false - && LOOKUP_INDICES.contains(endpoint) == false) { + && LOOKUP_ENDPOINTS.contains(endpoint) == false) { return bulkClient.performRequest(request); } else { Request[] clones = cloneRequests(request, 2); @@ -277,16 +290,20 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas String query = testCase.query; String[] commands = query.split("\\|"); String first = commands[0].trim(); + // If true, we're using *:index, otherwise we're using *:index,index + boolean onlyRemotes = canUseRemoteIndicesOnly() && randomBoolean(); if (commands[0].toLowerCase(Locale.ROOT).startsWith("from")) { String[] parts = commands[0].split("(?i)metadata"); assert parts.length >= 1 : parts; String fromStatement = parts[0]; String[] localIndices = fromStatement.substring("FROM ".length()).split(","); + if (Arrays.stream(localIndices).anyMatch(i -> LOOKUP_INDICES.contains(i.trim().toLowerCase(Locale.ROOT)))) { + // If the query contains lookup indices, use only remotes to avoid duplication + onlyRemotes = true; + } final String remoteIndices; - if (canUseRemoteIndicesOnly() && randomBoolean()) { - remoteIndices = Arrays.stream(localIndices) - .map(index -> unquoteAndRequoteAsRemote(index.trim(), true)) - .collect(Collectors.joining(",")); + if (onlyRemotes) { + remoteIndices = Arrays.stream(localIndices).map(index -> "*:" + index.trim()).collect(Collectors.joining(",")); } else { remoteIndices = Arrays.stream(localIndices) .map(index -> unquoteAndRequoteAsRemote(index.trim(), false)) @@ -299,10 +316,8 @@ static CsvSpecReader.CsvTestCase convertToRemoteIndices(CsvSpecReader.CsvTestCas String[] parts = commands[0].split("\\s+"); assert parts.length >= 2 : commands[0]; String[] indices = parts[1].split(","); - if (canUseRemoteIndicesOnly() && randomBoolean()) { - parts[1] = Arrays.stream(indices) - .map(index -> unquoteAndRequoteAsRemote(index.trim(), true)) - .collect(Collectors.joining(",")); + if (onlyRemotes) { + parts[1] = Arrays.stream(indices).map(index -> "*:" + index.trim()).collect(Collectors.joining(",")); } else { parts[1] = Arrays.stream(indices) .map(index -> unquoteAndRequoteAsRemote(index.trim(), false)) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 6254b42e176fa..c88fc121870cd 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -1689,7 +1689,7 @@ salary_change.long:double|foo:long joinMaskingEval required_capability: join_lookup_v12 required_capability: fix_join_masking_eval -from languag*, -languages_mixed_numerics +from languages,languages_lookup,languages_lookup_non_unique_key,languages_nested_fields | eval type = null | rename language_name as message | lookup join message_types_lookup on message From 4c29dc7781fbf44fb7f3eef2bc6c0e205707f171 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 10 Jun 2025 18:10:39 +0000 Subject: [PATCH 10/71] [CI] Auto commit changes from spotless --- .../org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 747b748424b6d..5b98c6834974f 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -42,7 +42,6 @@ import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.elasticsearch.xpack.esql.CsvSpecReader.specParser; import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled; From c466265fbf5c550fb5e01eb55125fc1aa7310c30 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 11 Jun 2025 14:50:54 -0600 Subject: [PATCH 11/71] more tests --- .../xpack/esql/ccq/MultiClusterSpecIT.java | 4 +- .../action/AbstractCrossClusterTestCase.java | 2 +- .../esql/action/CrossClusterLookupJoinIT.java | 107 ++++++++++++++++++ .../xpack/esql/planner/mapper/Mapper.java | 36 ++++++ .../xpack/esql/session/EsqlSession.java | 2 +- 5 files changed, 148 insertions(+), 3 deletions(-) create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 5b98c6834974f..fcb2337f49196 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -115,7 +115,9 @@ public MultiClusterSpecIT( "StatsAndLookupIPAndMessageFromIndex", "JoinMaskingRegex", "StatsAndLookupIPFromIndex", - "StatsAndLookupMessageFromIndex" + "StatsAndLookupMessageFromIndex", + "MvJoinKeyOnTheLookupIndexAfterStats", + "MvJoinKeyOnFromAfterStats" ); @Override diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java index 992572fc3220d..82b08d364839d 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java @@ -255,7 +255,7 @@ protected void populateRuntimeIndex(String clusterAlias, String langName, String bulk.get(); } - protected void populateRemoteIndices(String clusterAlias, String indexName, int numShards) throws IOException { + protected void populateRemoteIndices(String clusterAlias, String indexName, int numShards) { Client remoteClient = client(clusterAlias); assertAcked( remoteClient.admin() diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java new file mode 100644 index 0000000000000..caaa63fa2cfb5 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.junit.annotations.TestLogging; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; + +@TestLogging(value = "org.elasticsearch.xpack.esql.session:DEBUG", reason = "to better understand planning") +public class CrossClusterLookupJoinIT extends AbstractCrossClusterTestCase { + + public void testLookupJoinAcrossClusters() throws IOException { + setupClustersAndLookups(); + + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag")); + int vIndex = columns.indexOf("v"); + int lookupNameIndex = columns.indexOf("lookup_name"); + int tagIndex = columns.indexOf("tag"); + int lookupTagIndex = columns.indexOf("lookup_tag"); + + List> values = getValuesList(resp); + assertThat(values, hasSize(20)); + for (var row : values) { + assertThat(row, hasSize(7)); + Long v = (Long) row.get(vIndex); + assertThat(v, greaterThanOrEqualTo(0L)); + if (v < 25) { + assertThat((String) row.get(lookupNameIndex), equalTo("lookup_" + v)); + String tag = (String) row.get(tagIndex); + if (tag.equals("local")) { + assertThat(row.get(lookupTagIndex), equalTo("local")); + } else { + assertThat(row.get(lookupTagIndex), equalTo(REMOTE_CLUSTER_1)); + } + } else { + assertNull(row.get(lookupNameIndex)); + assertNull(row.get(lookupTagIndex)); + } + } + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertCCSExecutionInfoDetails(executionInfo); + } + } + + protected Map setupClustersAndLookups() throws IOException { + var setupData = setupClusters(2); + populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10); + populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup", 25); + return setupData; + } + + protected void populateLookupIndex(String clusterAlias, String indexName, int numDocs) { + Client client = client(clusterAlias); + assertAcked( + client.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.mode", "lookup")) + .setMapping("lookup_key", "type=long", "lookup_name", "type=keyword", "lookup_tag", "type=keyword") + ); + String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias; + for (int i = 0; i < numDocs; i++) { + client.prepareIndex(indexName).setSource("lookup_key", i, "lookup_name", "lookup_" + i, "lookup_tag", tag).get(); + } + client.admin().indices().prepareRefresh(indexName).get(); + } + + private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) { + assertNotNull(executionInfo); + assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertTrue(executionInfo.isCrossClusterSearch()); + List clusters = executionInfo.clusterAliases().stream().map(executionInfo::getCluster).toList(); + + for (EsqlExecutionInfo.Cluster cluster : clusters) { + assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(cluster.getSkippedShards(), equalTo(0)); + assertThat(cluster.getFailedShards(), equalTo(0)); + } + } + +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index 137a2118b0d54..6d44dec5e266f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -200,6 +200,37 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { return MapperUtils.mapUnary(unary, mappedChild); } + private PhysicalPlan mapToFragmentExec(LogicalPlan logical, PhysicalPlan child) { + Holder hasFragment = new Holder<>(false); + + var childTransformed = child.transformUp(f -> { + // Once we reached FragmentExec, we stuff our Enrich under it + if (f instanceof FragmentExec) { + hasFragment.set(true); + return new FragmentExec(logical); + } + if (f instanceof EnrichExec enrichExec) { + // It can only be ANY because COORDINATOR would have errored out earlier, and REMOTE should be under FragmentExec + assert enrichExec.mode() == Enrich.Mode.ANY : "enrich must be in ANY mode here"; + return enrichExec.child(); + } + if (f instanceof UnaryExec unaryExec) { + if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof TopNExec) { + return f; + } else { + return unaryExec.child(); + } + } + // Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it. + return f; + }); + + if (hasFragment.get()) { + return childTransformed; + } + return null; + } + private PhysicalPlan mapBinary(BinaryPlan bp) { if (bp instanceof Join join) { JoinConfig config = join.config(); @@ -218,6 +249,11 @@ private PhysicalPlan mapBinary(BinaryPlan bp) { return new FragmentExec(bp); } + var leftPlan = mapToFragmentExec(bp, left); + if (leftPlan != null) { + return leftPlan; + } + PhysicalPlan right = map(bp.right()); // if the right is data we can use a hash join directly if (right instanceof LocalSourceExec localData) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 365c9b666820b..2275ab83b4f87 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -504,7 +504,7 @@ private PreAnalysisResult receiveLookupIndexResolution( skipClusterOrError( clusterAlias, executionInfo, - "Lookup index [" + index + "] is not available in cluster " + EsqlCCSUtils.inClusterName(clusterAlias) + "lookup index [" + index + "] is not available " + EsqlCCSUtils.inClusterName(clusterAlias) ); } }); From b056662af86895a0922a7b1263b76daf40812e41 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 13 Jun 2025 14:36:32 -0600 Subject: [PATCH 12/71] some refactoring and debugging facilities --- .../esql/optimizer/PhysicalVerifier.java | 18 +++++++++ .../esql/plan/logical/join/LookupJoin.java | 39 +++++++++++++++++++ .../esql/plan/physical/LookupJoinExec.java | 1 + .../xpack/esql/planner/Layout.java | 6 ++- .../xpack/esql/planner/mapper/Mapper.java | 12 ++++-- 5 files changed, 72 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java index 629361a40530c..ed0f073a4e74b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java @@ -13,8 +13,11 @@ import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker; import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plan.logical.join.Join; import org.elasticsearch.xpack.esql.plan.physical.EnrichExec; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; +import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import static org.elasticsearch.xpack.esql.common.Failure.fail; @@ -36,6 +39,21 @@ public Failures verify(PhysicalPlan plan) { if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) { return failures; } + // Do the same for remote lookup joins + // TODO: figure out why enrich does not need two sides? + var joins = plan.collectFirstChildren(LookupJoinExec.class::isInstance); + if (joins.isEmpty() == false) { + return failures; + } + var fragment = plan.collectFirstChildren(FragmentExec.class::isInstance); + if (fragment.isEmpty() == false) { + // LookupJoin gets rewritten as Join by surrogate() + FragmentExec f = (FragmentExec) fragment.get(0); + var ljoins = f.fragment().collectFirstChildren(Join.class::isInstance); + if (ljoins.isEmpty() == false) { + return failures; + } + } plan.forEachDown(p -> { if (p instanceof FieldExtractExec fieldExtractExec) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java index 8672da7bce786..1244b053511ca 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java @@ -14,9 +14,12 @@ import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.plan.logical.Aggregate; +import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.SurrogateLogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.UsingJoinType; import java.util.List; @@ -30,6 +33,8 @@ */ public class LookupJoin extends Join implements SurrogateLogicalPlan, PostAnalysisVerificationAware, TelemetryAware { + private boolean isRemote = false; + public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, List joinFields) { this(source, left, right, new UsingJoinType(LEFT, joinFields), emptyList(), emptyList(), emptyList()); } @@ -86,6 +91,10 @@ public String telemetryLabel() { @Override public void postAnalysisVerification(Failures failures) { super.postAnalysisVerification(failures); + if (isRemote) { + checkRemoteJoin(this, failures); + } + // TODO: this is probably not necessary anymore as we check it in analysis stage? right().forEachDown(EsRelation.class, esr -> { var indexNameWithModes = esr.indexNameWithModes(); if (indexNameWithModes.size() != 1) { @@ -113,4 +122,34 @@ public void postAnalysisVerification(Failures failures) { } }); } + + private static void checkRemoteJoin(LogicalPlan plan, Failures failures) { + boolean[] agg = { false }; + boolean[] enrichCoord = { false }; + + plan.forEachUp(UnaryPlan.class, u -> { + if (u instanceof Aggregate) { + agg[0] = true; + } else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) { + enrichCoord[0] = true; + } + if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) { + if (agg[0]) { + failures.add(fail(enrich, "LOOKUP JOIN with remote indices can't be executed after STATS")); + } + if (enrichCoord[0]) { + failures.add(fail(enrich, "LOOKUP JOIN with remote indices can't be executed after ENRICH with coordinator policy")); + } + } + }); + } + + public boolean isRemote() { + return isRemote; + } + + public LookupJoin setRemote(boolean remote) { + isRemote = remote; + return this; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java index 2aff38993aa98..86b0f5d3cf884 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java @@ -163,4 +163,5 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(super.hashCode(), leftFields, rightFields, addedFields); } + } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java index cf652bffa9d0b..f5ba2a9685730 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java @@ -115,7 +115,11 @@ public Layout build() { for (NameId id : set.nameIds) { // Duplicate name ids would mean that have 2 channels that are declared under the same id. That makes no sense - which // channel should subsequent operators use, then, when they want to refer to this id? - assert (layout.containsKey(id) == false) : "Duplicate name ids are not allowed in layouts"; + // FIXME: temporry change for debugging + if (layout.containsKey(id)) { + throw new IllegalArgumentException("Duplicate name ids are not allowed in layouts: " + id); + } + assert (layout.containsKey(id) == false) : "Duplicate name ids are not allowed in layouts: " + id; ChannelAndType next = new ChannelAndType(channel, set.type); layout.put(id, next); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index 6d44dec5e266f..ba67a3d48fd79 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.planner.mapper; +import org.elasticsearch.common.util.FeatureFlag; import org.elasticsearch.compute.aggregation.AggregatorMode; import org.elasticsearch.index.IndexMode; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; @@ -231,6 +232,9 @@ private PhysicalPlan mapToFragmentExec(LogicalPlan logical, PhysicalPlan child) return null; } + // This is a temporary hack for debugging purposes to quick switching + private static final FeatureFlag FRAGMENT_EXEC_HACK = new FeatureFlag("fragment_exec_hack"); + private PhysicalPlan mapBinary(BinaryPlan bp) { if (bp instanceof Join join) { JoinConfig config = join.config(); @@ -249,9 +253,11 @@ private PhysicalPlan mapBinary(BinaryPlan bp) { return new FragmentExec(bp); } - var leftPlan = mapToFragmentExec(bp, left); - if (leftPlan != null) { - return leftPlan; + if (FRAGMENT_EXEC_HACK.isEnabled()) { + var leftPlan = mapToFragmentExec(bp, left); + if (leftPlan != null) { + return leftPlan; + } } PhysicalPlan right = map(bp.right()); From 82fdd35bc9774c2b37f9da3cc9b3653b2181cca7 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 16 Jun 2025 13:26:52 -0600 Subject: [PATCH 13/71] more testing --- .../esql/action/CrossClusterLookupJoinIT.java | 172 +++++++++++++++++- .../xpack/esql/planner/mapper/Mapper.java | 4 +- .../xpack/esql/session/EsqlSession.java | 27 ++- 3 files changed, 183 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java index caaa63fa2cfb5..71118ba050b61 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java @@ -11,6 +11,8 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.xpack.esql.VerificationException; +import org.elasticsearch.xpack.esql.parser.ParsingException; import java.io.IOException; import java.util.List; @@ -18,10 +20,13 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; @TestLogging(value = "org.elasticsearch.xpack.esql.session:DEBUG", reason = "to better understand planning") public class CrossClusterLookupJoinIT extends AbstractCrossClusterTestCase { @@ -45,7 +50,7 @@ public void testLookupJoinAcrossClusters() throws IOException { List> values = getValuesList(resp); assertThat(values, hasSize(20)); for (var row : values) { - assertThat(row, hasSize(7)); + assertThat(row, hasSize(9)); Long v = (Long) row.get(vIndex); assertThat(v, greaterThanOrEqualTo(0L)); if (v < 25) { @@ -67,6 +72,155 @@ public void testLookupJoinAcrossClusters() throws IOException { } } + public void testLookupJoinMissingRemoteIndex() throws IOException { + setupClusters(2); + populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10); + + setSkipUnavailable(REMOTE_CLUSTER_1, true); + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag")); + + List> values = getValuesList(resp); + assertThat(values, hasSize(10)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + + var localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remoteCluster.getFailures(), not(empty())); + var failure = remoteCluster.getFailures().get(0); + assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + } + // Without local + try ( + EsqlQueryResponse resp = runQuery( + "FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + List> values = getValuesList(resp); + assertThat(values, hasSize(0)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.getClusters().size(), equalTo(1)); + + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remoteCluster.getFailures(), not(empty())); + var failure = remoteCluster.getFailures().get(0); + assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + } + + setSkipUnavailable(REMOTE_CLUSTER_1, false); + // then missing index is an error + var ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) + ); + assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + } + + public void testLookupJoinMissingLocalIndex() throws IOException { + setupClusters(2); + populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup", 10); + + var ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) + ); + assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in local cluster")); + } + + public void testLookupJoinMissingKey() throws IOException { + setupClusters(2); + populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10); + populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup", 10); + + setSkipUnavailable(REMOTE_CLUSTER_1, true); + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL local_tag = to_string(v) | LOOKUP JOIN values_lookup ON local_tag", + randomBoolean() + ) + ) { + List> values = getValuesList(resp); + assertThat(values, hasSize(20)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.getClusters().size(), equalTo(2)); + + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + // FIXME: verify whether we need to skip or succeed here + // assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + // assertThat(remoteCluster.getFailures(), not(empty())); + // var failure = remoteCluster.getFailures().get(0); + // assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + } + + // TODO: verify whether this should be an error or not when the key field is missing + Exception ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM c*:logs-* | LOOKUP JOIN values_lookup ON v", randomBoolean()) + ); + assertThat(ex.getMessage(), containsString("Unknown column [v] in right side of join")); + + ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM c*:logs-* | EVAL local_tag = to_string(v) | LOOKUP JOIN values_lookup ON local_tag", randomBoolean()) + ); + assertThat(ex.getMessage(), containsString("Unknown column [local_tag] in right side of join")); + + setSkipUnavailable(REMOTE_CLUSTER_1, false); + } + + public void testLookupJoinIndexMode() throws IOException { + setupClusters(2); + populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10); + populateIndex(REMOTE_CLUSTER_1, "values_lookup", randomIntBetween(1, 3), 10); + setSkipUnavailable(REMOTE_CLUSTER_1, true); + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag")); + + List> values = getValuesList(resp); + assertThat(values, hasSize(10)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + + var localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remoteCluster.getFailures(), not(empty())); + var failure = remoteCluster.getFailures().get(0); + assertThat( + failure.reason(), + containsString("invalid [cluster-a:values_lookup] resolution in lookup mode to an index in [standard] mode") + ); + } + + setSkipUnavailable(REMOTE_CLUSTER_1, false); + // then missing index is an error + var ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) + ); + assertThat( + ex.getMessage(), + containsString("invalid [cluster-a:values_lookup] resolution in lookup mode to an index in [standard] mode") + ); + } + protected Map setupClustersAndLookups() throws IOException { var setupData = setupClusters(2); populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10); @@ -76,16 +230,26 @@ protected Map setupClustersAndLookups() throws IOException { protected void populateLookupIndex(String clusterAlias, String indexName, int numDocs) { Client client = client(clusterAlias); + String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias; + String field_tag = Strings.isEmpty(clusterAlias) ? "local_tag" : "remote_tag"; assertAcked( client.admin() .indices() .prepareCreate(indexName) .setSettings(Settings.builder().put("index.mode", "lookup")) - .setMapping("lookup_key", "type=long", "lookup_name", "type=keyword", "lookup_tag", "type=keyword") + .setMapping( + "lookup_key", + "type=long", + "lookup_name", + "type=keyword", + "lookup_tag", + "type=keyword", + field_tag, + "type=keyword" + ) ); - String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias; for (int i = 0; i < numDocs; i++) { - client.prepareIndex(indexName).setSource("lookup_key", i, "lookup_name", "lookup_" + i, "lookup_tag", tag).get(); + client.prepareIndex(indexName).setSource("lookup_key", i, "lookup_name", "lookup_" + i, "lookup_tag", tag, field_tag, i).get(); } client.admin().indices().prepareRefresh(indexName).get(); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index ba67a3d48fd79..2a836874ac24c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -233,7 +233,7 @@ private PhysicalPlan mapToFragmentExec(LogicalPlan logical, PhysicalPlan child) } // This is a temporary hack for debugging purposes to quick switching - private static final FeatureFlag FRAGMENT_EXEC_HACK = new FeatureFlag("fragment_exec_hack"); + private static final boolean FRAGMENT_EXEC_HACK_ENABLED = System.getProperty("esql.fragment_exec.hack", "true").equals("true"); private PhysicalPlan mapBinary(BinaryPlan bp) { if (bp instanceof Join join) { @@ -253,7 +253,7 @@ private PhysicalPlan mapBinary(BinaryPlan bp) { return new FragmentExec(bp); } - if (FRAGMENT_EXEC_HACK.isEnabled()) { + if (FRAGMENT_EXEC_HACK_ENABLED) { var leftPlan = mapToFragmentExec(bp, left); if (leftPlan != null) { return leftPlan; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 2275ab83b4f87..9fe20c556f8b4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -426,23 +426,22 @@ private void preAnalyzeLookupIndex( assert RemoteClusterAware.isRemoteIndexName(localPattern) == false : "Lookup index name should not include remote, but got: " + localPattern; Set fieldNames = result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames; - StringBuilder patternWithRemotes = new StringBuilder(localPattern); - if (executionInfo.getClusters().isEmpty() == false) { - // Get the list of active clusters for the lookup index - Stream clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING); - // Create a pattern with all active remote clusters - clusters.forEach(cluster -> { - String clusterAlias = cluster.getClusterAlias(); - if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { - // Skip the local cluster, as it is already included in the localPattern - return; - } - patternWithRemotes.append(",").append(clusterAlias).append(":").append(localPattern); - }); + + String patternWithRemotes; + + if (executionInfo.getClusters().isEmpty()) { + patternWithRemotes = localPattern; + } else { + patternWithRemotes = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING) + .map(c -> RemoteClusterAware.buildRemoteIndexName(c.getClusterAlias(), localPattern)) + .collect(Collectors.joining(", ")); + } + if (patternWithRemotes.isEmpty()) { + return; } // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types indexResolver.resolveAsMergedMapping( - patternWithRemotes.toString(), + patternWithRemotes, fieldNames, null, listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution)) From abcc586d98a00cfc8d5a320b942b62795147a5ca Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 17 Jun 2025 17:43:32 +0000 Subject: [PATCH 14/71] [CI] Auto commit changes from spotless --- .../xpack/esql/action/CrossClusterLookupJoinIT.java | 1 - .../org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java | 1 - .../java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java | 1 - 3 files changed, 3 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java index 71118ba050b61..9f88b810f8a7b 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java @@ -12,7 +12,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.esql.VerificationException; -import org.elasticsearch.xpack.esql.parser.ParsingException; import java.io.IOException; import java.util.List; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index 1c6a8e8155abd..a8ea1ba95dc15 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -70,7 +70,6 @@ import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval; import org.elasticsearch.xpack.esql.plan.logical.Sample; import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate; -import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation; import org.elasticsearch.xpack.esql.plan.logical.inference.Completion; import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index 2a836874ac24c..ea9a66684cd2f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.esql.planner.mapper; -import org.elasticsearch.common.util.FeatureFlag; import org.elasticsearch.compute.aggregation.AggregatorMode; import org.elasticsearch.index.IndexMode; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; From ab5c4b75bd959da4c76dbe83e2dbd91627e0b556 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 17 Jun 2025 17:35:30 -0600 Subject: [PATCH 15/71] More tests & fixes --- .../esql/action/CrossClusterLookupJoinIT.java | 110 +++++++++++++++--- .../esql/plan/logical/join/LookupJoin.java | 52 ++++----- .../esql/planner/LocalExecutionPlanner.java | 21 +++- .../xpack/esql/session/EsqlSession.java | 19 +-- 4 files changed, 135 insertions(+), 67 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java index 9f88b810f8a7b..eb85f0c1cf0ea 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java @@ -69,6 +69,21 @@ public void testLookupJoinAcrossClusters() throws IOException { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertCCSExecutionInfoDetails(executionInfo); } + + // populateLookupIndex(LOCAL_CLUSTER, "values_lookup2", 5); + // populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup2", 5); + // FIXME: this currently does not work + // try ( + // EsqlQueryResponse resp = runQuery( + // "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key | LOOKUP JOIN values_lookup2 ON + // lookup_tag", + // randomBoolean() + // ) + // ) { + // List> values = getValuesList(resp); + // assertThat(values, hasSize(20)); + // + // } } public void testLookupJoinMissingRemoteIndex() throws IOException { @@ -98,31 +113,50 @@ public void testLookupJoinMissingRemoteIndex() throws IOException { assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); } // Without local + // FIXME: this is inconsistent due to how field-caps works - if there's no index at all, it fails, but if there's one but not + // another, it succeeds. Ideally, this would be empty result with remote1 skipped, but field-caps fails. + var ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) + ); + assertThat(ex.getMessage(), containsString("Unknown index [cluster-a:values_lookup]")); + + setSkipUnavailable(REMOTE_CLUSTER_1, false); + // then missing index is an error + ex = expectThrows( + VerificationException.class, + () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) + ); + assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + } + + public void testLookupJoinMissingRemoteIndexTwoRemotes() throws IOException { + setupClusters(3); + populateLookupIndex(REMOTE_CLUSTER_2, "values_lookup", 10); + + setSkipUnavailable(REMOTE_CLUSTER_1, true); + setSkipUnavailable(REMOTE_CLUSTER_2, false); + + // FIXME: inconsistent with the previous test, remote1:values_lookup still missing, but now it succeeds with remote1 skipped try ( EsqlQueryResponse resp = runQuery( - "FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + "FROM *:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean() ) ) { List> values = getValuesList(resp); - assertThat(values, hasSize(0)); + assertThat(values, hasSize(10)); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); - assertThat(executionInfo.getClusters().size(), equalTo(1)); + assertThat(executionInfo.getClusters().size(), equalTo(2)); - var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); - assertThat(remoteCluster.getFailures(), not(empty())); - var failure = remoteCluster.getFailures().get(0); + var remoteCluster1 = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remoteCluster1.getFailures(), not(empty())); + var failure = remoteCluster1.getFailures().get(0); assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + var remoteCluster2 = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(remoteCluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); } - - setSkipUnavailable(REMOTE_CLUSTER_1, false); - // then missing index is an error - var ex = expectThrows( - VerificationException.class, - () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) - ); - assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); } public void testLookupJoinMissingLocalIndex() throws IOException { @@ -134,6 +168,26 @@ public void testLookupJoinMissingLocalIndex() throws IOException { () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) ); assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in local cluster")); + + // Without local in the query it's ok + try ( + EsqlQueryResponse resp = runQuery( + "FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", + randomBoolean() + ) + ) { + List> values = getValuesList(resp); + assertThat(values, hasSize(10)); + var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList(); + assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag", "remote_tag")); + assertThat(columns, not(hasItems("local_tag"))); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.getClusters().size(), equalTo(1)); + + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + } } public void testLookupJoinMissingKey() throws IOException { @@ -143,6 +197,7 @@ public void testLookupJoinMissingKey() throws IOException { setSkipUnavailable(REMOTE_CLUSTER_1, true); try ( + // Using local_tag as key which is not present in remote index EsqlQueryResponse resp = runQuery( "FROM logs-*,c*:logs-* | EVAL local_tag = to_string(v) | LOOKUP JOIN values_lookup ON local_tag", randomBoolean() @@ -153,12 +208,11 @@ public void testLookupJoinMissingKey() throws IOException { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertThat(executionInfo.getClusters().size(), equalTo(2)); + var localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); // FIXME: verify whether we need to skip or succeed here - // assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); - // assertThat(remoteCluster.getFailures(), not(empty())); - // var failure = remoteCluster.getFailures().get(0); - // assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); } // TODO: verify whether this should be an error or not when the key field is missing @@ -175,6 +229,24 @@ public void testLookupJoinMissingKey() throws IOException { assertThat(ex.getMessage(), containsString("Unknown column [local_tag] in right side of join")); setSkipUnavailable(REMOTE_CLUSTER_1, false); + try ( + // Using local_tag as key which is not present in remote index + EsqlQueryResponse resp = runQuery( + "FROM logs-*,c*:logs-* | EVAL local_tag = to_string(v) | LOOKUP JOIN values_lookup ON local_tag", + randomBoolean() + ) + ) { + List> values = getValuesList(resp); + assertThat(values, hasSize(20)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.getClusters().size(), equalTo(2)); + + var localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + // FIXME: verify whether we need to succeed or fail here + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + } } public void testLookupJoinIndexMode() throws IOException { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java index 1244b053511ca..0f497b67452a9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java @@ -95,32 +95,32 @@ public void postAnalysisVerification(Failures failures) { checkRemoteJoin(this, failures); } // TODO: this is probably not necessary anymore as we check it in analysis stage? - right().forEachDown(EsRelation.class, esr -> { - var indexNameWithModes = esr.indexNameWithModes(); - if (indexNameWithModes.size() != 1) { - failures.add( - fail( - esr, - "Lookup Join requires a single lookup mode index; [{}] resolves to [{}] indices", - esr.indexPattern(), - indexNameWithModes.size() - ) - ); - return; - } - var indexAndMode = indexNameWithModes.entrySet().iterator().next(); - if (indexAndMode.getValue() != IndexMode.LOOKUP) { - failures.add( - fail( - esr, - "Lookup Join requires a single lookup mode index; [{}] resolves to [{}] in [{}] mode", - esr.indexPattern(), - indexAndMode.getKey(), - indexAndMode.getValue() - ) - ); - } - }); +// right().forEachDown(EsRelation.class, esr -> { +// var indexNameWithModes = esr.indexNameWithModes(); +// if (indexNameWithModes.size() != 1) { +// failures.add( +// fail( +// esr, +// "Lookup Join requires a single lookup mode index; [{}] resolves to [{}] indices", +// esr.indexPattern(), +// indexNameWithModes.size() +// ) +// ); +// return; +// } +// var indexAndMode = indexNameWithModes.entrySet().iterator().next(); +// if (indexAndMode.getValue() != IndexMode.LOOKUP) { +// failures.add( +// fail( +// esr, +// "Lookup Join requires a single lookup mode index; [{}] resolves to [{}] in [{}] mode", +// esr.indexPattern(), +// indexAndMode.getKey(), +// indexAndMode.getValue() +// ) +// ); +// } +// }); } private static void checkRemoteJoin(LogicalPlan plan, Failures failures) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index a92d2f439a0ea..451f924985d75 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -61,6 +61,7 @@ import org.elasticsearch.logging.Logger; import org.elasticsearch.node.Node; import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.action.ColumnInfoImpl; import org.elasticsearch.xpack.esql.core.expression.Alias; @@ -123,6 +124,7 @@ import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.elasticsearch.xpack.esql.score.ScoreMapper; import org.elasticsearch.xpack.esql.session.Configuration; +import org.elasticsearch.xpack.esql.session.EsqlCCSUtils; import java.util.ArrayList; import java.util.List; @@ -733,15 +735,24 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan if (localSourceExec.indexMode() != IndexMode.LOOKUP) { throw new IllegalArgumentException("can't plan [" + join + "]"); } - Map indicesWithModes = localSourceExec.indexNameWithModes(); - if (indicesWithModes.size() != 1) { - throw new IllegalArgumentException("can't plan [" + join + "], found more than 1 index"); + + var maybeEntry = localSourceExec.indexNameWithModes() + .entrySet() + .stream() + .filter(e -> RemoteClusterAware.parseClusterAlias(e.getKey()).equals(clusterAlias)) + .findFirst(); + + if (maybeEntry.isEmpty()) { + throw new IllegalArgumentException( + "can't plan [" + join + "]: no matching index found " + EsqlCCSUtils.inClusterName(clusterAlias) + ); } - var entry = indicesWithModes.entrySet().iterator().next(); + var entry = maybeEntry.get(); + if (entry.getValue() != IndexMode.LOOKUP) { throw new IllegalArgumentException("can't plan [" + join + "], found index with mode [" + entry.getValue() + "]"); } - String indexName = entry.getKey(); + String indexName = RemoteClusterAware.splitIndexName(entry.getKey())[1]; if (join.leftFields().size() != join.rightFields().size()) { throw new IllegalArgumentException("can't plan [" + join + "]: mismatching left and right field count"); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 9fe20c556f8b4..5162087aedc85 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -434,7 +434,7 @@ private void preAnalyzeLookupIndex( } else { patternWithRemotes = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING) .map(c -> RemoteClusterAware.buildRemoteIndexName(c.getClusterAlias(), localPattern)) - .collect(Collectors.joining(", ")); + .collect(Collectors.joining(",")); } if (patternWithRemotes.isEmpty()) { return; @@ -508,23 +508,8 @@ private PreAnalysisResult receiveLookupIndexResolution( } }); - if (clustersWithResolvedIndices.size() > 1) { - // If we have multiple resolutions for the lookup index, we need to only leave the local resolution - String localIndexName = clustersWithResolvedIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - if (localIndexName == null) { - // Get the first index name instead - localIndexName = RemoteClusterAware.splitIndexName(clustersWithResolvedIndices.values().iterator().next())[1]; - } - var localIndex = new EsIndex(index, newIndexResolution.get().mapping(), Map.of(localIndexName, IndexMode.LOOKUP)); - newIndexResolution = IndexResolution.valid( - localIndex, - localIndex.concreteIndices(), - newIndexResolution.getUnavailableShards(), - newIndexResolution.unavailableClusters() - ); - } - return result.addLookupIndexResolution(index, newIndexResolution); + } private void initializeClusterData(List indices, EsqlExecutionInfo executionInfo) { From 5eddf62914bbb2b1c3625165981b3f3e57c72c3a Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 17 Jun 2025 23:35:45 -0600 Subject: [PATCH 16/71] more fixes --- .../Debug_Elasticsearch__node_2_.xml | 4 +++ .../Debug_Elasticsearch__node_3_.xml | 4 +++ .../xpack/esql/ccq/MultiClusterSpecIT.java | 8 +++-- .../esql/plan/logical/join/LookupJoin.java | 35 ++----------------- .../esql/planner/LocalExecutionPlanner.java | 31 ++++++++++------ .../xpack/esql/planner/mapper/Mapper.java | 11 +++++- .../xpack/esql/session/EsqlSession.java | 24 +++++++++++-- 7 files changed, 69 insertions(+), 48 deletions(-) diff --git a/.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml b/.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml index 94bb079398ffd..76bb8f72eafc9 100644 --- a/.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml +++ b/.idea/runConfigurations/Debug_Elasticsearch__node_2_.xml @@ -6,6 +6,10 @@