Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
b2fc21d
Wrap remote errors with cluster name to provide more context
pawankartik-elastic Feb 21, 2025
a049cc9
Update docs/changelog/123156.yaml
pawankartik-elastic Mar 3, 2025
e32956a
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 3, 2025
cd3fefa
Add test and handle missed subset of scenarios where an error can be
pawankartik-elastic Mar 7, 2025
6daa9e1
Remove unused import
pawankartik-elastic Mar 7, 2025
bc166cc
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 7, 2025
db90b71
Introduce `RemoteComputeException` that forwards the cause's status code
pawankartik-elastic Mar 12, 2025
21cfe8a
Adjust test to match the new exception type
pawankartik-elastic Mar 12, 2025
a7bf704
Fix license header
pawankartik-elastic Mar 12, 2025
e1e9592
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 12, 2025
8e9976d
Adjust test to match the new exception type
pawankartik-elastic Mar 12, 2025
6dd8421
Rename test
pawankartik-elastic Mar 12, 2025
8f2a39d
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 12, 2025
57a133a
Reword sentence as per review suggestion
pawankartik-elastic Mar 17, 2025
eb27643
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 17, 2025
ea67731
Register exception as serializable
pawankartik-elastic Mar 17, 2025
a155983
Import sort
pawankartik-elastic Mar 17, 2025
24b103d
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 18, 2025
cb130f5
Fix tests
pawankartik-elastic Mar 19, 2025
ca40d5c
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 19, 2025
fafa4ae
Fix license header
pawankartik-elastic Mar 20, 2025
7dd834e
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 20, 2025
a5d14a4
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 24, 2025
99f9b45
Relax assertions for the wrapping
pawankartik-elastic Mar 24, 2025
2ac89c7
Remove redundant getCause()
pawankartik-elastic Mar 24, 2025
32e26f6
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 26, 2025
764933f
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 31, 2025
8ef049f
Ensure transport errors are unwrapped before sending response to the
pawankartik-elastic Mar 31, 2025
04149a4
Address review comment: rename exception
pawankartik-elastic Apr 1, 2025
4e0baa3
[CI] Auto commit changes from spotless
Apr 1, 2025
8a5d6a0
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Apr 1, 2025
90b314a
Rename leftover remnants
pawankartik-elastic Apr 1, 2025
0996521
Address review suggestions
pawankartik-elastic Apr 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/123156.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 123156
summary: Wrap remote errors with cluster name to provide more context
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.cluster.RemoteException;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -1981,7 +1982,8 @@ private enum ElasticsearchExceptionHandle {
IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus::new,
183,
TransportVersions.V_8_16_0
);
),
REMOTE_EXCEPTION(RemoteException.class, RemoteException::new, 184, TransportVersions.REMOTE_EXCEPTION);

