Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
318d424
Ignore failures on skip_unavailable
smalyshev Nov 6, 2024
378f71a
Cover more cases for skip
smalyshev Nov 7, 2024
dc8ec22
Fix existing tests by defaulting skip_un to false
smalyshev Nov 7, 2024
dc1e4fa
Add tests for both skip_un settings
smalyshev Nov 8, 2024
6c09cb4
Add handling remote sink failures
smalyshev Nov 12, 2024
2842278
Update docs/changelog/116365.yaml
smalyshev Nov 13, 2024
22c8bf7
Merge branch 'main' into skip-on-fail
smalyshev Nov 14, 2024
1135803
Enable runtime missing index tests
smalyshev Nov 14, 2024
a61ed17
More runtime missing index tests
smalyshev Nov 14, 2024
9a81820
Add cancellation/shutdown tests
smalyshev Nov 15, 2024
944df10
Merge branch 'main' into skip-on-fail
smalyshev Nov 19, 2024
9104b47
Merge branch 'main' into skip-on-fail
smalyshev Nov 20, 2024
7c25b35
Fix utils test
smalyshev Nov 20, 2024
2ddfd3f
Merge branch 'main' into skip-on-fail
smalyshev Nov 26, 2024
14b6a4c
Update for new exchange code
smalyshev Nov 26, 2024
82542f8
Fix build
smalyshev Nov 26, 2024
d2274f5
Rename method
smalyshev Nov 26, 2024
ad6dbc0
Merge branch 'main' into skip-on-fail
smalyshev Nov 26, 2024
177d2dc
Some more comments
smalyshev Nov 27, 2024
f46d365
Merge branch 'main' into skip-on-fail
smalyshev Nov 27, 2024
0128877
Post-sync updates
smalyshev Nov 27, 2024
5931658
Merge branch 'main' into skip-on-fail
smalyshev Nov 27, 2024
a431cf9
Add test for failure
smalyshev Nov 28, 2024
2a64e30
Update docs/changelog/116365.yaml
smalyshev Nov 28, 2024
778e8e4
Merge branch 'main' into skip-on-fail
smalyshev Dec 1, 2024
8a32dde
Organize listeners & add exchange failure handling
smalyshev Dec 2, 2024
6f035a2
Merge branch 'main' into skip-on-fail
smalyshev Dec 2, 2024
441b390
Test fixes
smalyshev Dec 2, 2024
1e73b14
Pull feedback
smalyshev Dec 3, 2024
a32de43
Merge branch 'main' into skip-on-fail
smalyshev Dec 3, 2024
5633936
Merge branch 'main' into skip-on-fail
smalyshev Dec 3, 2024
496c273
Merge branch 'main' into skip-on-fail
smalyshev Dec 5, 2024
c0a677b
Merge branch 'main' into skip-on-fail
smalyshev Dec 12, 2024
8a3a5ee
Merge branch 'main' into skip-on-fail
smalyshev Dec 12, 2024
cbae222
spotless
smalyshev Dec 12, 2024
ff16ebf
Merge branch 'main' into skip-on-fail
smalyshev Jan 14, 2025
1f1601b
fix test
smalyshev Jan 14, 2025
4c19802
Move test
smalyshev Jan 14, 2025
4f360ef
Fix test
smalyshev Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/116365.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 116365
summary: Ignore remote ES|QL execution failures when skip_unavailable=true
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

