Skip to content

Commit cd3fefa

Browse files
Add test and handle missed subset of scenarios where an error can be
thrown
1 parent e32956a commit cd3fefa

File tree

3 files changed

+61
-18
lines changed

3 files changed

+61
-18
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.ElasticsearchException;
11+
import org.elasticsearch.compute.operator.exchange.ExchangeService;
12+
import org.elasticsearch.test.transport.MockTransportService;
13+
import org.elasticsearch.transport.TransportService;
14+
15+
import static org.hamcrest.Matchers.is;
16+
17+
public class CrossClusterRemoteErrorClusterAliasWrapIT extends AbstractCrossClusterTestCase {
18+
19+
public void testThatRemoteErrorsAreWrapped() throws Exception {
20+
setupClusters(2);
21+
setSkipUnavailable(REMOTE_CLUSTER_1, false);
22+
setSkipUnavailable(REMOTE_CLUSTER_2, false);
23+
24+
/*
25+
* Let's say something went wrong with the Exchange and its specifics when talking to a remote.
26+
* And let's pretend only cluster-a is affected.
27+
*/
28+
for (var nodes : cluster(REMOTE_CLUSTER_1).getNodeNames()) {
29+
((MockTransportService) cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, nodes)).addRequestHandlingBehavior(
30+
ExchangeService.OPEN_EXCHANGE_ACTION_NAME,
31+
(requestHandler, transportRequest, transportChannel, transportTask) -> {
32+
throw new IllegalArgumentException("some error to wreck havoc");
33+
}
34+
);
35+
}
36+
37+
ElasticsearchException wrappedError = expectThrows(
38+
ElasticsearchException.class,
39+
() -> runQuery("FROM " + REMOTE_CLUSTER_1 + ":*," + REMOTE_CLUSTER_2 + ":* | LIMIT 100", false)
40+
);
41+
assertThat(wrappedError.getMessage(), is("cluster-a encountered an error"));
42+
}
43+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -199,25 +199,21 @@ record RemoteCluster(String clusterAlias, Transport.Connection connection, Strin
199199
public void messageReceived(ClusterComputeRequest request, TransportChannel channel, Task task) {
200200
ChannelActionListener<ComputeResponse> listener = new ChannelActionListener<>(channel);
201201
RemoteClusterPlan remoteClusterPlan = request.remoteClusterPlan();
202-
try {
203-
var plan = remoteClusterPlan.plan();
204-
if (plan instanceof ExchangeSinkExec == false) {
205-
listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + plan));
206-
return;
207-
}
208-
runComputeOnRemoteCluster(
209-
request.clusterAlias(),
210-
request.sessionId(),
211-
(CancellableTask) task,
212-
request.configuration(),
213-
(ExchangeSinkExec) plan,
214-
Set.of(remoteClusterPlan.targetIndices()),
215-
remoteClusterPlan.originalIndices(),
216-
listener
217-
);
218-
} catch (Exception e) {
219-
listener.onFailure(new ElasticsearchException(request.clusterAlias() + " encountered an error", e));
202+
var plan = remoteClusterPlan.plan();
203+
if (plan instanceof ExchangeSinkExec == false) {
204+
listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + plan));
205+
return;
220206
}
207+
runComputeOnRemoteCluster(
208+
request.clusterAlias(),
209+
request.sessionId(),
210+
(CancellableTask) task,
211+
request.configuration(),
212+
(ExchangeSinkExec) plan,
213+
Set.of(remoteClusterPlan.targetIndices()),
214+
remoteClusterPlan.originalIndices(),
215+
listener
216+
);
221217
}
222218

223219
/**

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.plugin;
99

10+
import org.elasticsearch.ElasticsearchException;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.OriginalIndices;
1213
import org.elasticsearch.action.search.SearchRequest;
@@ -299,6 +300,9 @@ public void execute(
299300
cancelQueryOnFailure,
300301
execInfo,
301302
computeListener.acquireCompute()
303+
.delegateResponse(
304+
(l, ex) -> l.onFailure(new ElasticsearchException(cluster.clusterAlias() + " encountered an error", ex))
305+
)
302306
);
303307
}
304308
}

0 commit comments

Comments
 (0)