Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/121240.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 121240
summary: Implement runtime skip_unavailable=true
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

public class FailingFieldPlugin extends Plugin implements ScriptPlugin {

public static final String FAILING_FIELD_LANG = "failing_field";

@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new ScriptEngine() {
@Override
public String getType() {
return "failing_field";
return FAILING_FIELD_LANG;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public static ElasticsearchCluster remoteCluster() {
}

/**
 * Builds the local cluster definition with the {@code skipUnavailable} argument defaulted to {@code true}.
 *
 * @param remoteCluster the remote cluster passed through to the two-argument overload
 * @return whatever {@code localCluster(remoteCluster, true)} returns
 */
public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster) {
    // Delegate to the two-argument overload; Boolean.TRUE is what autoboxing of `true` yields.
    return localCluster(remoteCluster, Boolean.TRUE);
}

public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster, Boolean skipUnavailable) {
return ElasticsearchCluster.local()
.name(LOCAL_CLUSTER_NAME)
.distribution(DistributionType.DEFAULT)
Expand All @@ -41,6 +45,7 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust
.setting("node.roles", "[data,ingest,master,remote_cluster_client]")
.setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"")
.setting("cluster.remote.connections_per_cluster", "1")
.setting("cluster.remote." + REMOTE_CLUSTER_NAME + ".skip_unavailable", skipUnavailable.toString())
.shared(true)
.setting("cluster.routing.rebalance.enable", "none")
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.ccq;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

/**
 * Duplicate of the EsqlRestValidationIT test where skip_unavailable is set to false.
 * All behavior is inherited from {@code EsqlRestValidationIT}; this subclass only swaps in a
 * local cluster built with {@code Clusters.localCluster(remoteCluster, false)}.
 */
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class EsqlRestValidationSkipUnFalseIT extends EsqlRestValidationIT {
// Local cluster built with the skipUnavailable argument set to false.
static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, false);

// Class-level rule chain: remoteCluster is the outer rule and this class's localCluster is wrapped inside it.
@ClassRule
public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);

// Points the REST test client at this class's localCluster (the skip_unavailable=false one).
@Override
protected String getTestRestCluster() {
return localCluster.getHttpAddresses();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,17 @@ private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInf
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
assertTrue(executionInfo.isCrossClusterSearch());

boolean hasPartials = false;
for (String clusterAlias : executionInfo.clusterAliases()) {
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(cluster.getTook().millis(), lessThanOrEqualTo(executionInfo.overallTook().millis()));
if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL
|| cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) {
hasPartials = true;
}
}
assertThat(executionInfo.isPartial(), equalTo(hasPartials));
}

private void setSkipUnavailable(String clusterAlias, boolean skip) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));

Expand Down Expand Up @@ -151,6 +152,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));

Expand Down Expand Up @@ -203,6 +205,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));

Expand Down Expand Up @@ -275,6 +278,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue()
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1)));

Expand Down Expand Up @@ -317,6 +321,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue()
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.Build;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
Expand Down Expand Up @@ -75,6 +76,11 @@ public void resetPlugin() {
SimplePauseFieldPlugin.resetPlugin();
}

// Overrides the superclass value to return false.
// NOTE(review): presumably needed because testCloseSkipUnavailable closes the remote cluster, so clusters
// must not be shared between tests - confirm against the base class contract.
@Override
protected boolean reuseClusters() {
return false;
}

