|
7 | 7 |
|
8 | 8 | package org.elasticsearch.xpack.esql.plugin; |
9 | 9 |
|
| 10 | +import org.elasticsearch.ElasticsearchException; |
10 | 11 | import org.elasticsearch.action.ActionListener; |
11 | 12 | import org.elasticsearch.action.ActionListenerResponseHandler; |
12 | 13 | import org.elasticsearch.action.OriginalIndices; |
@@ -198,21 +199,25 @@ record RemoteCluster(String clusterAlias, Transport.Connection connection, Strin |
198 | 199 | public void messageReceived(ClusterComputeRequest request, TransportChannel channel, Task task) { |
199 | 200 | ChannelActionListener<ComputeResponse> listener = new ChannelActionListener<>(channel); |
200 | 201 | RemoteClusterPlan remoteClusterPlan = request.remoteClusterPlan(); |
201 | | - var plan = remoteClusterPlan.plan(); |
202 | | - if (plan instanceof ExchangeSinkExec == false) { |
203 | | - listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + plan)); |
204 | | - return; |
| 202 | + try { |
| 203 | + var plan = remoteClusterPlan.plan(); |
| 204 | + if (plan instanceof ExchangeSinkExec == false) { |
| 205 | + listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + plan)); |
| 206 | + return; |
| 207 | + } |
| 208 | + runComputeOnRemoteCluster( |
| 209 | + request.clusterAlias(), |
| 210 | + request.sessionId(), |
| 211 | + (CancellableTask) task, |
| 212 | + request.configuration(), |
| 213 | + (ExchangeSinkExec) plan, |
| 214 | + Set.of(remoteClusterPlan.targetIndices()), |
| 215 | + remoteClusterPlan.originalIndices(), |
| 216 | + listener |
| 217 | + ); |
| 218 | + } catch (Exception e) { |
| 219 | + listener.onFailure(new ElasticsearchException(request.clusterAlias() + " encountered an error", e)); |
205 | 220 | } |
206 | | - runComputeOnRemoteCluster( |
207 | | - request.clusterAlias(), |
208 | | - request.sessionId(), |
209 | | - (CancellableTask) task, |
210 | | - request.configuration(), |
211 | | - (ExchangeSinkExec) plan, |
212 | | - Set.of(remoteClusterPlan.targetIndices()), |
213 | | - remoteClusterPlan.originalIndices(), |
214 | | - listener |
215 | | - ); |
216 | 221 | } |
217 | 222 |
|
218 | 223 | /** |
|
0 commit comments