Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/121240.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 121240
summary: Implement runtime skip_unavailable=true
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

public class FailingFieldPlugin extends Plugin implements ScriptPlugin {

public static final String FAILING_FIELD_LANG = "failing_field";

@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new ScriptEngine() {
@Override
public String getType() {
return "failing_field";
return FAILING_FIELD_LANG;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public static ElasticsearchCluster remoteCluster() {
}

public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster) {
return localCluster(remoteCluster, true);
}

public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster, Boolean skipUnavailable) {
return ElasticsearchCluster.local()
.name(LOCAL_CLUSTER_NAME)
.distribution(DistributionType.DEFAULT)
Expand All @@ -41,6 +45,7 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust
.setting("node.roles", "[data,ingest,master,remote_cluster_client]")
.setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"")
.setting("cluster.remote.connections_per_cluster", "1")
.setting("cluster.remote." + REMOTE_CLUSTER_NAME + ".skip_unavailable", skipUnavailable.toString())
.shared(true)
.setting("cluster.routing.rebalance.enable", "none")
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,17 @@ private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInf
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
assertTrue(executionInfo.isCrossClusterSearch());

boolean hasPartials = false;
for (String clusterAlias : executionInfo.clusterAliases()) {
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(cluster.getTook().millis(), lessThanOrEqualTo(executionInfo.overallTook().millis()));
if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL
|| cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) {
hasPartials = true;
}
}
assertThat(executionInfo.isPartial(), equalTo(hasPartials));
}

private void setSkipUnavailable(String clusterAlias, boolean skip) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));

Expand Down Expand Up @@ -151,6 +152,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));

Expand Down Expand Up @@ -203,6 +205,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));

Expand Down Expand Up @@ -275,6 +278,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue()
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1)));

Expand Down Expand Up @@ -317,6 +321,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue()
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.Build;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
Expand All @@ -20,6 +21,7 @@
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.transport.TransportService;
Expand All @@ -39,6 +41,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;

public class CrossClustersCancellationIT extends AbstractMultiClustersTestCase {
private static final String REMOTE_CLUSTER = "cluster-a";
Expand Down Expand Up @@ -75,6 +78,11 @@ public void resetPlugin() {
SimplePauseFieldPlugin.resetPlugin();
}

@Override
protected boolean reuseClusters() {
return false;
}

private void createRemoteIndex(int numDocs) throws Exception {
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
mapping.startObject("runtime");
Expand All @@ -96,6 +104,26 @@ private void createRemoteIndex(int numDocs) throws Exception {
bulk.get();
}

private void createLocalIndex(int numDocs) throws Exception {
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
mapping.startObject("runtime");
{
mapping.startObject("const");
{
mapping.field("type", "long");
}
mapping.endObject();
}
mapping.endObject();
mapping.endObject();
client(LOCAL_CLUSTER).admin().indices().prepareCreate("test").setMapping(mapping).get();
BulkRequestBuilder bulk = client(LOCAL_CLUSTER).prepareBulk("test").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < numDocs; i++) {
bulk.add(new IndexRequest().source("const", i));
}
bulk.get();
}

public void testCancel() throws Exception {
createRemoteIndex(between(10, 100));
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
Expand Down Expand Up @@ -208,4 +236,95 @@ public void testTasks() throws Exception {
}
requestFuture.actionGet(30, TimeUnit.SECONDS).close();
}

// Check that cancelling remote task with skip_unavailable=true produces partial
public void testCancelSkipUnavailable() throws Exception {
createRemoteIndex(between(10, 100));
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
request.query("FROM *:test | STATS total=sum(const) | LIMIT 1");
request.pragmas(randomPragmas());
request.includeCCSMetadata(true);
PlainActionFuture<EsqlQueryResponse> requestFuture = new PlainActionFuture<>();
client().execute(EsqlQueryAction.INSTANCE, request, requestFuture);
assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS));
List<TaskInfo> rootTasks = new ArrayList<>();
assertBusy(() -> {
List<TaskInfo> tasks = client(REMOTE_CLUSTER).admin()
.cluster()
.prepareListTasks()
.setActions(ComputeService.CLUSTER_ACTION_NAME)
.get()
.getTasks();
assertThat(tasks, hasSize(1));
rootTasks.addAll(tasks);
});
var cancelRequest = new CancelTasksRequest().setTargetTaskId(rootTasks.get(0).taskId()).setReason("remote failed");
client(REMOTE_CLUSTER).execute(TransportCancelTasksAction.TYPE, cancelRequest);
try {
assertBusy(() -> {
List<TaskInfo> drivers = client(REMOTE_CLUSTER).admin()
.cluster()
.prepareListTasks()
.setActions(DriverTaskRunner.ACTION_NAME)
.get()
.getTasks();
assertThat(drivers.size(), greaterThanOrEqualTo(1));
for (TaskInfo driver : drivers) {
assertTrue(driver.cancelled());
}
});
} finally {
SimplePauseFieldPlugin.allowEmitting.countDown();
}
var resp = requestFuture.actionGet();
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();

assertNotNull(executionInfo);
assertThat(executionInfo.isPartial(), equalTo(true));
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER);

assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm confused. I thought cancelling an ES|QL would result in a top level failure, like a 500 status. Is that not right? It returns partial data?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Canceling the remote shouldn't - at least according to our slack conversation. Is it not correct?

assertThat(cluster.getFailures().size(), equalTo(1));
assertThat(cluster.getFailures().get(0).getCause(), instanceOf(TaskCancelledException.class));
}

