Skip to content

Commit e0296d0

Browse files
authored
Better failure handling for lookup join (#132874)
1 parent 9bde671 commit e0296d0

File tree

3 files changed

+32
-14
lines changed

3 files changed

+32
-14
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinFailuresIT.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public void sendResponse(Exception exception) {
9494
} */
9595

9696
try (
97+
// This only calls REMOTE_CLUSTER_1 which is skip_unavailable=true
9798
EsqlQueryResponse resp = runQuery(
9899
"FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key",
99100
randomBoolean()
@@ -112,9 +113,8 @@ public void sendResponse(Exception exception) {
112113
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
113114
assertThat(remoteCluster.getFailures(), not(empty()));
114115
var failure = remoteCluster.getFailures().get(0);
115-
// FIXME: this produces a wrong message currently
116-
// assertThat(failure.reason(), containsString(simulatedFailure.getMessage()));
117-
assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]"));
116+
assertThat(failure.reason(), containsString(simulatedFailure.getMessage()));
117+
assertThat(failure.reason(), containsString("lookup failed in remote cluster [cluster-a] for index [values_lookup]"));
118118
}
119119

120120
try (
@@ -138,24 +138,26 @@ public void sendResponse(Exception exception) {
138138
assertThat(remoteCluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
139139
assertThat(remoteCluster.getFailures(), not(empty()));
140140
var failure = remoteCluster.getFailures().get(0);
141-
// FIXME: this produces a wrong message currently
142-
// assertThat(failure.reason(), containsString(simulatedFailure.getMessage()));
143-
assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]"));
141+
assertThat(failure.reason(), containsString(simulatedFailure.getMessage()));
142+
assertThat(failure.reason(), containsString("lookup failed in remote cluster [cluster-a] for index [values_lookup]"));
144143
}
145144

146145
// now fail
147146
setSkipUnavailable(REMOTE_CLUSTER_1, false);
147+
// c*: only calls REMOTE_CLUSTER_1 which is skip_unavailable=false now
148148
Exception ex = expectThrows(
149149
VerificationException.class,
150150
() -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean())
151151
);
152-
assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]"));
152+
assertThat(ex.getMessage(), containsString("lookup failed in remote cluster [cluster-a] for index [values_lookup]"));
153+
String message = ex.getCause() == null ? ex.getMessage() : ex.getCause().getMessage();
154+
assertThat(message, containsString(simulatedFailure.getMessage()));
153155

154156
ex = expectThrows(
155157
Exception.class,
156158
() -> runQuery("FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean())
157159
);
158-
String message = ex.getCause() == null ? ex.getMessage() : ex.getCause().getMessage();
160+
message = ex.getCause() == null ? ex.getMessage() : ex.getCause().getMessage();
159161
assertThat(message, containsString(simulatedFailure.getMessage()));
160162
} finally {
161163
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/VerificationException.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,8 @@ public VerificationException(Failures failures) {
2525
super(failures.toString());
2626
}
2727

28+
public VerificationException(String message, Throwable cause) {
29+
super(message, cause);
30+
}
31+
2832
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,10 @@ private void preAnalyzeLookupIndex(
428428
}
429429

430430
private void skipClusterOrError(String clusterAlias, EsqlExecutionInfo executionInfo, String message) {
431-
VerificationException error = new VerificationException(message);
431+
skipClusterOrError(clusterAlias, executionInfo, new VerificationException(message));
432+
}
433+
434+
private void skipClusterOrError(String clusterAlias, EsqlExecutionInfo executionInfo, ElasticsearchException error) {
432435
// If we can, skip the cluster and mark it as such
433436
if (executionInfo.shouldSkipOnFailure(clusterAlias)) {
434437
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, error);
@@ -528,11 +531,7 @@ private PreAnalysisResult receiveLookupIndexResolution(
528531
String clusterAlias = cluster.getClusterAlias();
529532
if (clustersWithResolvedIndices.containsKey(clusterAlias) == false) {
530533
// Missing cluster resolution
531-
skipClusterOrError(
532-
clusterAlias,
533-
executionInfo,
534-
"lookup index [" + index + "] is not available " + EsqlCCSUtils.inClusterName(clusterAlias)
535-
);
534+
skipClusterOrError(clusterAlias, executionInfo, findFailure(lookupIndexResolution.failures(), index, clusterAlias));
536535
}
537536
});
538537

@@ -542,6 +541,19 @@ private PreAnalysisResult receiveLookupIndexResolution(
542541
);
543542
}
544543

544+
private ElasticsearchException findFailure(Map<String, List<FieldCapabilitiesFailure>> failures, String index, String clusterAlias) {
545+
if (failures.containsKey(clusterAlias)) {
546+
var exc = failures.get(clusterAlias).stream().findFirst().map(FieldCapabilitiesFailure::getException);
547+
if (exc.isPresent()) {
548+
return new VerificationException(
549+
"lookup failed " + EsqlCCSUtils.inClusterName(clusterAlias) + " for index [" + index + "]",
550+
ExceptionsHelper.unwrapCause(exc.get())
551+
);
552+
}
553+
}
554+
return new VerificationException("lookup index [" + index + "] is not available " + EsqlCCSUtils.inClusterName(clusterAlias));
555+
}
556+
545557
/**
546558
* Check whether the lookup index resolves to a single concrete index on all clusters or not.
547559
* If it's a single index, we are compatible with old pre-9.2 LOOKUP JOIN code and just need to send the same resolution as we did.

0 commit comments

Comments
 (0)