public class FailingFieldPlugin extends Plugin implements ScriptPlugin {

public static final String FAILING_FIELD_LANG = "failing_field";

@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new ScriptEngine() {
@Override
public String getType() {
return "failing_field";
return FAILING_FIELD_LANG;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public static ElasticsearchCluster remoteCluster() {
}

/**
 * Convenience overload that builds the local cluster definition with {@code skip_unavailable}
 * set to {@code true} for the remote cluster.
 *
 * @param remoteCluster the remote cluster the local cluster connects to
 * @return the result of {@link #localCluster(ElasticsearchCluster, Boolean)} called with {@code true}
 */
public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster) {
return localCluster(remoteCluster, true);
}

public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster, Boolean skipUnavailable) {
return ElasticsearchCluster.local()
.name(LOCAL_CLUSTER_NAME)
.distribution(DistributionType.DEFAULT)
Expand All @@ -40,6 +44,7 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust
.setting("xpack.license.self_generated.type", "trial")
.setting("node.roles", "[data,ingest,master,remote_cluster_client]")
.setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"")
.setting("cluster.remote.remote_cluster.skip_unavailable", skipUnavailable.toString())
.setting("cluster.remote.connections_per_cluster", "1")
.shared(true)
.setting("cluster.routing.rebalance.enable", "none")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class EsqlRestValidationIT extends EsqlRestValidationTestCase {
static ElasticsearchCluster remoteCluster = Clusters.remoteCluster();
static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster);
static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, false);

@ClassRule
public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.ccq;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

import java.io.IOException;

/**
 * Re-runs the {@link EsqlRestValidationIT} suite against a local cluster that is configured
 * with {@code skip_unavailable=true} for the remote cluster.
 */
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class EsqlRestValidationSkipUnavailableIT extends EsqlRestValidationIT {
    // Same name as the field in EsqlRestValidationIT: everything in this class that refers to
    // localCluster (the rule chain and the REST endpoint below) uses this skip_unavailable=true cluster.
    static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, true);

    // Remote cluster is the outer rule, local cluster the inner one.
    @ClassRule
    public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);

    /**
     * @return the HTTP addresses of this class's local cluster
     */
    @Override
    protected String getTestRestCluster() {
        final String httpAddresses = localCluster.getHttpAddresses();
        return httpAddresses;
    }

    /**
     * Instead of expecting an error, asserts that a request on {@code indexName} is valid.
     * The {@code errorMessage} and {@code statusCode} arguments are intentionally ignored.
     */
    @Override
    protected void assertErrorMessageMaybe(String indexName, String errorMessage, int statusCode) throws IOException {
        final String[] singleIndex = { indexName };
        assertValidRequestOnIndices(singleIndex);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public class RequestIndexFilteringIT extends RequestIndexFilteringTestCase {

static ElasticsearchCluster remoteCluster = Clusters.remoteCluster();
static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster);
static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, false);

