Skip to content

Commit 8ef049f

Browse files
Ensure transport errors are unwrapped before sending response to the
user and move `unwrapIfWrappedInRemoteComputeException` to `EsqlTestUtils`
1 parent 764933f commit 8ef049f

File tree

5 files changed

+43
-35
lines changed

5 files changed

+43
-35
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.lucene.sandbox.document.HalfFloatPoint;
1212
import org.apache.lucene.util.BytesRef;
1313
import org.elasticsearch.ExceptionsHelper;
14+
import org.elasticsearch.cluster.RemoteComputeException;
1415
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1516
import org.elasticsearch.cluster.service.ClusterService;
1617
import org.elasticsearch.common.Strings;
@@ -858,4 +859,18 @@ public static <T> T singleValue(Collection<T> collection) {
858859
assertThat(collection, hasSize(1));
859860
return collection.iterator().next();
860861
}
862+
863+
/**
864+
* Errors from remotes are wrapped in RemoteComputeException while the ones from the local cluster
865+
* aren't. This utility method is useful for unwrapping in such cases.
866+
* @param e Exception to unwrap.
867+
* @return Cause of RemoteComputeException, else the error itself.
868+
*/
869+
public static Exception unwrapIfWrappedInRemoteComputeException(Exception e) {
870+
if (e instanceof RemoteComputeException rce) {
871+
return (Exception) rce.getCause();
872+
} else {
873+
return e;
874+
}
875+
}
861876
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterCancellationIT.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.action.index.IndexRequest;
1515
import org.elasticsearch.action.support.PlainActionFuture;
1616
import org.elasticsearch.action.support.WriteRequest;
17-
import org.elasticsearch.cluster.RemoteComputeException;
1817
import org.elasticsearch.common.settings.Setting;
1918
import org.elasticsearch.common.transport.TransportAddress;
2019
import org.elasticsearch.compute.operator.DriverTaskRunner;
@@ -27,6 +26,7 @@
2726
import org.elasticsearch.transport.TransportService;
2827
import org.elasticsearch.xcontent.XContentBuilder;
2928
import org.elasticsearch.xcontent.json.JsonXContent;
29+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
3030
import org.elasticsearch.xpack.esql.plugin.ComputeService;
3131
import org.junit.After;
3232
import org.junit.Before;
@@ -89,14 +89,6 @@ protected boolean reuseClusters() {
8989
return false;
9090
}
9191

92-
private Exception unwrapIfWrappedInRemoteComputeException(Exception e) {
93-
if (e instanceof RemoteComputeException rce) {
94-
return (Exception) rce.getCause();
95-
} else {
96-
return e;
97-
}
98-
}
99-
10092
private void createRemoteIndex(int numDocs) throws Exception {
10193
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
10294
mapping.startObject("runtime");
@@ -172,7 +164,7 @@ public void testCancel() throws Exception {
172164
SimplePauseFieldPlugin.allowEmitting.countDown();
173165
}
174166
Exception error = expectThrows(Exception.class, requestFuture::actionGet);
175-
error = unwrapIfWrappedInRemoteComputeException(error);
167+
error = EsqlTestUtils.unwrapIfWrappedInRemoteComputeException(error);
176168
assertThat(error.getMessage(), containsString("proxy timeout"));
177169
}
178170

@@ -294,7 +286,7 @@ public void testCancelSkipUnavailable() throws Exception {
294286
}
295287

296288
Exception error = expectThrows(Exception.class, requestFuture::actionGet);
297-
error = unwrapIfWrappedInRemoteComputeException(error);
289+
error = EsqlTestUtils.unwrapIfWrappedInRemoteComputeException(error);
298290
assertThat(error, instanceOf(TaskCancelledException.class));
299291
}
300292
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
1313
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
1414
import org.elasticsearch.client.internal.Client;
15-
import org.elasticsearch.cluster.RemoteComputeException;
1615
import org.elasticsearch.cluster.metadata.IndexMetadata;
1716
import org.elasticsearch.common.Priority;
1817
import org.elasticsearch.common.Strings;
@@ -30,6 +29,7 @@
3029
import org.elasticsearch.transport.TransportService;
3130
import org.elasticsearch.xcontent.XContentBuilder;
3231
import org.elasticsearch.xcontent.json.JsonXContent;
32+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
3333
import org.elasticsearch.xpack.esql.VerificationException;
3434
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
3535

@@ -64,14 +64,6 @@ protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
6464
return Map.of(REMOTE_CLUSTER_1, randomBoolean(), REMOTE_CLUSTER_2, randomBoolean());
6565
}
6666

