From c5edb1a02dace406c4e816c1ba942bd18e181066 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 13 Aug 2025 13:16:28 -0600 Subject: [PATCH] Better failure handling for lookup join --- .../CrossClusterLookupJoinFailuresIT.java | 18 +++++++------- .../xpack/esql/VerificationException.java | 4 ++++ .../xpack/esql/session/EsqlSession.java | 24 ++++++++++++++----- 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinFailuresIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinFailuresIT.java index 2643926bf1637..c859712dcf8dd 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinFailuresIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinFailuresIT.java @@ -94,6 +94,7 @@ public void sendResponse(Exception exception) { } */ try ( + // This only calls REMOTE_CLUSTER_1 which is skip_unavailable=true EsqlQueryResponse resp = runQuery( "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean() @@ -112,9 +113,8 @@ public void sendResponse(Exception exception) { assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); assertThat(remoteCluster.getFailures(), not(empty())); var failure = remoteCluster.getFailures().get(0); - // FIXME: this produces a wrong message currently - // assertThat(failure.reason(), containsString(simulatedFailure.getMessage())); - assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + assertThat(failure.reason(), containsString(simulatedFailure.getMessage())); + assertThat(failure.reason(), containsString("lookup failed in remote cluster [cluster-a] for index [values_lookup]")); } try ( @@ -138,24 +138,26 @@ public void sendResponse(Exception exception) { assertThat(remoteCluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(remoteCluster.getFailures(), not(empty())); var failure = remoteCluster.getFailures().get(0); - // FIXME: this produces a wrong message currently - // assertThat(failure.reason(), containsString(simulatedFailure.getMessage())); - assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + assertThat(failure.reason(), containsString(simulatedFailure.getMessage())); + assertThat(failure.reason(), containsString("lookup failed in remote cluster [cluster-a] for index [values_lookup]")); } // now fail setSkipUnavailable(REMOTE_CLUSTER_1, false); + // c*: only calls REMOTE_CLUSTER_1 which is skip_unavailable=false now Exception ex = expectThrows( VerificationException.class, () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) ); - assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]")); + assertThat(ex.getMessage(), containsString("lookup failed in remote cluster [cluster-a] for index [values_lookup]")); + String message = ex.getCause() == null ? ex.getMessage() : ex.getCause().getMessage(); + assertThat(message, containsString(simulatedFailure.getMessage())); ex = expectThrows( Exception.class, () -> runQuery("FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean()) ); - String message = ex.getCause() == null ? ex.getMessage() : ex.getCause().getMessage(); + message = ex.getCause() == null ? ex.getMessage() : ex.getCause().getMessage(); assertThat(message, containsString(simulatedFailure.getMessage())); } finally { for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/VerificationException.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/VerificationException.java index 8443b8d99d04a..54583c8a75039 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/VerificationException.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/VerificationException.java @@ -25,4 +25,8 @@ public VerificationException(Failures failures) { super(failures.toString()); } + public VerificationException(String message, Throwable cause) { + super(message, cause); + } + } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index a98b0f3c52735..2e2fc2403a8ec 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -473,7 +473,10 @@ private void preAnalyzeLookupIndex( } private void skipClusterOrError(String clusterAlias, EsqlExecutionInfo executionInfo, String message) { - VerificationException error = new VerificationException(message); + skipClusterOrError(clusterAlias, executionInfo, new VerificationException(message)); + } + + private void skipClusterOrError(String clusterAlias, EsqlExecutionInfo executionInfo, ElasticsearchException error) { // If we can, skip the cluster and mark it as such if (executionInfo.isSkipUnavailable(clusterAlias)) { EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, error); @@ -573,11 +576,7 @@ private PreAnalysisResult receiveLookupIndexResolution( String clusterAlias = cluster.getClusterAlias(); if (clustersWithResolvedIndices.containsKey(clusterAlias) == false) { // Missing cluster resolution - skipClusterOrError( - clusterAlias, - executionInfo, - "lookup index [" + index + "] is not available " + EsqlCCSUtils.inClusterName(clusterAlias) - ); + skipClusterOrError(clusterAlias, executionInfo, findFailure(lookupIndexResolution.failures(), index, clusterAlias)); } }); @@ -587,6 +586,19 @@ private PreAnalysisResult receiveLookupIndexResolution( ); } + private ElasticsearchException findFailure(Map> failures, String index, String clusterAlias) { + if (failures.containsKey(clusterAlias)) { + var exc = failures.get(clusterAlias).stream().findFirst().map(FieldCapabilitiesFailure::getException); + if (exc.isPresent()) { + return new VerificationException( + "lookup failed " + EsqlCCSUtils.inClusterName(clusterAlias) + " for index [" + index + "]", + ExceptionsHelper.unwrapCause(exc.get()) + ); + } + } + return new VerificationException("lookup index [" + index + "] is not available " + EsqlCCSUtils.inClusterName(clusterAlias)); + } + /** * Check whether the lookup index resolves to a single concrete index on all clusters or not. * If it's a single index, we are compatible with old pre-9.2 LOOKUP JOIN code and just need to send the same resolution as we did.