Skip to content

Commit bae9192

Browse files
smalyshevelasticsearchmachine
andauthored
[8.x] Implement runtime skip_unavailable=true (elastic#121240) (elastic#121602)
* Implement runtime skip_unavailable=true (elastic#121240) * Implement runtime skip_unavailable=true (cherry picked from commit 2fbec77) # Conflicts: # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java * [CI] Auto commit changes from spotless --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent 3d3224e commit bae9192

File tree

19 files changed

+678
-172
lines changed

19 files changed

+678
-172
lines changed

docs/changelog/121240.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121240
2+
summary: Implement runtime skip_unavailable=true
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424

2525
public class FailingFieldPlugin extends Plugin implements ScriptPlugin {
2626

27+
public static final String FAILING_FIELD_LANG = "failing_field";
28+
2729
@Override
2830
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
2931
return new ScriptEngine() {
3032
@Override
3133
public String getType() {
32-
return "failing_field";
34+
return FAILING_FIELD_LANG;
3335
}
3436

3537
@Override

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ public static ElasticsearchCluster remoteCluster() {
3131
}
3232

3333
public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster) {
34+
return localCluster(remoteCluster, true);
35+
}
36+
37+
public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster, Boolean skipUnavailable) {
3438
return ElasticsearchCluster.local()
3539
.name(LOCAL_CLUSTER_NAME)
3640
.distribution(DistributionType.DEFAULT)
@@ -41,6 +45,7 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust
4145
.setting("node.roles", "[data,ingest,master,remote_cluster_client]")
4246
.setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"")
4347
.setting("cluster.remote.connections_per_cluster", "1")
48+
.setting("cluster.remote." + REMOTE_CLUSTER_NAME + ".skip_unavailable", skipUnavailable.toString())
4449
.shared(true)
4550
.setting("cluster.routing.rebalance.enable", "none")
4651
.build();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.ccq;
9+
10+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
11+
12+
import org.elasticsearch.test.TestClustersThreadFilter;
13+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
14+
import org.junit.ClassRule;
15+
import org.junit.rules.RuleChain;
16+
import org.junit.rules.TestRule;
17+
18+
// Duplicate of EsqlRestValidationIT test where skip_unavailable is set to false
19+
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
20+
public class EsqlRestValidationSkipUnFalseIT extends EsqlRestValidationIT {
21+
static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, false);
22+
23+
@ClassRule
24+
public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);
25+
26+
@Override
27+
protected String getTestRestCluster() {
28+
return localCluster.getHttpAddresses();
29+
}
30+
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.core.TimeValue;
1919
import org.elasticsearch.plugins.Plugin;
2020
import org.elasticsearch.test.AbstractMultiClustersTestCase;
21+
import org.elasticsearch.test.FailingFieldPlugin;
2122
import org.elasticsearch.test.XContentTestUtils;
2223
import org.elasticsearch.transport.RemoteClusterAware;
2324
import org.elasticsearch.xcontent.XContentBuilder;
@@ -63,6 +64,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
6364
plugins.add(CrossClusterAsyncQueryIT.InternalExchangePlugin.class);
6465
plugins.add(SimplePauseFieldPlugin.class);
6566
plugins.add(FailingPauseFieldPlugin.class);
67+
plugins.add(FailingFieldPlugin.class);
6668
plugins.add(CrossClusterAsyncQueryIT.CountingPauseFieldPlugin.class);
6769
return plugins;
6870
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,11 +510,17 @@ private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInf
510510
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
511511
assertTrue(executionInfo.isCrossClusterSearch());
512512

513+
boolean hasPartials = false;
513514
for (String clusterAlias : executionInfo.clusterAliases()) {
514515
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
515516
assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L));
516517
assertThat(cluster.getTook().millis(), lessThanOrEqualTo(executionInfo.overallTook().millis()));
518+
if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL
519+
|| cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) {
520+
hasPartials = true;
521+
}
517522
}
523+
assertThat(executionInfo.isPartial(), equalTo(hasPartials));
518524
}
519525

520526
private void setSkipUnavailable(String clusterAlias, boolean skip) {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323
import org.elasticsearch.index.IndexNotFoundException;
2424
import org.elasticsearch.index.query.QueryBuilder;
2525
import org.elasticsearch.index.query.TermsQueryBuilder;
26+
import org.elasticsearch.test.FailingFieldPlugin;
2627
import org.elasticsearch.test.InternalTestCluster;
2728
import org.elasticsearch.test.XContentTestUtils;
2829
import org.elasticsearch.transport.TransportService;
30+
import org.elasticsearch.xcontent.XContentBuilder;
31+
import org.elasticsearch.xcontent.json.JsonXContent;
2932
import org.elasticsearch.xpack.esql.VerificationException;
3033
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
3134

@@ -433,6 +436,7 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu
433436
Set<String> expectedClusterAliases = expected.stream().map(c -> c.clusterAlias()).collect(Collectors.toSet());
434437
assertThat(executionInfo.clusterAliases(), equalTo(expectedClusterAliases));
435438

439+
boolean hasSkipped = false;
436440
for (ExpectedCluster expectedCluster : expected) {
437441
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(expectedCluster.clusterAlias());
438442
String msg = cluster.getClusterAlias();
@@ -451,10 +455,12 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu
451455
assertThat(msg, cluster.getFailures().get(0).getCause(), instanceOf(VerificationException.class));
452456
String expectedMsg = "Unknown index [" + expectedCluster.indexExpression() + "]";
453457
assertThat(msg, cluster.getFailures().get(0).getCause().getMessage(), containsString(expectedMsg));
458+
hasSkipped = true;
454459
}
455460
// currently failed shards is always zero - change this once we start allowing partial data for individual shard failures
456461
assertThat(msg, cluster.getFailedShards(), equalTo(0));
457462
}
463+
assertThat(executionInfo.isPartial(), equalTo(hasSkipped));
458464
}
459465