private void createRemoteIndex(int numDocs) throws Exception {
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
mapping.startObject("runtime");
Expand All @@ -96,6 +102,26 @@ private void createRemoteIndex(int numDocs) throws Exception {
bulk.get();
}

/**
 * Creates the "test" index on the local cluster and bulk-indexes {@code numDocs} documents into it.
 * The index declares a runtime field "const" of type long, and document {@code i} is indexed
 * with {@code const = i}. The bulk request uses an immediate refresh policy.
 *
 * @param numDocs number of documents to index
 */
private void createLocalIndex(int numDocs) throws Exception {
    // Mapping body: { "runtime": { "const": { "type": "long" } } }
    XContentBuilder runtimeMapping = JsonXContent.contentBuilder()
        .startObject()
        .startObject("runtime")
        .startObject("const")
        .field("type", "long")
        .endObject()
        .endObject()
        .endObject();
    client(LOCAL_CLUSTER).admin().indices().prepareCreate("test").setMapping(runtimeMapping).get();

    BulkRequestBuilder bulkRequest = client(LOCAL_CLUSTER).prepareBulk("test")
        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    int docValue = 0;
    while (docValue < numDocs) {
        bulkRequest.add(new IndexRequest().source("const", docValue));
        docValue++;
    }
    bulkRequest.get();
}

public void testCancel() throws Exception {
createRemoteIndex(between(10, 100));
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
Expand Down Expand Up @@ -208,4 +234,88 @@ public void testTasks() throws Exception {
}
requestFuture.actionGet(30, TimeUnit.SECONDS).close();
}

// Check that cancelling remote task with skip_unavailable=true produces failure
// Flow: start a cross-cluster query, wait for the pause plugin to signal it started emitting, cancel the
// compute task found on the remote cluster, wait for the remote drivers to be marked cancelled, then
// release the pause latch and expect the query future to fail with the cancellation reason.
public void testCancelSkipUnavailable() throws Exception {
createRemoteIndex(between(10, 100));
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
request.query("FROM *:test | STATS total=sum(const) | LIMIT 1");
request.pragmas(randomPragmas());
request.includeCCSMetadata(true);
// Run the query asynchronously so the test can interfere with it while it is in flight.
PlainActionFuture<EsqlQueryResponse> requestFuture = new PlainActionFuture<>();
client().execute(EsqlQueryAction.INSTANCE, request, requestFuture);
// Wait (up to 30s) for the startEmitting latch before looking for tasks.
assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS));
List<TaskInfo> rootTasks = new ArrayList<>();
// Retry until exactly one task with the cluster compute action is listed on the remote cluster.
assertBusy(() -> {
List<TaskInfo> tasks = client(REMOTE_CLUSTER).admin()
.cluster()
.prepareListTasks()
.setActions(ComputeService.CLUSTER_ACTION_NAME)
.get()
.getTasks();
assertThat(tasks, hasSize(1));
rootTasks.addAll(tasks);
});
// Cancel that task; the reason string is what the final assertion looks for in the error message.
var cancelRequest = new CancelTasksRequest().setTargetTaskId(rootTasks.get(0).taskId()).setReason("remote failed");
client(REMOTE_CLUSTER).execute(TransportCancelTasksAction.TYPE, cancelRequest);
try {
// Retry until at least one driver task exists on the remote cluster and every listed driver is cancelled.
assertBusy(() -> {
List<TaskInfo> drivers = client(REMOTE_CLUSTER).admin()
.cluster()
.prepareListTasks()
.setActions(DriverTaskRunner.ACTION_NAME)
.get()
.getTasks();
assertThat(drivers.size(), greaterThanOrEqualTo(1));
for (TaskInfo driver : drivers) {
assertTrue(driver.cancelled());
}
});
} finally {
// Always count down allowEmitting, even if the assertions above fail.
// NOTE(review): presumably this unblocks the paused field script so the query can finish - confirm in SimplePauseFieldPlugin.
SimplePauseFieldPlugin.allowEmitting.countDown();
}

// The request is expected to throw, with the cancellation reason in the message.
Exception error = expectThrows(Exception.class, requestFuture::actionGet);
assertThat(error.getMessage(), containsString("remote failed"));
}

