Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/123156.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 123156
summary: Wrap remote errors with cluster name to provide more context
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.cluster.RemoteException;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -1936,7 +1937,8 @@ private enum ElasticsearchExceptionHandle {
IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus::new,
183,
TransportVersions.V_8_16_0
);
),
REMOTE_EXCEPTION(RemoteException.class, RemoteException::new, 184, TransportVersions.REMOTE_EXCEPTION_8_19);

final Class<? extends ElasticsearchException> exceptionClass;
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INFERENCE_MODEL_REGISTRY_METADATA_8_19 = def(8_841_0_13);
public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE_8_19 = def(8_841_0_14);
public static final TransportVersion RERANK_COMMON_OPTIONS_ADDED_8_19 = def(8_841_0_15);
public static final TransportVersion REMOTE_EXCEPTION_8_19 = def(8_841_0_16);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.util.Objects;

/**
* Represents an error that occurred on a remote node.
* It allows capturing some context such as the cluster alias that encountered the error.
*/
public class RemoteException extends ElasticsearchException {

/**
* @param clusterAlias Name of the cluster.
* @param cause Error that was encountered.
*/
public RemoteException(String clusterAlias, Throwable cause) {
super("Remote [" + clusterAlias + "] encountered an error", cause);
Objects.requireNonNull(cause);
}

public RemoteException(StreamInput in) throws IOException {
super(in);
}

@Override
public RestStatus status() {
// This is similar to what we do in SearchPhaseExecutionException.
return ExceptionsHelper.status(getCause());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.search.VersionMismatchException;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.client.internal.AbstractClientHeadersTestCase;
import org.elasticsearch.cluster.RemoteException;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException;
Expand Down Expand Up @@ -840,6 +841,7 @@ public void testIds() {
ids.put(181, ResourceAlreadyUploadedException.class);
ids.put(182, IngestPipelineException.class);
ids.put(183, IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus.class);
ids.put(184, RemoteException.class);

Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.lucene.sandbox.document.HalfFloatPoint;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.RemoteException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -842,4 +843,18 @@ public static void assertEsqlFailure(Exception e) {
ExceptionsHelper.unwrapCausesAndSuppressed(e, t -> t instanceof RemoteTransportException)
.ifPresent(transportFailure -> assertNull("remote transport exception must be unwrapped", transportFailure.getCause()));
}

/**
* Errors from remotes are wrapped in RemoteException while the ones from the local cluster
* aren't. This utility method is useful for unwrapping in such cases.
* @param e Exception to unwrap.
* @return Cause of RemoteException, else the error itself.
*/
public static Exception unwrapIfWrappedInRemoteException(Exception e) {
if (e instanceof RemoteException rce) {
return (Exception) rce.getCause();
} else {
return e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plugin.ComputeService;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -163,6 +164,7 @@ public void testCancel() throws Exception {
SimplePauseFieldPlugin.allowEmitting.countDown();
}
Exception error = expectThrows(Exception.class, requestFuture::actionGet);
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
assertThat(error.getMessage(), containsString("proxy timeout"));
}

Expand Down Expand Up @@ -284,6 +286,7 @@ public void testCancelSkipUnavailable() throws Exception {
}

Exception error = expectThrows(Exception.class, requestFuture::actionGet);
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
assertThat(error, instanceOf(TaskCancelledException.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

Expand Down Expand Up @@ -813,10 +814,13 @@ public void testRemoteFailureSkipUnavailableTrue() throws IOException {
Map<String, Object> testClusterInfo = setupFailClusters();
String localIndex = (String) testClusterInfo.get("local.index");
String remote1Index = (String) testClusterInfo.get("remote.index");
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index);
IllegalStateException e = expectThrows(IllegalStateException.class, () -> runQuery(q, false));
assertThat(e.getMessage(), containsString("Accessing failing field"));

Exception error = expectThrows(Exception.class, () -> runQuery(q, false));
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);

assertThat(error, instanceOf(IllegalStateException.class));
assertThat(error.getMessage(), containsString("Accessing failing field"));
}

private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -104,7 +105,10 @@ public void testPartialResults() throws Exception {
request.includeCCSMetadata(randomBoolean());
{
request.allowPartialResults(false);
IllegalStateException error = expectThrows(IllegalStateException.class, () -> runQuery(request).close());
Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);

assertThat(error, instanceOf(IllegalStateException.class));
assertThat(error.getMessage(), containsString("Accessing failing field"));
}
request.allowPartialResults(true);
Expand Down Expand Up @@ -190,6 +194,7 @@ public void sendResponse(Exception exception) {
{
request.allowPartialResults(false);
Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass());
assertNotNull(unwrapped);
assertThat(unwrapped.getMessage(), equalTo(simulatedFailure.getMessage()));
Expand Down Expand Up @@ -236,7 +241,8 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception {
request.includeCCSMetadata(randomBoolean());
{
request.allowPartialResults(false);
var error = expectThrows(Exception.class, () -> runQuery(request).close());
Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
EsqlTestUtils.assertEsqlFailure(error);
var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass());
assertNotNull(unwrapped);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.cluster.RemoteException;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import static org.hamcrest.Matchers.is;

public class EsqlRemoteErrorWrapIT extends AbstractCrossClusterTestCase {

public void testThatRemoteErrorsAreWrapped() throws Exception {
setupClusters(2);
setSkipUnavailable(REMOTE_CLUSTER_1, false);
setSkipUnavailable(REMOTE_CLUSTER_2, false);

/*
* Let's say something went wrong with the Exchange and its specifics when talking to a remote.
* And let's pretend only cluster-a is affected.
*/
for (var nodes : cluster(REMOTE_CLUSTER_1).getNodeNames()) {
((MockTransportService) cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, nodes)).addRequestHandlingBehavior(
ExchangeService.OPEN_EXCHANGE_ACTION_NAME,
(requestHandler, transportRequest, transportChannel, transportTask) -> {
throw new IllegalArgumentException("some error to wreck havoc");
}
);
}

RemoteException wrappedError = expectThrows(
RemoteException.class,
() -> runQuery("FROM " + REMOTE_CLUSTER_1 + ":*," + REMOTE_CLUSTER_2 + ":* | LIMIT 100", false)
);
assertThat(wrappedError.getMessage(), is("Remote [cluster-a] encountered an error"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.RemoteException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.RunOnce;
Expand All @@ -19,6 +20,7 @@
import org.elasticsearch.compute.operator.Driver;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
Expand All @@ -38,6 +40,7 @@
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
Expand Down Expand Up @@ -298,7 +301,22 @@ public void execute(
cluster,
cancelQueryOnFailure,
execInfo,
computeListener.acquireCompute()
computeListener.acquireCompute().delegateResponse((l, ex) -> {
/*
* At various points, when collecting failures before sending a response, we manually check
* if an ex is a transport error and if it is, we unwrap it. Because we're wrapping an ex
* in RemoteException, the checks fail and unwrapping does not happen. We offload the
* unwrapping to here.
*
* Note: The other error we explicitly check for is TaskCancelledException which is never
* wrapped.
*/
if (ex instanceof TransportException te) {
l.onFailure(new RemoteException(cluster.clusterAlias(), FailureCollector.unwrapTransportException(te)));
} else {
l.onFailure(new RemoteException(cluster.clusterAlias(), ex));
}
})
);
}
}
Expand Down