Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
b2fc21d
Wrap remote errors with cluster name to provide more context
pawankartik-elastic Feb 21, 2025
a049cc9
Update docs/changelog/123156.yaml
pawankartik-elastic Mar 3, 2025
e32956a
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 3, 2025
cd3fefa
Add test and handle missed subset of scenarios where an error can be
pawankartik-elastic Mar 7, 2025
6daa9e1
Remove unused import
pawankartik-elastic Mar 7, 2025
bc166cc
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 7, 2025
db90b71
Introduce `RemoteComputeException` that forwards the cause's status code
pawankartik-elastic Mar 12, 2025
21cfe8a
Adjust test to match the new exception type
pawankartik-elastic Mar 12, 2025
a7bf704
Fix license header
pawankartik-elastic Mar 12, 2025
e1e9592
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 12, 2025
8e9976d
Adjust test to match the new exception type
pawankartik-elastic Mar 12, 2025
6dd8421
Rename test
pawankartik-elastic Mar 12, 2025
8f2a39d
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 12, 2025
57a133a
Reword sentence as per review suggestion
pawankartik-elastic Mar 17, 2025
eb27643
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 17, 2025
ea67731
Register exception as serializable
pawankartik-elastic Mar 17, 2025
a155983
Import sort
pawankartik-elastic Mar 17, 2025
24b103d
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 18, 2025
cb130f5
Fix tests
pawankartik-elastic Mar 19, 2025
ca40d5c
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 19, 2025
fafa4ae
Fix license header
pawankartik-elastic Mar 20, 2025
7dd834e
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 20, 2025
a5d14a4
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 24, 2025
99f9b45
Relax assertions for the wrapping
pawankartik-elastic Mar 24, 2025
2ac89c7
Remove redundant getCause()
pawankartik-elastic Mar 24, 2025
32e26f6
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 26, 2025
764933f
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Mar 31, 2025
8ef049f
Ensure transport errors are unwrapped before sending response to the
pawankartik-elastic Mar 31, 2025
04149a4
Address review comment: rename exception
pawankartik-elastic Apr 1, 2025
4e0baa3
[CI] Auto commit changes from spotless
Apr 1, 2025
8a5d6a0
Merge branch 'main' into pkar/esql-wrap-remote-errors
pawankartik-elastic Apr 1, 2025
90b314a
Rename leftover remnants
pawankartik-elastic Apr 1, 2025
0996521
Address review suggestions
pawankartik-elastic Apr 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/123156.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 123156
summary: Wrap remote errors with cluster name to provide more context
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.RemoteComputeException;
import org.elasticsearch.xpack.esql.plugin.ComputeService;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -283,7 +284,7 @@ public void testCancelSkipUnavailable() throws Exception {
SimplePauseFieldPlugin.allowEmitting.countDown();
}