// Check that closing remote node with skip_unavailable=true produces partial
// Flow: index data on both clusters, start a query that applies delay(1ms) to rows coming from cluster-a,
// close the remote cluster while the query is running, then check that the response is marked partial,
// the local cluster is SUCCESSFUL and the remote cluster is PARTIAL with one recorded failure.
public void testCloseSkipUnavailable() throws Exception {
// We are using delay() here because closing cluster while inside pause fields doesn't seem to produce clean closure
assumeTrue("Only snapshot builds have delay()", Build.current().isSnapshot());
createRemoteIndex(between(1000, 5000));
createLocalIndex(10);
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
request.query("""
FROM test*,cluster-a:test* METADATA _index
| EVAL cluster=MV_FIRST(SPLIT(_index, ":"))
| WHERE CASE(cluster == "cluster-a", delay(1ms), true)
| STATS total = sum(const) | LIMIT 1
""");
request.pragmas(randomPragmas());
var requestFuture = client().execute(EsqlQueryAction.INSTANCE, request);
// Wait for the startEmitting latch, then count down allowEmitting immediately.
assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS));
SimplePauseFieldPlugin.allowEmitting.countDown();
// Close the remote cluster while the request is still outstanding.
cluster(REMOTE_CLUSTER).close();
try (var resp = requestFuture.actionGet()) {
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isPartial(), equalTo(true));

List<List<Object>> values = getValuesList(resp);
assertThat(values.get(0).size(), equalTo(1));
// We can't be sure of the exact value here as we don't know if any data from remote came in, but all local data should be there
// 45 is the sum of the local values: createLocalIndex(10) indexes const = 0..9.
assertThat((long) values.get(0).get(0), greaterThanOrEqualTo(45L));

EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER);
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);

assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(localCluster.getSuccessfulShards(), equalTo(1));

// The closed remote is expected to be reported as PARTIAL with no successful shards and one failure.
assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
assertThat(cluster.getSuccessfulShards(), equalTo(0));
assertThat(cluster.getFailures().size(), equalTo(1));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.FailingFieldPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

Expand Down Expand Up @@ -81,6 +84,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
plugins.add(InternalExchangePlugin.class);
plugins.add(FailingFieldPlugin.class);
return plugins;
}

Expand Down Expand Up @@ -470,6 +474,7 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu
Set<String> expectedClusterAliases = expected.stream().map(c -> c.clusterAlias()).collect(Collectors.toSet());
assertThat(executionInfo.clusterAliases(), equalTo(expectedClusterAliases));

boolean hasSkipped = false;
for (ExpectedCluster expectedCluster : expected) {
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(expectedCluster.clusterAlias());
String msg = cluster.getClusterAlias();
Expand All @@ -488,10 +493,12 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu
assertThat(msg, cluster.getFailures().get(0).getCause(), instanceOf(VerificationException.class));
String expectedMsg = "Unknown index [" + expectedCluster.indexExpression() + "]";
assertThat(msg, cluster.getFailures().get(0).getCause().getMessage(), containsString(expectedMsg));
hasSkipped = true;
}
// currently failed shards is always zero - change this once we start allowing partial data for individual shard failures
assertThat(msg, cluster.getFailedShards(), equalTo(0));
}
assertThat(executionInfo.isPartial(), equalTo(hasSkipped));
}

public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() {
Expand Down Expand Up @@ -537,6 +544,7 @@ public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() {
assertThat(executionInfo.isCrossClusterSearch(), is(true));
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(true));

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));

Expand Down Expand Up @@ -593,6 +601,7 @@ public void testCCSExecutionOnSearchesWithLimit0() {
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.isPartial(), equalTo(false));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
Expand Down Expand Up @@ -641,6 +650,7 @@ public void testMetadataIndex() {
assertThat(executionInfo.isCrossClusterSearch(), is(true));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
assertThat(executionInfo.isPartial(), equalTo(false));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs*"));
Expand Down Expand Up @@ -836,6 +846,17 @@ public void testWarnings() throws Exception {
assertTrue(latch.await(30, TimeUnit.SECONDS));
}

/**
 * Non-disconnect remote failures still fail the request even if skip_unavailable is true.
 * <p>
 * {@code setupFailClusters()} creates a remote index whose runtime field uses the
 * {@code FailingFieldPlugin} script language and sets skip_unavailable=true for the remote cluster.
 * Querying the local index together with that remote index is expected to throw an
 * {@link IllegalStateException} whose message contains "Accessing failing field".
 */
public void testRemoteFailureSkipUnavailableTrue() throws IOException {
    Map<String, Object> testClusterInfo = setupFailClusters();
    String localIndex = (String) testClusterInfo.get("local.index");
    String remote1Index = (String) testClusterInfo.get("remote.index");
    // The shard counts in testClusterInfo are not needed: the query is expected to fail before any
    // per-cluster shard accounting could be asserted.
    String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index);
    IllegalStateException e = expectThrows(IllegalStateException.class, () -> runQuery(q, false));
    assertThat(e.getMessage(), containsString("Accessing failing field"));
}

