From f3ba0185eb29777d6e892265b2886180298fd048 Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Wed, 2 Apr 2025 18:08:20 +0100 Subject: [PATCH] ES|QL: Wrap remote errors with cluster name to provide more context (#123156) Wrap remote errors with cluster name to provide more context Previously, if a remote encountered an error, user would see a top-level error that would provide no context about which remote ran into the error. Now, such errors are wrapped in a separate remote exception whose error message clearly specifies the name of the remote cluster and the error that occurred is the cause of this remote exception. (cherry picked from commit e4fb22c4f3febd7c303427cf8f8e44b37ede016a) # Conflicts: # server/src/main/java/org/elasticsearch/TransportVersions.java # x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java --- docs/changelog/123156.yaml | 5 +++ .../elasticsearch/ElasticsearchException.java | 4 +- .../org/elasticsearch/TransportVersions.java | 1 + .../cluster/RemoteException.java | 44 +++++++++++++++++++ .../ExceptionSerializationTests.java | 2 + .../xpack/esql/EsqlTestUtils.java | 15 +++++++ .../action/CrossClusterCancellationIT.java | 3 ++ .../esql/action/CrossClusterQueryIT.java | 10 +++-- ...CrossClusterQueryWithPartialResultsIT.java | 10 ++++- .../esql/action/EsqlRemoteErrorWrapIT.java | 43 ++++++++++++++++++ .../xpack/esql/plugin/ComputeService.java | 20 ++++++++- 11 files changed, 150 insertions(+), 7 deletions(-) create mode 100644 docs/changelog/123156.yaml create mode 100644 server/src/main/java/org/elasticsearch/cluster/RemoteException.java create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRemoteErrorWrapIT.java diff --git a/docs/changelog/123156.yaml b/docs/changelog/123156.yaml new file mode 100644 index 0000000000000..6d220c8c585df --- /dev/null +++ b/docs/changelog/123156.yaml @@ -0,0 +1,5 @@ +pr: 123156 +summary: Wrap remote errors with cluster name to provide more context +area: Search +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index 981aafa5424f0..b7980cf88fc6d 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -16,6 +16,7 @@ import org.apache.lucene.store.LockObtainFailedException; import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus; import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.cluster.RemoteException; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper; import org.elasticsearch.common.io.stream.StreamInput; @@ -1936,7 +1937,8 @@ private enum ElasticsearchExceptionHandle { IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus::new, 183, TransportVersions.V_8_16_0 - ); + ), + REMOTE_EXCEPTION(RemoteException.class, RemoteException::new, 184, TransportVersions.REMOTE_EXCEPTION_8_19); final Class exceptionClass; final CheckedFunction constructor; diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 305a94bb1c2a4..71afb52902f35 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -200,6 +200,7 @@ static TransportVersion def(int id) { public static final TransportVersion INFERENCE_MODEL_REGISTRY_METADATA_8_19 = def(8_841_0_13); public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE_8_19 = def(8_841_0_14); public static final TransportVersion RERANK_COMMON_OPTIONS_ADDED_8_19 = def(8_841_0_15); + public static final TransportVersion REMOTE_EXCEPTION_8_19 = def(8_841_0_16); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/RemoteException.java b/server/src/main/java/org/elasticsearch/cluster/RemoteException.java new file mode 100644 index 0000000000000..7442b05430b11 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/RemoteException.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; +import java.util.Objects; + +/** + * Represents an error that occurred on a remote node. + * It allows capturing some context such as the cluster alias that encountered the error. + */ +public class RemoteException extends ElasticsearchException { + + /** + * @param clusterAlias Name of the cluster. + * @param cause Error that was encountered. + */ + public RemoteException(String clusterAlias, Throwable cause) { + super("Remote [" + clusterAlias + "] encountered an error", cause); + Objects.requireNonNull(cause); + } + + public RemoteException(StreamInput in) throws IOException { + super(in); + } + + @Override + public RestStatus status() { + // This is similar to what we do in SearchPhaseExecutionException. + return ExceptionsHelper.status(getCause()); + } +} diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 412513051b34c..2ea80c0f0e152 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.search.VersionMismatchException; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.client.internal.AbstractClientHeadersTestCase; +import org.elasticsearch.cluster.RemoteException; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException; @@ -840,6 +841,7 @@ public void testIds() { ids.put(181, ResourceAlreadyUploadedException.class); ids.put(182, IngestPipelineException.class); ids.put(183, IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus.class); + ids.put(184, RemoteException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 9ce2685469e1a..6d1661a9ed5d8 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -11,6 +11,7 @@ import org.apache.lucene.sandbox.document.HalfFloatPoint; import org.apache.lucene.util.BytesRef; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cluster.RemoteException; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -842,4 +843,18 @@ public static void assertEsqlFailure(Exception e) { ExceptionsHelper.unwrapCausesAndSuppressed(e, t -> t instanceof RemoteTransportException) .ifPresent(transportFailure -> assertNull("remote transport exception must be unwrapped", transportFailure.getCause())); } + + /** + * Errors from remotes are wrapped in RemoteException while the ones from the local cluster + * aren't. This utility method is useful for unwrapping in such cases. + * @param e Exception to unwrap. + * @return Cause of RemoteException, else the error itself. + */ + public static Exception unwrapIfWrappedInRemoteException(Exception e) { + if (e instanceof RemoteException rce) { + return (Exception) rce.getCause(); + } else { + return e; + } + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterCancellationIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterCancellationIT.java index 1eacda540e65d..4a5de9dd7d632 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterCancellationIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterCancellationIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.plugin.ComputeService; import org.junit.After; import org.junit.Before; @@ -163,6 +164,7 @@ public void testCancel() throws Exception { SimplePauseFieldPlugin.allowEmitting.countDown(); } Exception error = expectThrows(Exception.class, requestFuture::actionGet); + error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error); assertThat(error.getMessage(), containsString("proxy timeout")); } @@ -284,6 +286,7 @@ public void testCancelSkipUnavailable() throws Exception { } Exception error = expectThrows(Exception.class, requestFuture::actionGet); + error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error); assertThat(error, instanceOf(TaskCancelledException.class)); } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java index fb0730ae69a2f..5edf4dffdda1e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; @@ -813,10 +814,13 @@ public void testRemoteFailureSkipUnavailableTrue() throws IOException { Map testClusterInfo = setupFailClusters(); String localIndex = (String) testClusterInfo.get("local.index"); String remote1Index = (String) testClusterInfo.get("remote.index"); - int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index); - IllegalStateException e = expectThrows(IllegalStateException.class, () -> runQuery(q, false)); - assertThat(e.getMessage(), containsString("Accessing failing field")); + + Exception error = expectThrows(Exception.class, () -> runQuery(q, false)); + error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error); + + assertThat(error, instanceOf(IllegalStateException.class)); + assertThat(error.getMessage(), containsString("Accessing failing field")); } private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java index 0ed5f828bebf9..463dbb81304a1 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java @@ -42,6 +42,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.in; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -104,7 +105,10 @@ public void testPartialResults() throws Exception { request.includeCCSMetadata(randomBoolean()); { request.allowPartialResults(false); - IllegalStateException error = expectThrows(IllegalStateException.class, () -> runQuery(request).close()); + Exception error = expectThrows(Exception.class, () -> runQuery(request).close()); + error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error); + + assertThat(error, instanceOf(IllegalStateException.class)); assertThat(error.getMessage(), containsString("Accessing failing field")); } request.allowPartialResults(true); @@ -190,6 +194,7 @@ public void sendResponse(Exception exception) { { request.allowPartialResults(false); Exception error = expectThrows(Exception.class, () -> runQuery(request).close()); + error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error); var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass()); assertNotNull(unwrapped); assertThat(unwrapped.getMessage(), equalTo(simulatedFailure.getMessage())); @@ -236,7 +241,8 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception { request.includeCCSMetadata(randomBoolean()); { request.allowPartialResults(false); - var error = expectThrows(Exception.class, () -> runQuery(request).close()); + Exception error = expectThrows(Exception.class, () -> runQuery(request).close()); + error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error); EsqlTestUtils.assertEsqlFailure(error); var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass()); assertNotNull(unwrapped); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRemoteErrorWrapIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRemoteErrorWrapIT.java new file mode 100644 index 0000000000000..bc4d5d35ea71c --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRemoteErrorWrapIT.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.cluster.RemoteException; +import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; + +import static org.hamcrest.Matchers.is; + +public class EsqlRemoteErrorWrapIT extends AbstractCrossClusterTestCase { + + public void testThatRemoteErrorsAreWrapped() throws Exception { + setupClusters(2); + setSkipUnavailable(REMOTE_CLUSTER_1, false); + setSkipUnavailable(REMOTE_CLUSTER_2, false); + + /* + * Let's say something went wrong with the Exchange and its specifics when talking to a remote. + * And let's pretend only cluster-a is affected. + */ + for (var nodes : cluster(REMOTE_CLUSTER_1).getNodeNames()) { + ((MockTransportService) cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, nodes)).addRequestHandlingBehavior( + ExchangeService.OPEN_EXCHANGE_ACTION_NAME, + (requestHandler, transportRequest, transportChannel, transportTask) -> { + throw new IllegalArgumentException("some error to wreck havoc"); + } + ); + } + + RemoteException wrappedError = expectThrows( + RemoteException.class, + () -> runQuery("FROM " + REMOTE_CLUSTER_1 + ":*," + REMOTE_CLUSTER_2 + ":* | LIMIT 100", false) + ); + assertThat(wrappedError.getMessage(), is("Remote [cluster-a] encountered an error")); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index cf077002eec13..80b5f6c460ca5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.cluster.RemoteException; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.RunOnce; @@ -19,6 +20,7 @@ import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.DriverTaskRunner; +import org.elasticsearch.compute.operator.FailureCollector; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.core.Releasable; @@ -38,6 +40,7 @@ import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; @@ -298,7 +301,22 @@ public void execute( cluster, cancelQueryOnFailure, execInfo, - computeListener.acquireCompute() + computeListener.acquireCompute().delegateResponse((l, ex) -> { + /* + * At various points, when collecting failures before sending a response, we manually check + * if an ex is a transport error and if it is, we unwrap it. Because we're wrapping an ex + * in RemoteException, the checks fail and unwrapping does not happen. We offload the + * unwrapping to here. + * + * Note: The other error we explicitly check for is TaskCancelledException which is never + * wrapped. + */ + if (ex instanceof TransportException te) { + l.onFailure(new RemoteException(cluster.clusterAlias(), FailureCollector.unwrapTransportException(te))); + } else { + l.onFailure(new RemoteException(cluster.clusterAlias(), ex)); + } + }) ); } }