67-
private Exception unwrapIfWrappedInRemoteComputeException(Exception e) {
68-
if (e instanceof RemoteComputeException rce) {
69-
return (Exception) rce.getCause();
70-
} else {
71-
return e;
72-
}
73-
}
74-
7567
public void testSuccessfulPathways() throws Exception {
7668
Map<String, Object> testClusterInfo = setupTwoClusters();
7769
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
@@ -825,7 +817,7 @@ public void testRemoteFailureSkipUnavailableTrue() throws IOException {
825817
String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index);
826818

827819
Exception error = expectThrows(Exception.class, () -> runQuery(q, false));
828-
error = unwrapIfWrappedInRemoteComputeException(error);
820+
error = EsqlTestUtils.unwrapIfWrappedInRemoteComputeException(error);
829821

830822
assertThat(error, instanceOf(IllegalStateException.class));
831823
assertThat(error.getMessage(), containsString("Accessing failing field"));

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.elasticsearch.ResourceNotFoundException;
1212
import org.elasticsearch.action.search.ShardSearchFailure;
1313
import org.elasticsearch.client.internal.Client;
14-
import org.elasticsearch.cluster.RemoteComputeException;
1514
import org.elasticsearch.common.breaker.CircuitBreaker;
1615
import org.elasticsearch.common.breaker.CircuitBreakingException;
1716
import org.elasticsearch.common.settings.Settings;
@@ -99,14 +98,6 @@ private void assertClusterSuccess(EsqlQueryResponse resp, String clusterAlias, i
9998
assertThat(clusterInfo.getFailures(), empty());
10099
}
101100

102-
private Exception unwrapIfWrappedInRemoteComputeException(Exception e) {
103-
if (e instanceof RemoteComputeException rce) {
104-
return (Exception) rce.getCause();
105-
} else {
106-
return e;
107-
}
108-
}
109-
110101
public void testPartialResults() throws Exception {
111102
populateIndices();
112103
EsqlQueryRequest request = new EsqlQueryRequest();
@@ -115,7 +106,7 @@ public void testPartialResults() throws Exception {
115106
{
116107
request.allowPartialResults(false);
117108
Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
118-
error = unwrapIfWrappedInRemoteComputeException(error);
109+
error = EsqlTestUtils.unwrapIfWrappedInRemoteComputeException(error);
119110

120111
assertThat(error, instanceOf(IllegalStateException.class));
121112
assertThat(error.getMessage(), containsString("Accessing failing field"));
@@ -203,7 +194,7 @@ public void sendResponse(Exception exception) {
203194
{
204195
request.allowPartialResults(false);
205196
Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
206-
error = unwrapIfWrappedInRemoteComputeException(error);
197+
error = EsqlTestUtils.unwrapIfWrappedInRemoteComputeException(error);
207198
var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass());
208199
assertNotNull(unwrapped);
209200
assertThat(unwrapped.getMessage(), equalTo(simulatedFailure.getMessage()));
@@ -251,7 +242,7 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception {
251242
{
252243
request.allowPartialResults(false);
253244
Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
254-
error = unwrapIfWrappedInRemoteComputeException(error);
245+
error = EsqlTestUtils.unwrapIfWrappedInRemoteComputeException(error);
255246
EsqlTestUtils.assertEsqlFailure(error);
256247
var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass());
257248
assertNotNull(unwrapped);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.compute.operator.Driver;
2121
import org.elasticsearch.compute.operator.DriverProfile;
2222
import org.elasticsearch.compute.operator.DriverTaskRunner;
23+
import org.elasticsearch.compute.operator.FailureCollector;
2324
import org.elasticsearch.compute.operator.exchange.ExchangeService;
2425
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
2526
import org.elasticsearch.core.Releasable;
@@ -39,6 +40,7 @@
3940
import org.elasticsearch.tasks.TaskManager;
4041
import org.elasticsearch.threadpool.ThreadPool;
4142
import org.elasticsearch.transport.RemoteClusterAware;
43+
import org.elasticsearch.transport.TransportException;
4244
import org.elasticsearch.transport.TransportRequest;
4345
import org.elasticsearch.transport.TransportService;
4446
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
@@ -339,8 +341,24 @@ public void execute(
339341
cluster,
340342
cancelQueryOnFailure,
341343
execInfo,
342-
computeListener.acquireCompute()
343-
.delegateResponse((l, ex) -> l.onFailure(new RemoteComputeException(cluster.clusterAlias(), ex)))
344+
computeListener.acquireCompute().delegateResponse((l, ex) -> {
345+
/*
346+
* At various points, when collecting failures before sending a response, we manually check
347+
* if an ex is a transport error and if it is, we unwrap it. Because we're wrapping an ex
348+
* in RemoteComputeException, the checks fail and unwrapping does not happen. We offload
349+
* the unwrapping to here.
350+
*
351+
* Note: The other error we explicitly check for is TaskCancelledException which is never
352+
* wrapped.
353+
*/
354+
if (ex instanceof TransportException te) {
355+
l.onFailure(
356+
new RemoteComputeException(cluster.clusterAlias(), FailureCollector.unwrapTransportException(te))
357+
);
358+
} else {
359+
l.onFailure(new RemoteComputeException(cluster.clusterAlias(), ex));
360+
}
361+
})
344362
);
345363
}
346364
}

0 commit comments

Comments
 (0)