Exception error = expectThrows(Exception.class, requestFuture::actionGet);
assertThat(error, instanceOf(TaskCancelledException.class));
RemoteComputeException error = expectThrows(RemoteComputeException.class, requestFuture::actionGet);
assertThat(error.getCause(), instanceOf(TaskCancelledException.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.RemoteComputeException;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

Expand Down Expand Up @@ -813,10 +814,13 @@ public void testRemoteFailureSkipUnavailableTrue() throws IOException {
Map<String, Object> testClusterInfo = setupFailClusters();
String localIndex = (String) testClusterInfo.get("local.index");
String remote1Index = (String) testClusterInfo.get("remote.index");
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index);
IllegalStateException e = expectThrows(IllegalStateException.class, () -> runQuery(q, false));
assertThat(e.getMessage(), containsString("Accessing failing field"));

RemoteComputeException rce = expectThrows(RemoteComputeException.class, () -> runQuery(q, false));
Throwable t = rce.getCause();

assertThat(t, instanceOf(IllegalStateException.class));
assertThat(t.getMessage(), containsString("Accessing failing field"));
}

private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.RemoteComputeException;
import org.elasticsearch.xpack.esql.plugin.ComputeService;

import java.io.IOException;
Expand All @@ -40,6 +41,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

Expand Down Expand Up @@ -73,8 +75,11 @@ public void testPartialResults() throws Exception {
request.includeCCSMetadata(randomBoolean());
{
request.allowPartialResults(false);
IllegalStateException error = expectThrows(IllegalStateException.class, () -> runQuery(request).close());
assertThat(error.getMessage(), containsString("Accessing failing field"));
RemoteComputeException rce = expectThrows(RemoteComputeException.class, () -> runQuery(request).close());
Throwable t = rce.getCause();

assertThat(t, instanceOf(IllegalStateException.class));
assertThat(t.getMessage(), containsString("Accessing failing field"));
}
request.allowPartialResults(true);
try (var resp = runQuery(request)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.RemoteComputeException;

import static org.hamcrest.Matchers.is;

public class EsqlRemoteErrorWrapIT extends AbstractCrossClusterTestCase {

public void testThatRemoteErrorsAreWrapped() throws Exception {
setupClusters(2);
setSkipUnavailable(REMOTE_CLUSTER_1, false);
setSkipUnavailable(REMOTE_CLUSTER_2, false);

/*
* Let's say something went wrong with the Exchange and its specifics when talking to a remote.
* And let's pretend only cluster-a is affected.
*/
for (var nodes : cluster(REMOTE_CLUSTER_1).getNodeNames()) {
((MockTransportService) cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, nodes)).addRequestHandlingBehavior(
ExchangeService.OPEN_EXCHANGE_ACTION_NAME,
(requestHandler, transportRequest, transportChannel, transportTask) -> {
throw new IllegalArgumentException("some error to wreck havoc");
}
);
}

RemoteComputeException wrappedError = expectThrows(
RemoteComputeException.class,
() -> runQuery("FROM " + REMOTE_CLUSTER_1 + ":*," + REMOTE_CLUSTER_2 + ":* | LIMIT 100", false)
);
assertThat(wrappedError.getMessage(), is("Remote [cluster-a] encountered an error"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.rest.RestStatus;

import java.util.Objects;

/**
* Represents an error that occurred when starting compute on a remote node.
* It allows capturing some context such as the cluster alias that encountered the error.
*/
public class RemoteComputeException extends ElasticsearchException {

/**
* @param clusterAlias Name of the cluster.
* @param cause Error that was encountered.
*/
public RemoteComputeException(String clusterAlias, Throwable cause) {
super("Remote [" + clusterAlias + "] encountered an error", cause);
Objects.requireNonNull(cause);
}

@Override
public RestStatus status() {
// This is similar to what we do in SearchPhaseExecutionException.
return ExceptionsHelper.status(getCause());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.RemoteComputeException;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
Expand Down Expand Up @@ -300,6 +301,7 @@ public void execute(
cancelQueryOnFailure,
execInfo,
computeListener.acquireCompute()
.delegateResponse((l, ex) -> l.onFailure(new RemoteComputeException(cluster.clusterAlias(), ex)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar enough with the ComputeService to determine whether this exception is a local only exception, that will never be serialized through the wire. Is that the case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's on my mind right now and I hope to get it confirmed with Nhat later today. Sounds good?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically startComputeOnRemoteCluster is always called in the coordinating node. I am not sure however whether it means "it will be never serialized", as there seem to be scenarios - e.g. with async response - where the end result is serialized, and in that case this exception might have to be serialized too, I am not sure.

Copy link
Member

@dnhatn dnhatn Mar 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What Stas said is correct. This exception is local with a sync query but can be serialized with an async query:

.

Maybe add an async query with failures to verify this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool, if it can be serialized then it needs to be registered as a serializable one, which makes me wonder if we can reuse an existing one instead to avoid that ceremony :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the catch! Yes, the exception is getting serialised for asynchronous queries and has to be handled accordingly.

wonder if we can reuse an existing one instead to avoid that ceremony

To re-use an existing exception, we primarily need to fulfil 2 requirements:

  1. It should propagate the status of the cause, and,
  2. It should not implement the ES wrapper interface to prevent unwrapping when the error is sent back to the user (which discards the context we've built up, i.e. the remote's name).

I don't see any exceptions that we can reuse.

);
}
}
Expand Down