@ClassRule
public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,26 +87,30 @@ public void testInexistentIndexNameWithoutWildcard() throws IOException {

/**
 * For every entry of {@code existentIndexWithoutWildcard}, checks the outcome through the
 * overridable {@code assertErrorMessageMaybe} hook. By default the hook expects a 404 with
 * a "no such index [inexistent]" reason; subclasses may override it to expect something else.
 */
public void testExistentIndexWithoutWildcard() throws IOException {
    for (String indexName : existentIndexWithoutWildcard) {
        // Go through the hook only: calling assertErrorMessage directly as well would force the
        // 404 expectation even for subclasses that override the hook.
        assertErrorMessageMaybe(indexName, "\"reason\" : \"no such index [inexistent]\"", 404);
    }
}

/**
 * Asserts that requests on the index patterns in {@code existentIndexWithWildcard} are valid.
 */
public void testExistentIndexWithWildcard() throws IOException {
assertValidRequestOnIndices(existentIndexWithWildcard);
}

/**
 * Overridable expectation hook. This default implementation simply delegates to
 * {@code assertErrorMessage}, i.e. it expects the request to fail with the given message and status.
 * Subclasses can override it when the same request is expected to behave differently.
 *
 * @param indexName    index expression to query
 * @param errorMessage error text expected in the response
 * @param statusCode   expected HTTP status code
 */
protected void assertErrorMessageMaybe(String indexName, String errorMessage, int statusCode) throws IOException {
assertErrorMessage(indexName, errorMessage, statusCode);
}

/**
 * Creates the alias, checks the non-wildcard alias expressions through the overridable
 * {@code assertErrorMessageMaybe} hook, checks that the wildcard alias expressions are valid,
 * and finally deletes the alias again.
 */
public void testAlias() throws IOException {
    createAlias();

    for (String indexName : existentAliasWithoutWildcard) {
        // Go through the hook only: calling assertErrorMessage directly as well would force the
        // 404 expectation even for subclasses that override the hook.
        assertErrorMessageMaybe(indexName, "\"reason\" : \"no such index [inexistent]\"", 404);
    }
    assertValidRequestOnIndices(existentAliasWithWildcard);

    deleteAlias();
}

private void assertErrorMessages(String[] indices, String errorMessage, int statusCode) throws IOException {
protected void assertErrorMessages(String[] indices, String errorMessage, int statusCode) throws IOException {
for (String indexName : indices) {
assertErrorMessage(indexName, errorMessage + "[" + clusterSpecificIndexName(indexName) + "]", statusCode);
}
Expand All @@ -116,7 +120,7 @@ protected String clusterSpecificIndexName(String indexName) {
return indexName;
}

private void assertErrorMessage(String indexName, String errorMessage, int statusCode) throws IOException {
protected void assertErrorMessage(String indexName, String errorMessage, int statusCode) throws IOException {
var specificName = clusterSpecificIndexName(indexName);
final var request = createRequest(specificName);
ResponseException exc = expectThrows(ResponseException.class, () -> client().performRequest(request));
Expand All @@ -138,7 +142,7 @@ private Request createRequest(String indexName) throws IOException {
return request;
}

private void assertValidRequestOnIndices(String[] indices) throws IOException {
protected void assertValidRequestOnIndices(String[] indices) throws IOException {
for (String indexName : indices) {
final var request = createRequest(clusterSpecificIndexName(indexName));
Response response = client().performRequest(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.xpack.esql.action.AbstractPauseFieldPlugin.PAUSE_FIELD_LANG;

/** A pausable testcase. Subclasses extend this testcase to simulate slow running queries.
*
* Uses the evaluation of a runtime field in the mappings "pause_me" of type long, along
Expand Down Expand Up @@ -64,7 +66,7 @@ public void setupIndex() throws IOException {
mapping.startObject("pause_me");
{
mapping.field("type", "long");
mapping.startObject("script").field("source", "").field("lang", "pause").endObject();
mapping.startObject("script").field("source", "").field("lang", PAUSE_FIELD_LANG).endObject();
}
mapping.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
*/
public abstract class AbstractPauseFieldPlugin extends Plugin implements ScriptPlugin {

public static final String PAUSE_FIELD_LANG = "pause";

/**
 * Called when the engine enters the execute() method.
 * The default implementation does nothing; subclasses may override it.
 */
protected void onStartExecute() {}

Expand All @@ -39,7 +41,7 @@ public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<
return new ScriptEngine() {
@Override
public String getType() {
return "pause";
return PAUSE_FIELD_LANG;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ void populateRemoteIndicesWithRuntimeMapping(String clusterAlias) throws IOExcep
mapping.startObject("const");
{
mapping.field("type", "long");
mapping.startObject("script").field("source", "").field("lang", "pause").endObject();
mapping.startObject("script").field("source", "").field("lang", AbstractPauseFieldPlugin.PAUSE_FIELD_LANG).endObject();
}
mapping.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.Build;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
Expand All @@ -20,6 +21,7 @@
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.transport.TransportService;
Expand All @@ -31,6 +33,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
Expand All @@ -39,10 +42,16 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;

public class CrossClustersCancellationIT extends AbstractMultiClustersTestCase {
private static final String REMOTE_CLUSTER = "cluster-a";

/**
 * Disables cluster reuse for this suite.
 * NOTE(review): presumably needed because testCloseSkipUnavailable closes the remote cluster,
 * so later tests cannot rely on the same cluster instances — confirm against the base class contract.
 */
@Override
protected boolean reuseClusters() {
return false;
}

@Override
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER);
Expand All @@ -57,6 +66,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
return plugins;
}

/**
 * Per-alias {@code skip_unavailable} values: {@code true} for "cluster-a" (the value of
 * {@code REMOTE_CLUSTER}) and {@code false} for "cluster-b".
 * NOTE(review): "cluster-b" is not one of the aliases returned by remoteClusterAlias() in this
 * class; presumably the entry is ignored or required by the base class — confirm.
 */
@Override
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
return Map.of("cluster-a", true, "cluster-b", false);
}

public static class InternalExchangePlugin extends Plugin {
@Override
public List<Setting<?>> getSettings() {
Expand All @@ -82,7 +96,7 @@ private void createRemoteIndex(int numDocs) throws Exception {
mapping.startObject("const");
{
mapping.field("type", "long");
mapping.startObject("script").field("source", "").field("lang", "pause").endObject();
mapping.startObject("script").field("source", "").field("lang", AbstractPauseFieldPlugin.PAUSE_FIELD_LANG).endObject();
}
mapping.endObject();
}
Expand All @@ -96,6 +110,26 @@ private void createRemoteIndex(int numDocs) throws Exception {
bulk.get();
}

/**
 * Creates the index {@code test} on the local cluster and fills it with documents.
 * The mapping declares a runtime field {@code const} of type {@code long} with no script.
 * Document {@code i} (for {@code 0 <= i < numDocs}) is indexed with source {@code const = i},
 * and the bulk request refreshes immediately.
 *
 * @param numDocs number of documents to index
 */
private void createLocalIndex(int numDocs) throws Exception {
    // { "runtime": { "const": { "type": "long" } } }
    final XContentBuilder runtimeMapping = JsonXContent.contentBuilder()
        .startObject()
        .startObject("runtime")
        .startObject("const")
        .field("type", "long")
        .endObject()
        .endObject()
        .endObject();
    client(LOCAL_CLUSTER).admin().indices().prepareCreate("test").setMapping(runtimeMapping).get();

    final BulkRequestBuilder bulkRequest = client(LOCAL_CLUSTER).prepareBulk("test")
        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    for (int value = 0; value < numDocs; value++) {
        final IndexRequest doc = new IndexRequest().source("const", value);
        bulkRequest.add(doc);
    }
    bulkRequest.get();
}

public void testCancel() throws Exception {
createRemoteIndex(between(10, 100));
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
Expand Down Expand Up @@ -208,4 +242,92 @@ public void testTasks() throws Exception {
}
requestFuture.actionGet(30, TimeUnit.SECONDS).close();
}

/**
 * Checks that cancelling the remote compute task while the remote cluster has
 * {@code skip_unavailable=true} does not fail the whole query: the response is still returned,
 * the remote cluster is reported as {@code PARTIAL}, and it carries exactly one failure whose
 * cause is a {@link TaskCancelledException}.
 */
public void testCancelSkipUnavailable() throws Exception {
    createRemoteIndex(between(10, 100));
    EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
    request.query("FROM *:test | STATS total=sum(const) | LIMIT 1");
    request.pragmas(randomPragmas());
    request.includeCCSMetadata(true);
    PlainActionFuture<EsqlQueryResponse> requestFuture = new PlainActionFuture<>();
    client().execute(EsqlQueryAction.INSTANCE, request, requestFuture);
    // Wait until the pause script on the remote "const" field has started executing.
    assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS));

    // Find the single root compute task on the remote cluster.
    List<TaskInfo> rootTasks = new ArrayList<>();
    assertBusy(() -> {
        List<TaskInfo> tasks = client(REMOTE_CLUSTER).admin()
            .cluster()
            .prepareListTasks()
            .setActions(ComputeService.CLUSTER_ACTION_NAME)
            .get()
            .getTasks();
        assertThat(tasks, hasSize(1));
        rootTasks.addAll(tasks);
    });

    // Cancel it on the remote side; the result of the cancel request itself is not awaited.
    var cancelRequest = new CancelTasksRequest().setTargetTaskId(rootTasks.get(0).taskId()).setReason("remote failed");
    client(REMOTE_CLUSTER).execute(TransportCancelTasksAction.TYPE, cancelRequest);
    try {
        // The cancellation must have reached every remote driver task before the script is released.
        assertBusy(() -> {
            List<TaskInfo> drivers = client(REMOTE_CLUSTER).admin()
                .cluster()
                .prepareListTasks()
                .setActions(DriverTaskRunner.ACTION_NAME)
                .get()
                .getTasks();
            assertThat(drivers.size(), greaterThanOrEqualTo(1));
            for (TaskInfo driver : drivers) {
                assertTrue(driver.cancelled());
            }
        });
    } finally {
        // Always unblock the paused script, even if the assertions above fail.
        SimplePauseFieldPlugin.allowEmitting.countDown();
    }

    // Bounded wait (same 30s as the other tests in this class) and try-with-resources so the
    // response is always closed, including when one of the assertions below fails.
    try (var resp = requestFuture.actionGet(30, TimeUnit.SECONDS)) {
        EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
        assertNotNull(executionInfo);

        EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER);
        assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
        assertThat(cluster.getFailures().size(), equalTo(1));
        assertThat(cluster.getFailures().get(0).getCause(), instanceOf(TaskCancelledException.class));
    }
}

/**
 * Checks that closing the remote cluster while a cross-cluster query is running, with
 * {@code skip_unavailable=true} for that cluster, still returns a response: the local cluster is
 * reported as SUCCESSFUL and the remote cluster as PARTIAL with zero successful shards and one failure.
 */
public void testCloseSkipUnavailable() throws Exception {
// The query below uses delay(), so the test is skipped on non-snapshot builds.
assumeTrue("Only snapshot builds have delay()", Build.current().isSnapshot());
createRemoteIndex(between(1000, 5000));
// 100 local docs with const = 0..99, i.e. a local sum of 4950 (asserted below).
createLocalIndex(100);
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
// Rows whose _index prefix is "cluster-a" go through delay(1ms); local rows pass straight through.
// NOTE(review): presumably this keeps the remote side busy long enough to be closed mid-query — confirm.
request.query("""
FROM test*,cluster-a:test* METADATA _index
| EVAL cluster=MV_FIRST(SPLIT(_index, ":"))
| WHERE CASE(cluster == "cluster-a", delay(1ms), true)
| STATS total = sum(const) | LIMIT 1
""");
request.pragmas(randomPragmas());
var requestFuture = client().execute(EsqlQueryAction.INSTANCE, request);
// Wait for the remote pause script to start, release it, then close the remote cluster.
assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS));
SimplePauseFieldPlugin.allowEmitting.countDown();
cluster(REMOTE_CLUSTER).close();
try (var resp = requestFuture.actionGet()) {
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertNotNull(executionInfo);

List<List<Object>> values = getValuesList(resp);
assertThat(values.get(0).size(), equalTo(1));
// We can't be sure of the exact value here as we don't know if any data from remote came in, but all local data should be there
// 4950 = 0 + 1 + ... + 99, the sum contributed by the local index alone.
assertThat((long) values.get(0).get(0), greaterThanOrEqualTo(4950L));

EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER);
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);

assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(localCluster.getSuccessfulShards(), equalTo(1));

// The closed remote is reported as partial rather than failing the whole request.
assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
assertThat(cluster.getSuccessfulShards(), equalTo(0));
assertThat(cluster.getFailures().size(), equalTo(1));
}
}
}
Loading