final Class<? extends ElasticsearchException> exceptionClass;
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_041_0_00);
public static final TransportVersion REPOSITORIES_METADATA_AS_PROJECT_CUSTOM = def(9_042_0_00);
public static final TransportVersion BATCHED_QUERY_PHASE_VERSION = def(9_043_0_00);
public static final TransportVersion REMOTE_EXCEPTION = def(9_044_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.util.Objects;

/**
 * Wraps a failure that is attributed to a remote cluster so that the cluster alias
 * travels with the error and appears in its message.
 * The REST status reported by this exception is derived from the wrapped cause.
 */
public class RemoteException extends ElasticsearchException {

    /**
     * Creates a wrapper around a failure attributed to the given cluster.
     *
     * @param clusterAlias Alias of the cluster; embedded verbatim in the exception message.
     * @param cause Underlying failure; must not be {@code null}.
     * @throws NullPointerException if {@code cause} is {@code null}.
     */
    public RemoteException(String clusterAlias, Throwable cause) {
        super("Remote [" + clusterAlias + "] encountered an error", Objects.requireNonNull(cause));
    }

    /**
     * Stream constructor; delegates reading entirely to the superclass.
     *
     * @param in Stream to read the exception from.
     * @throws IOException if the superclass fails to read from the stream.
     */
    public RemoteException(StreamInput in) throws IOException {
        super(in);
    }

    /**
     * @return The status that {@link ExceptionsHelper#status(Throwable)} computes for the cause.
     */
    @Override
    public RestStatus status() {
        // Forward the cause's status rather than our own; SearchPhaseExecutionException does something similar.
        final Throwable underlying = getCause();
        return ExceptionsHelper.status(underlying);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.client.internal.AbstractClientHeadersTestCase;
import org.elasticsearch.cluster.RemoteException;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException;
Expand Down Expand Up @@ -840,6 +841,7 @@ public void testIds() {
ids.put(181, ResourceAlreadyUploadedException.class);
ids.put(182, IngestPipelineException.class);
ids.put(183, IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus.class);
ids.put(184, RemoteException.class);

Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.lucene.sandbox.document.HalfFloatPoint;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.RemoteException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -858,4 +859,18 @@ public static <T> T singleValue(Collection<T> collection) {
assertThat(collection, hasSize(1));
return collection.iterator().next();
}

/**
 * Errors from remotes are wrapped in {@link RemoteException} while the ones from the local
 * cluster aren't. This utility method strips that wrapper, when present, so that tests can
 * assert on the underlying error regardless of where it originated.
 * <p>
 * The wrapper is only removed when its cause is itself an {@link Exception}. If the cause is
 * missing or is some other {@link Throwable} (for example an {@link Error}), the error is
 * returned unchanged rather than failing with a {@link ClassCastException} or returning
 * {@code null}, so that the calling test's own assertions report the mismatch.
 *
 * @param e Exception to unwrap.
 * @return Cause of the RemoteException if it is an Exception, else the error itself.
 */
public static Exception unwrapIfWrappedInRemoteException(Exception e) {
    if (e instanceof RemoteException remote && remote.getCause() instanceof Exception cause) {
        return cause;
    }
    return e;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plugin.ComputeService;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -163,6 +164,7 @@ public void testCancel() throws Exception {
SimplePauseFieldPlugin.allowEmitting.countDown();
}
Exception error = expectThrows(Exception.class, requestFuture::actionGet);
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
assertThat(error.getMessage(), containsString("proxy timeout"));
}

Expand Down Expand Up @@ -284,6 +286,7 @@ public void testCancelSkipUnavailable() throws Exception {
}

Exception error = expectThrows(Exception.class, requestFuture::actionGet);
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
assertThat(error, instanceOf(TaskCancelledException.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

Expand Down Expand Up @@ -813,10 +814,13 @@ public void testRemoteFailureSkipUnavailableTrue() throws IOException {
Map<String, Object> testClusterInfo = setupFailClusters();
String localIndex = (String) testClusterInfo.get("local.index");
String remote1Index = (String) testClusterInfo.get("remote.index");
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index);
IllegalStateException e = expectThrows(IllegalStateException.class, () -> runQuery(q, false));
assertThat(e.getMessage(), containsString("Accessing failing field"));

Exception error = expectThrows(Exception.class, () -> runQuery(q, false));
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);

assertThat(error, instanceOf(IllegalStateException.class));
assertThat(error.getMessage(), containsString("Accessing failing field"));
}

private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -104,7 +105,10 @@ public void testPartialResults() throws Exception {
request.includeCCSMetadata(randomBoolean());
{
request.allowPartialResults(false);
IllegalStateException error = expectThrows(IllegalStateException.class, () -> runQuery(request).close());
Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);

assertThat(error, instanceOf(IllegalStateException.class));
assertThat(error.getMessage(), containsString("Accessing failing field"));
}
request.allowPartialResults(true);
Expand Down Expand Up @@ -190,6 +194,7 @@ public void sendResponse(Exception exception) {
{
request.allowPartialResults(false);
Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass());
assertNotNull(unwrapped);
assertThat(unwrapped.getMessage(), equalTo(simulatedFailure.getMessage()));
Expand Down Expand Up @@ -236,7 +241,8 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception {
request.includeCCSMetadata(randomBoolean());
{
request.allowPartialResults(false);
var error = expectThrows(Exception.class, () -> runQuery(request).close());
Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
EsqlTestUtils.assertEsqlFailure(error);
var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass());
assertNotNull(unwrapped);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.cluster.RemoteException;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import static org.hamcrest.Matchers.is;

/**
 * Checks that a failure raised while handling the open-exchange action on a remote cluster
 * reaches the caller wrapped in a {@link RemoteException} whose message names that cluster.
 */
public class EsqlRemoteErrorWrapIT extends AbstractCrossClusterTestCase {

    public void testThatRemoteErrorsAreWrapped() throws Exception {
        setupClusters(2);
        // With skip_unavailable disabled on both remotes, a remote failure must fail the whole query.
        setSkipUnavailable(REMOTE_CLUSTER_1, false);
        setSkipUnavailable(REMOTE_CLUSTER_2, false);

        /*
         * Simulate a problem with the Exchange when talking to a remote: every node of the
         * first remote cluster (and only that cluster) rejects the open-exchange request.
         */
        for (var nodeName : cluster(REMOTE_CLUSTER_1).getNodeNames()) {
            var transportService = (MockTransportService) cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, nodeName);
            transportService.addRequestHandlingBehavior(ExchangeService.OPEN_EXCHANGE_ACTION_NAME, (handler, request, channel, task) -> {
                throw new IllegalArgumentException("some error to wreck havoc");
            });
        }

        final String query = "FROM " + REMOTE_CLUSTER_1 + ":*," + REMOTE_CLUSTER_2 + ":* | LIMIT 100";
        RemoteException wrappedError = expectThrows(RemoteException.class, () -> runQuery(query, false));
        assertThat(wrappedError.getMessage(), is("Remote [cluster-a] encountered an error"));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.RemoteException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.RunOnce;
Expand All @@ -19,6 +20,7 @@
import org.elasticsearch.compute.operator.Driver;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
Expand All @@ -38,6 +40,7 @@
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
Expand Down Expand Up @@ -338,7 +341,22 @@ public void execute(
cluster,
cancelQueryOnFailure,
execInfo,
computeListener.acquireCompute()
computeListener.acquireCompute().delegateResponse((l, ex) -> {
/*
* At various points, when collecting failures before sending a response, we manually check
* if an ex is a transport error and if it is, we unwrap it. Because we're wrapping an ex
* in RemoteException, the checks fail and unwrapping does not happen. We offload the
* unwrapping to here.
*
* Note: The other error we explicitly check for is TaskCancelledException which is never
* wrapped.
*/
if (ex instanceof TransportException te) {
l.onFailure(new RemoteException(cluster.clusterAlias(), FailureCollector.unwrapTransportException(te)));
} else {
l.onFailure(new RemoteException(cluster.clusterAlias(), ex));
}
})
);
}
}
Expand Down
Loading