private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) {
try {
final Map<String, Object> esqlResponseAsMap = XContentTestUtils.convertToMap(resp);
Expand Down Expand Up @@ -1058,4 +1079,46 @@ private void clearSkipUnavailable() {
.setPersistentSettings(settingsBuilder.build())
.get();
}

/**
 * Prepares the clusters for remote-failure tests: populates {@code LOCAL_INDEX} on the local cluster,
 * creates {@code REMOTE_INDEX} on {@code REMOTE_CLUSTER_1} via {@code populateRemoteIndicesFail}, and
 * sets skip_unavailable to true for {@code REMOTE_CLUSTER_1}.
 *
 * @return a mutable map with keys "local.num_shards", "local.index", "remote.num_shards" and "remote.index"
 */
Map<String, Object> setupFailClusters() throws IOException {
    // Each index gets between 1 and 3 shards; the draw order is local first, then remote.
    final int localShardCount = randomIntBetween(1, 3);
    populateLocalIndices(LOCAL_INDEX, localShardCount);

    final int remoteShardCount = randomIntBetween(1, 3);
    populateRemoteIndicesFail(REMOTE_CLUSTER_1, REMOTE_INDEX, remoteShardCount);

    setSkipUnavailable(REMOTE_CLUSTER_1, true);

    // Describe what was created so callers can look up index names and shard counts.
    Map<String, Object> setupInfo = new HashMap<>();
    setupInfo.put("local.num_shards", localShardCount);
    setupInfo.put("local.index", LOCAL_INDEX);
    setupInfo.put("remote.num_shards", remoteShardCount);
    setupInfo.put("remote.index", REMOTE_INDEX);
    return setupInfo;
}

/**
 * Creates {@code indexName} on the given cluster with {@code numShards} shards and a runtime field
 * "fail_me" (type long) whose script uses the {@code FailingFieldPlugin.FAILING_FIELD_LANG} language,
 * then indexes a single document ({@code id = 0}) and refreshes the index.
 *
 * @param clusterAlias alias of the cluster to create the index on
 * @param indexName    name of the index to create
 * @param numShards    value for {@code index.number_of_shards}
 */
void populateRemoteIndicesFail(String clusterAlias, String indexName, int numShards) throws IOException {
    Client targetClient = client(clusterAlias);

    // Mapping body: { "runtime": { "fail_me": { "type": "long", "script": { "source": "", "lang": <failing lang> } } } }
    XContentBuilder failingMapping = JsonXContent.contentBuilder()
        .startObject()
        .startObject("runtime")
        .startObject("fail_me")
        .field("type", "long")
        .startObject("script")
        .field("source", "")
        .field("lang", FailingFieldPlugin.FAILING_FIELD_LANG)
        .endObject()
        .endObject()
        .endObject()
        .endObject();

    Settings.Builder indexSettings = Settings.builder().put("index.number_of_shards", numShards);
    assertAcked(targetClient.admin().indices().prepareCreate(indexName).setSettings(indexSettings).setMapping(failingMapping));

    // One document is enough for the index to have something to query.
    targetClient.prepareIndex(indexName).setSource("id", 0).get();
    targetClient.admin().indices().prepareRefresh(indexName).get();
}

}
Loading