diff --git a/docs/changelog/123156.yaml b/docs/changelog/123156.yaml new file mode 100644 index 0000000000000..6d220c8c585df --- /dev/null +++ b/docs/changelog/123156.yaml @@ -0,0 +1,5 @@ +pr: 123156 +summary: Wrap remote errors with cluster name to provide more context +area: Search +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index ab2f7db484c2f..1b3e0c60ca241 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -16,6 +16,7 @@ import org.apache.lucene.store.LockObtainFailedException; import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus; import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.cluster.RemoteException; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper; import org.elasticsearch.common.io.stream.StreamInput; @@ -1981,7 +1982,8 @@ private enum ElasticsearchExceptionHandle { IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus::new, 183, TransportVersions.V_8_16_0 - ); + ), + REMOTE_EXCEPTION(RemoteException.class, RemoteException::new, 184, TransportVersions.REMOTE_EXCEPTION); final Class exceptionClass; final CheckedFunction constructor; diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index eececd187f11e..7071b099749d3 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -209,6 +209,7 @@ static TransportVersion def(int id) { public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_041_0_00); public static final TransportVersion REPOSITORIES_METADATA_AS_PROJECT_CUSTOM = def(9_042_0_00); public static final TransportVersion BATCHED_QUERY_PHASE_VERSION = def(9_043_0_00); + public static final TransportVersion REMOTE_EXCEPTION = def(9_044_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/RemoteException.java b/server/src/main/java/org/elasticsearch/cluster/RemoteException.java new file mode 100644 index 0000000000000..7442b05430b11 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/RemoteException.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; +import java.util.Objects; + +/** + * Represents an error that occurred on a remote node. + * It allows capturing some context such as the cluster alias that encountered the error. + */ +public class RemoteException extends ElasticsearchException { + + /** + * @param clusterAlias Name of the cluster. + * @param cause Error that was encountered. + */ + public RemoteException(String clusterAlias, Throwable cause) { + super("Remote [" + clusterAlias + "] encountered an error", cause); + Objects.requireNonNull(cause); + } + + public RemoteException(StreamInput in) throws IOException { + super(in); + } + + @Override + public RestStatus status() { + // This is similar to what we do in SearchPhaseExecutionException. + return ExceptionsHelper.status(getCause()); + } +} diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 35fce1b20241b..bfa897173a368 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.client.internal.AbstractClientHeadersTestCase; +import org.elasticsearch.cluster.RemoteException; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException; @@ -840,6 +841,7 @@ public void testIds() { ids.put(181, ResourceAlreadyUploadedException.class); ids.put(182, IngestPipelineException.class); ids.put(183, IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus.class); + ids.put(184, RemoteException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 19e8f0ef36278..611acb164fd26 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -11,6 +11,7 @@ import org.apache.lucene.sandbox.document.HalfFloatPoint; import org.apache.lucene.util.BytesRef; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cluster.RemoteException; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -858,4 +859,18 @@ public static T singleValue(Collection collection) { assertThat(collection, hasSize(1)); return collection.iterator().next(); } + + /** + * Errors from remotes are wrapped in RemoteException while the ones from the local cluster + * aren't. This utility method is useful for unwrapping in such cases. + * @param e Exception to unwrap. + * @return Cause of RemoteException, else the error itself. + */ + public static Exception unwrapIfWrappedInRemoteException(Exception e) { + if (e instanceof RemoteException rce) { + return (Exception) rce.getCause(); + } else { + return e; + } + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterCancellationIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterCancellationIT.java index df27145d488a0..8131af2da408d 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterCancellationIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterCancellationIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.plugin.ComputeService; import org.junit.After; import org.junit.Before; @@ -163,6 +164,7 @@ public void testCancel() throws Exception { SimplePauseFieldPlugin.allowEmitting.countDown(); } Exception error = expectThrows(Exception.class, requestFuture::actionGet); + error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error); assertThat(error.getMessage(), containsString("proxy timeout")); } @@ -284,6 +286,7 @@ public void testCancelSkipUnavailable() throws Exception { } Exception error = expectThrows(Exception.class, requestFuture::actionGet); + error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error); assertThat(error, instanceOf(TaskCancelledException.class)); } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java index 715289ccefbdd..fe5c36aff07e7 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; @@ -813,10 +814,13 @@ public void testRemoteFailureSkipUnavailableTrue() throws IOException { Map testClusterInfo = setupFailClusters(); String localIndex = (String) testClusterInfo.get("local.index"); String remote1Index = (String) testClusterInfo.get("remote.index"); - int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index); - IllegalStateException e = expectThrows(IllegalStateException.class, () -> runQuery(q, false)); - assertThat(e.getMessage(), containsString("Accessing failing field")); + + Exception error = expectThrows(Exception.class, () -> runQuery(q, false)); + error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error); + + assertThat(error, instanceOf(IllegalStateException.class)); + assertThat(error.getMessage(), containsString("Accessing failing field")); } private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java index 0ed5f828bebf9..463dbb81304a1 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java @@ -42,6 +42,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.in; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -104,7 +105,10 @@ public void testPartialResults() throws Exception { request.includeCCSMetadata(randomBoolean()); { request.allowPartialResults(false); - IllegalStateException error = expectThrows(IllegalStateException.class, () -> runQuery(request).close()); + Exception error = expectThrows(Exception.class, () -> runQuery(request).close()); + error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error); + + assertThat(error, instanceOf(IllegalStateException.class)); assertThat(error.getMessage(), containsString("Accessing failing field")); } request.allowPartialResults(true); @@ -190,6 +194,7 @@ public void sendResponse(Exception exception) { { request.allowPartialResults(false); Exception error = expectThrows(Exception.class, () -> runQuery(request).close()); + error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error); var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass()); assertNotNull(unwrapped); assertThat(unwrapped.getMessage(), equalTo(simulatedFailure.getMessage())); @@ -236,7 +241,8 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception { request.includeCCSMetadata(randomBoolean()); { request.allowPartialResults(false); - var error = expectThrows(Exception.class, () -> runQuery(request).close()); + Exception error = expectThrows(Exception.class, () -> runQuery(request).close()); + error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error); EsqlTestUtils.assertEsqlFailure(error); var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass()); assertNotNull(unwrapped); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRemoteErrorWrapIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRemoteErrorWrapIT.java new file mode 100644 index 0000000000000..bc4d5d35ea71c --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRemoteErrorWrapIT.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.cluster.RemoteException; +import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; + +import static org.hamcrest.Matchers.is; + +public class EsqlRemoteErrorWrapIT extends AbstractCrossClusterTestCase { + + public void testThatRemoteErrorsAreWrapped() throws Exception { + setupClusters(2); + setSkipUnavailable(REMOTE_CLUSTER_1, false); + setSkipUnavailable(REMOTE_CLUSTER_2, false); + + /* + * Let's say something went wrong with the Exchange and its specifics when talking to a remote. + * And let's pretend only cluster-a is affected. + */ + for (var nodes : cluster(REMOTE_CLUSTER_1).getNodeNames()) { + ((MockTransportService) cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, nodes)).addRequestHandlingBehavior( + ExchangeService.OPEN_EXCHANGE_ACTION_NAME, + (requestHandler, transportRequest, transportChannel, transportTask) -> { + throw new IllegalArgumentException("some error to wreck havoc"); + } + ); + } + + RemoteException wrappedError = expectThrows( + RemoteException.class, + () -> runQuery("FROM " + REMOTE_CLUSTER_1 + ":*," + REMOTE_CLUSTER_2 + ":* | LIMIT 100", false) + ); + assertThat(wrappedError.getMessage(), is("Remote [cluster-a] encountered an error")); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 293be0eb3c2b0..a9f00f3635703 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.cluster.RemoteException; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.RunOnce; @@ -19,6 +20,7 @@ import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.DriverTaskRunner; +import org.elasticsearch.compute.operator.FailureCollector; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.core.Releasable; @@ -38,6 +40,7 @@ import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; @@ -338,7 +341,22 @@ public void execute( cluster, cancelQueryOnFailure, execInfo, - computeListener.acquireCompute() + computeListener.acquireCompute().delegateResponse((l, ex) -> { + /* + * At various points, when collecting failures before sending a response, we manually check + * if an ex is a transport error and if it is, we unwrap it. Because we're wrapping an ex + * in RemoteException, the checks fail and unwrapping does not happen. We offload the + * unwrapping to here. + * + * Note: The other error we explicitly check for is TaskCancelledException which is never + * wrapped. + */ + if (ex instanceof TransportException te) { + l.onFailure(new RemoteException(cluster.clusterAlias(), FailureCollector.unwrapTransportException(te))); + } else { + l.onFailure(new RemoteException(cluster.clusterAlias(), ex)); + } + }) ); } }