// Check that closing remote node with skip_unavailable=true produces partial
public void testCloseSkipUnavailable() throws Exception {
// We are using delay() here because closing cluster while inside pause fields doesn't seem to produce clean closure
assumeTrue("Only snapshot builds have delay()", Build.current().isSnapshot());
createRemoteIndex(between(1000, 5000));
createLocalIndex(10);
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
request.query("""
FROM test*,cluster-a:test* METADATA _index
| EVAL cluster=MV_FIRST(SPLIT(_index, ":"))
| WHERE CASE(cluster == "cluster-a", delay(1ms), true)
| STATS total = sum(const) | LIMIT 1
""");
request.pragmas(randomPragmas());
var requestFuture = client().execute(EsqlQueryAction.INSTANCE, request);
assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS));
SimplePauseFieldPlugin.allowEmitting.countDown();
cluster(REMOTE_CLUSTER).close();
try (var resp = requestFuture.actionGet()) {
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isPartial(), equalTo(true));

List<List<Object>> values = getValuesList(resp);
assertThat(values.get(0).size(), equalTo(1));
// We can't be sure of the exact value here as we don't know if any data from remote came in, but all local data should be there
assertThat((long) values.get(0).get(0), greaterThanOrEqualTo(45L));

EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER);
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);

assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(localCluster.getSuccessfulShards(), equalTo(1));

assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
assertThat(cluster.getSuccessfulShards(), equalTo(0));
assertThat(cluster.getFailures().size(), equalTo(1));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.FailingFieldPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

Expand Down Expand Up @@ -81,6 +84,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
plugins.add(InternalExchangePlugin.class);
plugins.add(FailingFieldPlugin.class);
return plugins;
}

Expand Down Expand Up @@ -470,6 +474,7 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu
Set<String> expectedClusterAliases = expected.stream().map(c -> c.clusterAlias()).collect(Collectors.toSet());
assertThat(executionInfo.clusterAliases(), equalTo(expectedClusterAliases));

boolean hasSkipped = false;
for (ExpectedCluster expectedCluster : expected) {
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(expectedCluster.clusterAlias());
String msg = cluster.getClusterAlias();
Expand All @@ -488,10 +493,12 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu
assertThat(msg, cluster.getFailures().get(0).getCause(), instanceOf(VerificationException.class));
String expectedMsg = "Unknown index [" + expectedCluster.indexExpression() + "]";
assertThat(msg, cluster.getFailures().get(0).getCause().getMessage(), containsString(expectedMsg));
hasSkipped = true;
}
// currently failed shards is always zero - change this once we start allowing partial data for individual shard failures
assertThat(msg, cluster.getFailedShards(), equalTo(0));
}
assertThat(executionInfo.isPartial(), equalTo(hasSkipped));
}

public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() {
Expand Down Expand Up @@ -537,6 +544,7 @@ public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() {
assertThat(executionInfo.isCrossClusterSearch(), is(true));
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));

Expand Down Expand Up @@ -593,6 +601,7 @@ public void testCCSExecutionOnSearchesWithLimit0() {
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(false));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
Expand Down Expand Up @@ -641,6 +650,7 @@ public void testMetadataIndex() {
assertThat(executionInfo.isCrossClusterSearch(), is(true));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
assertThat(executionInfo.isPartial(), equalTo(false));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs*"));
Expand Down Expand Up @@ -836,6 +846,17 @@ public void testWarnings() throws Exception {
assertTrue(latch.await(30, TimeUnit.SECONDS));
}

// Non-disconnect remote failures still fail the request
public void testRemoteFailureSkipUnavailable() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: change name to include "True", testRemoteFailureWithSkipUnavailableTrue. Otherwise you have to read a bit of code to figure out the setting.

Map<String, Object> testClusterInfo = setupFailClusters();
String localIndex = (String) testClusterInfo.get("local.index");
String remote1Index = (String) testClusterInfo.get("remote.index");
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index);
IllegalStateException e = expectThrows(IllegalStateException.class, () -> runQuery(q, false));
assertThat(e.getMessage(), containsString("Accessing failing field"));
}

private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) {
try {
final Map<String, Object> esqlResponseAsMap = XContentTestUtils.convertToMap(resp);
Expand Down Expand Up @@ -1058,4 +1079,46 @@ private void clearSkipUnavailable() {
.setPersistentSettings(settingsBuilder.build())
.get();
}

Map<String, Object> setupFailClusters() throws IOException {
int numShardsLocal = randomIntBetween(1, 3);
populateLocalIndices(LOCAL_INDEX, numShardsLocal);

int numShardsRemote = randomIntBetween(1, 3);
populateRemoteIndicesFail(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);

Map<String, Object> clusterInfo = new HashMap<>();
clusterInfo.put("local.num_shards", numShardsLocal);
clusterInfo.put("local.index", LOCAL_INDEX);
clusterInfo.put("remote.num_shards", numShardsRemote);
clusterInfo.put("remote.index", REMOTE_INDEX);
setSkipUnavailable(REMOTE_CLUSTER_1, true);
return clusterInfo;
}

void populateRemoteIndicesFail(String clusterAlias, String indexName, int numShards) throws IOException {
Client remoteClient = client(clusterAlias);
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
mapping.startObject("runtime");
{
mapping.startObject("fail_me");
{
mapping.field("type", "long");
mapping.startObject("script").field("source", "").field("lang", FailingFieldPlugin.FAILING_FIELD_LANG).endObject();
}
mapping.endObject();
}
mapping.endObject();
assertAcked(
remoteClient.admin()
.indices()
.prepareCreate(indexName)
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
.setMapping(mapping.endObject())
);

remoteClient.prepareIndex(indexName).setSource("id", 0).get();
remoteClient.admin().indices().prepareRefresh(indexName).get();
}

}
Loading