Skip to content

Commit 0848dd1

Browse files
committed
Refine monitored exceptions
1 parent 49ed90e commit 0848dd1

File tree

5 files changed

+38
-33
lines changed

5 files changed

+38
-33
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
3030
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
3131
import org.elasticsearch.xpack.esql.session.Configuration;
32+
import org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils;
3233

3334
import java.util.ArrayList;
3435
import java.util.List;
@@ -38,9 +39,6 @@
3839
import java.util.concurrent.Executor;
3940
import java.util.concurrent.atomic.AtomicReference;
4041

41-
import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards;
42-
import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.shouldIgnoreRuntimeError;
43-
4442
/**
4543
* Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes.
4644
* This handler delegates the execution of computes on data nodes within each remote cluster to {@link DataNodeComputeHandler}.
@@ -132,8 +130,13 @@ void startComputeOnRemoteCluster(
132130
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
133131
);
134132
}, e -> {
135-
if (shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
136-
markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
133+
if (EsqlSessionCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
134+
EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards(
135+
executionInfo,
136+
clusterAlias,
137+
EsqlExecutionInfo.Cluster.Status.SKIPPED,
138+
e
139+
);
137140
openExchangeListener.onResponse(null);
138141
} else {
139142
openExchangeListener.onFailure(e);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
4949
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
5050
import org.elasticsearch.xpack.esql.session.Configuration;
51+
import org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils;
5152
import org.elasticsearch.xpack.esql.session.Result;
5253

5354
import java.util.ArrayList;
@@ -61,8 +62,6 @@
6162
import java.util.function.Supplier;
6263

6364
import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
64-
import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards;
65-
import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.shouldIgnoreRuntimeError;
6665

6766
/**
6867
* Computes the result of a {@link PhysicalPlan}.
@@ -270,8 +269,6 @@ public void execute(
270269
// starts computes on remote clusters
271270
final var remoteClusters = clusterComputeHandler.getRemoteClusters(clusterToConcreteIndices, clusterToOriginalIndices);
272271
for (ClusterComputeHandler.RemoteCluster cluster : remoteClusters) {
273-
var remoteListener = computeListener.acquireCompute();
274-
String clusterAlias = cluster.clusterAlias();
275272
clusterComputeHandler.startComputeOnRemoteCluster(
276273
sessionId,
277274
rootTask,
@@ -281,16 +278,9 @@ public void execute(
281278
cluster,
282279
cancelQueryOnFailure,
283280
execInfo,
284-
ActionListener.wrap((ComputeResponse r) -> {
285-
updateExecutionInfo(execInfo, clusterAlias, r);
286-
remoteListener.onResponse(r.getProfiles());
287-
}, e -> {
288-
if (shouldIgnoreRuntimeError(execInfo, clusterAlias, e)) {
289-
markClusterWithFinalStateAndNoShards(execInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
290-
remoteListener.onResponse(Collections.emptyList());
291-
} else {
292-
remoteListener.onFailure(e);
293-
}
281+
computeListener.acquireCompute().map(r -> {
282+
updateExecutionInfo(execInfo, cluster.clusterAlias(), r);
283+
return r.getProfiles();
294284
})
295285
);
296286
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717
import org.elasticsearch.transport.TransportRequest;
1818
import org.elasticsearch.transport.TransportService;
1919
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
20+
import org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils;
2021

2122
import java.util.List;
2223
import java.util.Map;
2324
import java.util.function.Supplier;
2425

25-
import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards;
26-
2726
// Create group task for this cluster. This group task ensures that two branches of the computation:
2827
// the exchange sink and the cluster request, belong to the same group and each of them can cancel the other.
2928
// runAfter listeners below ensure that the group is finalized when both branches are done.
@@ -73,8 +72,13 @@ class RemoteListenerGroup {
7372
private <T> ActionListener<T> createCancellingListener(String reason, ActionListener<T> delegate, Runnable finishGroup) {
7473
return ActionListener.runAfter(delegate.delegateResponse((inner, e) -> {
7574
taskManager.cancelTaskAndDescendants(groupTask, reason, true, ActionListener.running(() -> {
76-
if (shouldIgnoreRemoteError(clusterAlias, e)) {
77-
markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e);
75+
if (EsqlSessionCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
76+
EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards(
77+
executionInfo,
78+
clusterAlias,
79+
EsqlExecutionInfo.Cluster.Status.PARTIAL,
80+
e
81+
);
7882
delegate.onResponse(null);
7983
} else {
8084
delegate.onFailure(e);
@@ -83,10 +87,6 @@ private <T> ActionListener<T> createCancellingListener(String reason, ActionList
8387
}), finishGroup);
8488
}
8589

86-
private boolean shouldIgnoreRemoteError(String clusterAlias, Exception e) {
87-
return executionInfo.isSkipUnavailable(clusterAlias);
88-
}
89-
9090
public CancellableTask getGroupTask() {
9191
return groupTask;
9292
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.core.TimeValue;
2020
import org.elasticsearch.indices.IndicesExpressionGrouper;
2121
import org.elasticsearch.license.XPackLicenseState;
22+
import org.elasticsearch.tasks.TaskCancelledException;
2223
import org.elasticsearch.transport.ConnectTransportException;
2324
import org.elasticsearch.transport.NoSuchRemoteClusterException;
2425
import org.elasticsearch.transport.RemoteClusterAware;
@@ -37,6 +38,7 @@
3738
import java.util.HashSet;
3839
import java.util.List;
3940
import java.util.Map;
41+
import java.util.Objects;
4042
import java.util.Set;
4143

4244
public class EsqlSessionCCSUtils {
@@ -347,10 +349,10 @@ public static void markClusterWithFinalStateAndNoShards(
347349
executionInfo.swapCluster(clusterAlias, (k, v) -> {
348350
Cluster.Builder builder = new Cluster.Builder(v).setStatus(status)
349351
.setTook(executionInfo.tookSoFar())
350-
.setTotalShards(0)
351-
.setSuccessfulShards(0)
352-
.setSkippedShards(0)
353-
.setFailedShards(0);
352+
.setTotalShards(Objects.requireNonNullElse(v.getTotalShards(), 0))
353+
.setSuccessfulShards(Objects.requireNonNullElse(v.getTotalShards(), 0))
354+
.setSkippedShards(Objects.requireNonNullElse(v.getTotalShards(), 0))
355+
.setFailedShards(Objects.requireNonNullElse(v.getTotalShards(), 0));
354356
if (ex != null) {
355357
builder.setFailures(List.of(new ShardSearchFailure(ex)));
356358
}
@@ -362,6 +364,8 @@ public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo,
362364
if (executionInfo.isSkipUnavailable(clusterAlias) == false) {
363365
return false;
364366
}
365-
return ExceptionsHelper.isRemoteUnavailableException(e);
367+
368+
return ExceptionsHelper.isRemoteUnavailableException(e)
369+
|| (e instanceof RemoteTransportException && e.getCause() instanceof TaskCancelledException);
366370
}
367371
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,10 @@ public void testUpdateExecutionInfoAtEndOfPlanning() {
534534

535535
private void assertClusterStatusAndShardCounts(EsqlExecutionInfo.Cluster cluster, EsqlExecutionInfo.Cluster.Status status) {
536536
assertThat(cluster.getStatus(), equalTo(status));
537-
assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L));
537+
if (cluster.getTook() != null) {
538+
// It is also ok if it's null in some tests
539+
assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L));
540+
}
538541
if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) {
539542
assertNull(cluster.getTotalShards());
540543
assertNull(cluster.getSuccessfulShards());
@@ -545,6 +548,11 @@ private void assertClusterStatusAndShardCounts(EsqlExecutionInfo.Cluster cluster
545548
assertThat(cluster.getSuccessfulShards(), equalTo(0));
546549
assertThat(cluster.getSkippedShards(), equalTo(0));
547550
assertThat(cluster.getFailedShards(), equalTo(0));
551+
} else if (status == EsqlExecutionInfo.Cluster.Status.PARTIAL) {
552+
assertThat(cluster.getTotalShards(), equalTo(0));
553+
assertThat(cluster.getSuccessfulShards(), equalTo(0));
554+
assertThat(cluster.getSkippedShards(), equalTo(0));
555+
assertThat(cluster.getFailedShards(), equalTo(0));
548556
} else {
549557
fail("Unexpected status: " + status);
550558
}

0 commit comments

Comments
 (0)