Skip to content

Commit 2ff4e19

Browse files
committed
Cover more cases for skip
1 parent 95a02b8 commit 2ff4e19

File tree

2 files changed

+39
-17
lines changed

2 files changed

+39
-17
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -189,23 +189,33 @@ ActionListener<Void> acquireAvoid() {
189189
});
190190
}
191191

192+
boolean shouldIgnoreRemoteErrors(@Nullable String computeClusterAlias) {
193+
return computeClusterAlias != null
194+
&& esqlExecutionInfo.isCrossClusterSearch()
195+
&& computeClusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false
196+
&& runningOnRemoteCluster() == false
197+
&& esqlExecutionInfo.isSkipUnavailable(computeClusterAlias);
198+
}
199+
200+
void markAsPartial(String computeClusterAlias, Exception e) {
201+
// We use PARTIAL here because we can not know whether the cluster have already sent any data.
202+
esqlExecutionInfo.swapCluster(computeClusterAlias, (k, v) -> {
203+
assert v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED
204+
: "We shouldn't be running compute on a cluster that's already marked as skipped";
205+
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL)
206+
.setFailures(List.of(new ShardSearchFailure(e)))
207+
.build();
208+
});
209+
}
210+
192211
ActionListener<Void> acquireSkipUnavailable(@Nullable String computeClusterAlias) {
193-
if (computeClusterAlias == null
194-
|| esqlExecutionInfo.isCrossClusterSearch() == false
195-
|| runningOnRemoteCluster()
196-
|| computeClusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)
197-
|| esqlExecutionInfo.isSkipUnavailable(computeClusterAlias) == false) {
212+
if (shouldIgnoreRemoteErrors(computeClusterAlias) == false) {
198213
return acquireAvoid();
199214
}
200215
return refs.acquire().delegateResponse((l, e) -> {
201-
LOGGER.error("Skipping unavailable cluster {} in ESQL query: {}", computeClusterAlias, e);
202-
esqlExecutionInfo.swapCluster(computeClusterAlias, (k, v) -> {
203-
assert v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED
204-
: "We shouldn't be running compute on a cluster that's already marked as skipped";
205-
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL)
206-
.setFailures(List.of(new ShardSearchFailure(e)))
207-
.build();
208-
});
216+
// TODO: drop this in final patch
217+
LOGGER.error("Marking failed cluster {} as partial: {}", computeClusterAlias, e);
218+
markAsPartial(computeClusterAlias, e);
209219
l.onResponse(null);
210220
});
211221
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -380,20 +380,23 @@ private void startComputeOnRemoteClusters(
380380
var linkExchangeListeners = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink());
381381
try (RefCountingListener refs = new RefCountingListener(linkExchangeListeners)) {
382382
for (RemoteCluster cluster : clusters) {
383+
final String clusterAlias = cluster.clusterAlias();
384+
final boolean shouldSkipOnFailure = computeListener.shouldIgnoreRemoteErrors(clusterAlias);
385+
final var exchangeListener = refs.acquire();
383386
ExchangeService.openExchange(
384387
transportService,
385388
cluster.connection,
386389
sessionId,
387390
queryPragmas.exchangeBufferSize(),
388391
esqlExecutor,
389-
refs.acquire().delegateFailureAndWrap((l, unused) -> {
392+
ActionListener.wrap(unused -> {
390393
var remoteSink = exchangeService.newRemoteSink(rootTask, sessionId, transportService, cluster.connection);
391394
exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients());
392395
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
393-
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, sessionId, configuration, remotePlan);
396+
var clusterRequest = new ClusterComputeRequest(clusterAlias, sessionId, configuration, remotePlan);
394397
var clusterListener = ActionListener.runBefore(
395-
computeListener.acquireCompute(cluster.clusterAlias()),
396-
() -> l.onResponse(null)
398+
computeListener.acquireCompute(clusterAlias),
399+
() -> exchangeListener.onResponse(null)
397400
);
398401
transportService.sendChildRequest(
399402
cluster.connection,
@@ -403,6 +406,15 @@ private void startComputeOnRemoteClusters(
403406
TransportRequestOptions.EMPTY,
404407
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
405408
);
409+
}, e -> {
410+
if (shouldSkipOnFailure) {
411+
// TODO: drop this in final patch
412+
LOGGER.error("Marking failed cluster {} as partial: {}", clusterAlias, e);
413+
computeListener.markAsPartial(clusterAlias, e);
414+
exchangeListener.onResponse(null);
415+
} else {
416+
exchangeListener.onFailure(e);
417+
}
406418
})
407419
);
408420
}

0 commit comments

Comments
 (0)