460466
public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() throws Exception {
@@ -500,6 +506,7 @@ public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() throws
500506
assertThat(executionInfo.isCrossClusterSearch(), is(true));
501507
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
502508
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
509+
assertThat(executionInfo.isPartial(), equalTo(true));
503510

504511
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));
505512

@@ -556,6 +563,7 @@ public void testCCSExecutionOnSearchesWithLimit0() throws Exception {
556563
long overallTookMillis = executionInfo.overallTook().millis();
557564
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
558565
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
566+
assertThat(executionInfo.isPartial(), equalTo(false));
559567
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));
560568

561569
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
@@ -604,6 +612,7 @@ public void testMetadataIndex() throws Exception {
604612
assertThat(executionInfo.isCrossClusterSearch(), is(true));
605613
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
606614
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
615+
assertThat(executionInfo.isPartial(), equalTo(false));
607616

608617
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
609618
assertThat(remoteCluster.getIndexExpression(), equalTo("logs*"));
@@ -799,6 +808,17 @@ public void testWarnings() throws Exception {
799808
assertTrue(latch.await(30, TimeUnit.SECONDS));
800809
}
801810

811+
// Non-disconnect remote failures still fail the request even if skip_unavailable is true
812+
public void testRemoteFailureSkipUnavailableTrue() throws IOException {
813+
Map<String, Object> testClusterInfo = setupFailClusters();
814+
String localIndex = (String) testClusterInfo.get("local.index");
815+
String remote1Index = (String) testClusterInfo.get("remote.index");
816+
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
817+
String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index);
818+
IllegalStateException e = expectThrows(IllegalStateException.class, () -> runQuery(q, false));
819+
assertThat(e.getMessage(), containsString("Accessing failing field"));
820+
}
821+
802822
private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) {
803823
try {
804824
final Map<String, Object> esqlResponseAsMap = XContentTestUtils.convertToMap(resp);
@@ -925,4 +945,46 @@ Map<String, String> createEmptyIndicesWithNoMappings(int numClusters) {
925945

926946
return clusterToEmptyIndexMap;
927947
}
948+
949+
Map<String, Object> setupFailClusters() throws IOException {
950+
int numShardsLocal = randomIntBetween(1, 3);
951+
populateLocalIndices(LOCAL_INDEX, numShardsLocal);
952+
953+
int numShardsRemote = randomIntBetween(1, 3);
954+
populateRemoteIndicesFail(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);
955+
956+
Map<String, Object> clusterInfo = new HashMap<>();
957+
clusterInfo.put("local.num_shards", numShardsLocal);
958+
clusterInfo.put("local.index", LOCAL_INDEX);
959+
clusterInfo.put("remote.num_shards", numShardsRemote);
960+
clusterInfo.put("remote.index", REMOTE_INDEX);
961+
setSkipUnavailable(REMOTE_CLUSTER_1, true);
962+
return clusterInfo;
963+
}
964+
965+
void populateRemoteIndicesFail(String clusterAlias, String indexName, int numShards) throws IOException {
966+
Client remoteClient = client(clusterAlias);
967+
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
968+
mapping.startObject("runtime");
969+
{
970+
mapping.startObject("fail_me");
971+
{
972+
mapping.field("type", "long");
973+
mapping.startObject("script").field("source", "").field("lang", FailingFieldPlugin.FAILING_FIELD_LANG).endObject();
974+
}
975+
mapping.endObject();
976+
}
977+
mapping.endObject();
978+
assertAcked(
979+
remoteClient.admin()
980+
.indices()
981+
.prepareCreate(indexName)
982+
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
983+
.setMapping(mapping.endObject())
984+
);
985+
986+
remoteClient.prepareIndex(indexName).setSource("id", 0).get();
987+
remoteClient.admin().indices().prepareRefresh(indexName).get();
988+
}
989+
928990
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
5757
long overallTookMillis = executionInfo.overallTook().millis();
5858
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
5959
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
60+
assertThat(executionInfo.isPartial(), equalTo(true));
6061

6162
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));
6263

@@ -109,6 +110,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
109110
long overallTookMillis = executionInfo.overallTook().millis();
110111
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
111112
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
113+
assertThat(executionInfo.isPartial(), equalTo(true));
112114

113115
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));
114116

@@ -161,6 +163,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
161163
long overallTookMillis = executionInfo.overallTook().millis();
162164
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
163165
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
166+
assertThat(executionInfo.isPartial(), equalTo(true));
164167

165168
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER)));
166169

@@ -233,6 +236,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue()
233236
long overallTookMillis = executionInfo.overallTook().millis();
234237
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
235238
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
239+
assertThat(executionInfo.isPartial(), equalTo(true));
236240

237241
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1)));
238242

@@ -275,6 +279,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue()
275279
long overallTookMillis = executionInfo.overallTook().millis();
276280
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
277281
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
282+
assertThat(executionInfo.isPartial(), equalTo(true));
278283

279284
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)));
280285

0 commit comments

Comments
 (0)