Skip to content

Commit 91775be

Browse files
ES|QL: Wrap remote errors with cluster name to provide more context (elastic#123156) (elastic#126165)
Wrap remote errors with cluster name to provide more context Previously, if a remote encountered an error, user would see a top-level error that would provide no context about which remote ran into the error. Now, such errors are wrapped in a separate remote exception whose error message clearly specifies the name of the remote cluster and the error that occurred is the cause of this remote exception. (cherry picked from commit e4fb22c) # Conflicts: # server/src/main/java/org/elasticsearch/TransportVersions.java # x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java
1 parent 5510b7f commit 91775be

File tree

11 files changed

+150
-7
lines changed

11 files changed

+150
-7
lines changed

docs/changelog/123156.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 123156
2+
summary: Wrap remote errors with cluster name to provide more context
3+
area: Search
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/ElasticsearchException.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.apache.lucene.store.LockObtainFailedException;
1717
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
1818
import org.elasticsearch.action.support.replication.ReplicationOperation;
19+
import org.elasticsearch.cluster.RemoteException;
1920
import org.elasticsearch.cluster.action.shard.ShardStateAction;
2021
import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
2122
import org.elasticsearch.common.io.stream.StreamInput;
@@ -1936,7 +1937,8 @@ private enum ElasticsearchExceptionHandle {
19361937
IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus::new,
19371938
183,
19381939
TransportVersions.V_8_16_0
1939-
);
1940+
),
1941+
REMOTE_EXCEPTION(RemoteException.class, RemoteException::new, 184, TransportVersions.REMOTE_EXCEPTION_8_19);
19401942

19411943
final Class<? extends ElasticsearchException> exceptionClass;
19421944
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ static TransportVersion def(int id) {
200200
public static final TransportVersion INFERENCE_MODEL_REGISTRY_METADATA_8_19 = def(8_841_0_13);
201201
public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE_8_19 = def(8_841_0_14);
202202
public static final TransportVersion RERANK_COMMON_OPTIONS_ADDED_8_19 = def(8_841_0_15);
203+
public static final TransportVersion REMOTE_EXCEPTION_8_19 = def(8_841_0_16);
203204

204205
/*
205206
* STOP! READ THIS FIRST! No, really,
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster;
11+
12+
import org.elasticsearch.ElasticsearchException;
13+
import org.elasticsearch.ExceptionsHelper;
14+
import org.elasticsearch.common.io.stream.StreamInput;
15+
import org.elasticsearch.rest.RestStatus;
16+
17+
import java.io.IOException;
18+
import java.util.Objects;
19+
20+
/**
21+
* Represents an error that occurred on a remote node.
22+
* It allows capturing some context such as the cluster alias that encountered the error.
23+
*/
24+
public class RemoteException extends ElasticsearchException {
25+
26+
/**
27+
* @param clusterAlias Name of the cluster.
28+
* @param cause Error that was encountered.
29+
*/
30+
public RemoteException(String clusterAlias, Throwable cause) {
31+
super("Remote [" + clusterAlias + "] encountered an error", cause);
32+
Objects.requireNonNull(cause);
33+
}
34+
35+
public RemoteException(StreamInput in) throws IOException {
36+
super(in);
37+
}
38+
39+
@Override
40+
public RestStatus status() {
41+
// This is similar to what we do in SearchPhaseExecutionException.
42+
return ExceptionsHelper.status(getCause());
43+
}
44+
}

server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.action.search.VersionMismatchException;
2323
import org.elasticsearch.action.support.replication.ReplicationOperation;
2424
import org.elasticsearch.client.internal.AbstractClientHeadersTestCase;
25+
import org.elasticsearch.cluster.RemoteException;
2526
import org.elasticsearch.cluster.action.shard.ShardStateAction;
2627
import org.elasticsearch.cluster.block.ClusterBlockException;
2728
import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException;
@@ -840,6 +841,7 @@ public void testIds() {
840841
ids.put(181, ResourceAlreadyUploadedException.class);
841842
ids.put(182, IngestPipelineException.class);
842843
ids.put(183, IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus.class);
844+
ids.put(184, RemoteException.class);
843845

844846
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
845847
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.lucene.sandbox.document.HalfFloatPoint;
1212
import org.apache.lucene.util.BytesRef;
1313
import org.elasticsearch.ExceptionsHelper;
14+
import org.elasticsearch.cluster.RemoteException;
1415
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1516
import org.elasticsearch.cluster.service.ClusterService;
1617
import org.elasticsearch.common.Strings;
@@ -842,4 +843,18 @@ public static void assertEsqlFailure(Exception e) {
842843
ExceptionsHelper.unwrapCausesAndSuppressed(e, t -> t instanceof RemoteTransportException)
843844
.ifPresent(transportFailure -> assertNull("remote transport exception must be unwrapped", transportFailure.getCause()));
844845
}
846+
847+
/**
848+
* Errors from remotes are wrapped in RemoteException while the ones from the local cluster
849+
* aren't. This utility method is useful for unwrapping in such cases.
850+
* @param e Exception to unwrap.
851+
* @return Cause of RemoteException, else the error itself.
852+
*/
853+
public static Exception unwrapIfWrappedInRemoteException(Exception e) {
854+
if (e instanceof RemoteException rce) {
855+
return (Exception) rce.getCause();
856+
} else {
857+
return e;
858+
}
859+
}
845860
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterCancellationIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.transport.TransportService;
2727
import org.elasticsearch.xcontent.XContentBuilder;
2828
import org.elasticsearch.xcontent.json.JsonXContent;
29+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
2930
import org.elasticsearch.xpack.esql.plugin.ComputeService;
3031
import org.junit.After;
3132
import org.junit.Before;
@@ -163,6 +164,7 @@ public void testCancel() throws Exception {
163164
SimplePauseFieldPlugin.allowEmitting.countDown();
164165
}
165166
Exception error = expectThrows(Exception.class, requestFuture::actionGet);
167+
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
166168
assertThat(error.getMessage(), containsString("proxy timeout"));
167169
}
168170

@@ -284,6 +286,7 @@ public void testCancelSkipUnavailable() throws Exception {
284286
}
285287

286288
Exception error = expectThrows(Exception.class, requestFuture::actionGet);
289+
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
287290
assertThat(error, instanceOf(TaskCancelledException.class));
288291
}
289292
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.transport.TransportService;
3030
import org.elasticsearch.xcontent.XContentBuilder;
3131
import org.elasticsearch.xcontent.json.JsonXContent;
32+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
3233
import org.elasticsearch.xpack.esql.VerificationException;
3334
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
3435

@@ -813,10 +814,13 @@ public void testRemoteFailureSkipUnavailableTrue() throws IOException {
813814
Map<String, Object> testClusterInfo = setupFailClusters();
814815
String localIndex = (String) testClusterInfo.get("local.index");
815816
String remote1Index = (String) testClusterInfo.get("remote.index");
816-
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
817817
String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index);
818-
IllegalStateException e = expectThrows(IllegalStateException.class, () -> runQuery(q, false));
819-
assertThat(e.getMessage(), containsString("Accessing failing field"));
818+
819+
Exception error = expectThrows(Exception.class, () -> runQuery(q, false));
820+
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
821+
822+
assertThat(error, instanceOf(IllegalStateException.class));
823+
assertThat(error.getMessage(), containsString("Accessing failing field"));
820824
}
821825

822826
private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import static org.hamcrest.Matchers.equalTo;
4343
import static org.hamcrest.Matchers.greaterThan;
4444
import static org.hamcrest.Matchers.in;
45+
import static org.hamcrest.Matchers.instanceOf;
4546
import static org.hamcrest.Matchers.is;
4647
import static org.hamcrest.Matchers.lessThanOrEqualTo;
4748
import static org.hamcrest.Matchers.not;
@@ -104,7 +105,10 @@ public void testPartialResults() throws Exception {
104105
request.includeCCSMetadata(randomBoolean());
105106
{
106107
request.allowPartialResults(false);
107-
IllegalStateException error = expectThrows(IllegalStateException.class, () -> runQuery(request).close());
108+
Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
109+
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
110+
111+
assertThat(error, instanceOf(IllegalStateException.class));
108112
assertThat(error.getMessage(), containsString("Accessing failing field"));
109113
}
110114
request.allowPartialResults(true);
@@ -190,6 +194,7 @@ public void sendResponse(Exception exception) {
190194
{
191195
request.allowPartialResults(false);
192196
Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
197+
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
193198
var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass());
194199
assertNotNull(unwrapped);
195200
assertThat(unwrapped.getMessage(), equalTo(simulatedFailure.getMessage()));
@@ -236,7 +241,8 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception {
236241
request.includeCCSMetadata(randomBoolean());
237242
{
238243
request.allowPartialResults(false);
239-
var error = expectThrows(Exception.class, () -> runQuery(request).close());
244+
Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
245+
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
240246
EsqlTestUtils.assertEsqlFailure(error);
241247
var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass());
242248
assertNotNull(unwrapped);
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.cluster.RemoteException;
11+
import org.elasticsearch.compute.operator.exchange.ExchangeService;
12+
import org.elasticsearch.test.transport.MockTransportService;
13+
import org.elasticsearch.transport.TransportService;
14+
15+
import static org.hamcrest.Matchers.is;
16+
17+
public class EsqlRemoteErrorWrapIT extends AbstractCrossClusterTestCase {
18+
19+
public void testThatRemoteErrorsAreWrapped() throws Exception {
20+
setupClusters(2);
21+
setSkipUnavailable(REMOTE_CLUSTER_1, false);
22+
setSkipUnavailable(REMOTE_CLUSTER_2, false);
23+
24+
/*
25+
* Let's say something went wrong with the Exchange and its specifics when talking to a remote.
26+
* And let's pretend only cluster-a is affected.
27+
*/
28+
for (var nodes : cluster(REMOTE_CLUSTER_1).getNodeNames()) {
29+
((MockTransportService) cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, nodes)).addRequestHandlingBehavior(
30+
ExchangeService.OPEN_EXCHANGE_ACTION_NAME,
31+
(requestHandler, transportRequest, transportChannel, transportTask) -> {
32+
throw new IllegalArgumentException("some error to wreck havoc");
33+
}
34+
);
35+
}
36+
37+
RemoteException wrappedError = expectThrows(
38+
RemoteException.class,
39+
() -> runQuery("FROM " + REMOTE_CLUSTER_1 + ":*," + REMOTE_CLUSTER_2 + ":* | LIMIT 100", false)
40+
);
41+
assertThat(wrappedError.getMessage(), is("Remote [cluster-a] encountered an error"));
42+
}
43+
}

0 commit comments

Comments
 (0)