Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
318d424
Ignore failures on skip_unavailable
smalyshev Nov 6, 2024
378f71a
Cover more cases for skip
smalyshev Nov 7, 2024
dc8ec22
Fix existing tests by defaulting skip_un to false
smalyshev Nov 7, 2024
dc1e4fa
Add tests for both skip_un settings
smalyshev Nov 8, 2024
6c09cb4
Add handling remote sink failures
smalyshev Nov 12, 2024
2842278
Update docs/changelog/116365.yaml
smalyshev Nov 13, 2024
22c8bf7
Merge branch 'main' into skip-on-fail
smalyshev Nov 14, 2024
1135803
Enable runtime missing index tests
smalyshev Nov 14, 2024
a61ed17
More runtime missing index tests
smalyshev Nov 14, 2024
9a81820
Add cancellation/shutdown tests
smalyshev Nov 15, 2024
944df10
Merge branch 'main' into skip-on-fail
smalyshev Nov 19, 2024
9104b47
Merge branch 'main' into skip-on-fail
smalyshev Nov 20, 2024
7c25b35
Fix utils test
smalyshev Nov 20, 2024
2ddfd3f
Merge branch 'main' into skip-on-fail
smalyshev Nov 26, 2024
14b6a4c
Update for new exchange code
smalyshev Nov 26, 2024
82542f8
Fix build
smalyshev Nov 26, 2024
d2274f5
Rename method
smalyshev Nov 26, 2024
ad6dbc0
Merge branch 'main' into skip-on-fail
smalyshev Nov 26, 2024
177d2dc
Some more comments
smalyshev Nov 27, 2024
f46d365
Merge branch 'main' into skip-on-fail
smalyshev Nov 27, 2024
0128877
Post-sync updates
smalyshev Nov 27, 2024
5931658
Merge branch 'main' into skip-on-fail
smalyshev Nov 27, 2024
a431cf9
Add test for failure
smalyshev Nov 28, 2024
2a64e30
Update docs/changelog/116365.yaml
smalyshev Nov 28, 2024
778e8e4
Merge branch 'main' into skip-on-fail
smalyshev Dec 1, 2024
8a32dde
Organize listeners & add exchange failure handling
smalyshev Dec 2, 2024
6f035a2
Merge branch 'main' into skip-on-fail
smalyshev Dec 2, 2024
441b390
Test fixes
smalyshev Dec 2, 2024
1e73b14
Pull feedback
smalyshev Dec 3, 2024
a32de43
Merge branch 'main' into skip-on-fail
smalyshev Dec 3, 2024
5633936
Merge branch 'main' into skip-on-fail
smalyshev Dec 3, 2024
496c273
Merge branch 'main' into skip-on-fail
smalyshev Dec 5, 2024
c0a677b
Merge branch 'main' into skip-on-fail
smalyshev Dec 12, 2024
8a3a5ee
Merge branch 'main' into skip-on-fail
smalyshev Dec 12, 2024
cbae222
spotless
smalyshev Dec 12, 2024
ff16ebf
Merge branch 'main' into skip-on-fail
smalyshev Jan 14, 2025
1f1601b
fix test
smalyshev Jan 14, 2025
4c19802
Move test
smalyshev Jan 14, 2025
4f360ef
Fix test
smalyshev Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/116365.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 116365
summary: Ignore remote ES|QL execution failures when skip_unavailable=true
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
* {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes.
Expand Down Expand Up @@ -277,9 +278,16 @@ protected void doRun() {
* @param exchangeId the exchange ID
* @param transportService the transport service
* @param conn the connection to the remote node where the remote exchange sink is located
* @param failureCollector if not null, the failures will be sent to this consumer and the sink will be marked as finished
*/
public RemoteSink newRemoteSink(Task parentTask, String exchangeId, TransportService transportService, Transport.Connection conn) {
return new TransportRemoteSink(transportService, blockFactory, conn, parentTask, exchangeId, executor);
public RemoteSink newRemoteSink(
    Task parentTask,
    String exchangeId,
    TransportService transportService,
    Transport.Connection conn,
    Consumer<Exception> failureCollector
) {
    // failureCollector may be null; the sink only intercepts fetch failures when a collector is supplied.
    final TransportRemoteSink remoteSink = new TransportRemoteSink(
        transportService,
        blockFactory,
        conn,
        parentTask,
        exchangeId,
        executor,
        failureCollector
    );
    return remoteSink;
}

static final class TransportRemoteSink implements RemoteSink {
Expand All @@ -289,6 +297,7 @@ static final class TransportRemoteSink implements RemoteSink {
final Task parentTask;
final String exchangeId;
final Executor responseExecutor;
final Consumer<Exception> failureCollector;

final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L);

Expand All @@ -298,14 +307,16 @@ static final class TransportRemoteSink implements RemoteSink {
Transport.Connection connection,
Task parentTask,
String exchangeId,
Executor responseExecutor
Executor responseExecutor,
Consumer<Exception> failureCollector
) {
this.transportService = transportService;
this.blockFactory = blockFactory;
this.connection = connection;
this.parentTask = parentTask;
this.exchangeId = exchangeId;
this.responseExecutor = responseExecutor;
this.failureCollector = failureCollector;
}

@Override
Expand All @@ -316,6 +327,12 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeRe
blockFactory.breaker().addEstimateBytesAndMaybeBreak(reservedBytes, "fetch page");
listener = ActionListener.runAfter(listener, () -> blockFactory.breaker().addWithoutBreaking(-reservedBytes));
}
if (failureCollector != null) {
listener = listener.delegateResponse((l, ex) -> {
failureCollector.accept(ex);
l.onResponse(new ExchangeResponse(blockFactory, null, true));
});
}
transportService.sendChildRequest(
connection,
EXCHANGE_ACTION_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public void testConcurrentWithTransportActions() {
sourceHandler.addCompletionListener(sourceCompletionFuture);
ExchangeSinkHandler sinkHandler = exchange1.createSinkHandler(exchangeId, randomExchangeBuffer());
Transport.Connection connection = node0.getConnection(node1.getLocalNode());
sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, connection), randomIntBetween(1, 5));
sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, connection, null), randomIntBetween(1, 5));
final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
final int maxOutputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
runConcurrentTest(maxInputSeqNo, maxOutputSeqNo, sourceHandler::createExchangeSource, sinkHandler::createExchangeSink);
Expand Down Expand Up @@ -442,7 +442,7 @@ public void sendResponse(TransportResponse transportResponse) {
sourceHandler.addCompletionListener(sourceCompletionFuture);
ExchangeSinkHandler sinkHandler = exchange1.createSinkHandler(exchangeId, randomIntBetween(1, 128));
Transport.Connection connection = node0.getConnection(node1.getLocalNode());
sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, connection), randomIntBetween(1, 5));
sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, connection, null), randomIntBetween(1, 5));
Exception err = expectThrows(
Exception.class,
() -> runConcurrentTest(maxSeqNo, maxSeqNo, sourceHandler::createExchangeSource, sinkHandler::createExchangeSink)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public static ElasticsearchCluster remoteCluster() {
}

/**
 * Builds the local test cluster connected to {@code remoteCluster}, with {@code skip_unavailable}
 * enabled for the remote cluster connection.
 */
public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster) {
    final boolean skipUnavailable = true;
    return localCluster(remoteCluster, skipUnavailable);
}

public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster, Boolean skipUnavailable) {
return ElasticsearchCluster.local()
.name(LOCAL_CLUSTER_NAME)
.distribution(DistributionType.DEFAULT)
Expand All @@ -40,6 +44,7 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust
.setting("xpack.license.self_generated.type", "trial")
.setting("node.roles", "[data,ingest,master,remote_cluster_client]")
.setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"")
.setting("cluster.remote.remote_cluster.skip_unavailable", skipUnavailable.toString())
.setting("cluster.remote.connections_per_cluster", "1")
.shared(true)
.setting("cluster.routing.rebalance.enable", "none")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class EsqlRestValidationIT extends EsqlRestValidationTestCase {
static ElasticsearchCluster remoteCluster = Clusters.remoteCluster();
static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster);
static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, false);

@ClassRule
public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.ccq;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

import java.io.IOException;

/**
 * Runs the {@link EsqlRestValidationIT} checks against a local cluster whose remote connection is
 * created with {@code skip_unavailable} set to {@code true}.
 */
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class EsqlRestValidationSkipUnavailableIT extends EsqlRestValidationIT {
    // Local cluster for this suite; declared before the rule below because the rule is built from it.
    static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, true);

    // The remote cluster is the outer rule and the local cluster is wrapped around it.
    @ClassRule
    public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);

    /**
     * Points the REST client at the local cluster defined in this class.
     */
    @Override
    protected String getTestRestCluster() {
        return localCluster.getHttpAddresses();
    }

    /**
     * Replaces the error expectation with a success expectation: the request on {@code indexName}
     * must be valid. {@code errorMessage} and {@code statusCode} are intentionally unused here.
     * NOTE(review): presumably because remote failures are ignored when skip_unavailable=true - confirm.
     */
    @Override
    protected void assertErrorMessageMaybe(String indexName, String errorMessage, int statusCode) throws IOException {
        final String[] onlyThisIndex = { indexName };
        assertValidRequestOnIndices(onlyThisIndex);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,26 +87,30 @@ public void testInexistentIndexNameWithoutWildcard() throws IOException {

public void testExistentIndexWithoutWildcard() throws IOException {
for (String indexName : existentIndexWithoutWildcard) {
assertErrorMessage(indexName, "\"reason\" : \"no such index [inexistent]\"", 404);
assertErrorMessageMaybe(indexName, "\"reason\" : \"no such index [inexistent]\"", 404);
}
}

public void testExistentIndexWithWildcard() throws IOException {
assertValidRequestOnIndices(existentIndexWithWildcard);
}

/**
 * Asserts that a request on {@code indexName} fails with the given error message and status code
 * by delegating to {@code assertErrorMessage}. The method is protected so that subclasses can
 * override it with a different expectation for the same request.
 *
 * @param indexName    the index expression to query
 * @param errorMessage the error text expected in the response
 * @param statusCode   the expected HTTP status code
 */
protected void assertErrorMessageMaybe(String indexName, String errorMessage, int statusCode) throws IOException {
assertErrorMessage(indexName, errorMessage, statusCode);
}

public void testAlias() throws IOException {
createAlias();

for (String indexName : existentAliasWithoutWildcard) {
assertErrorMessage(indexName, "\"reason\" : \"no such index [inexistent]\"", 404);
assertErrorMessageMaybe(indexName, "\"reason\" : \"no such index [inexistent]\"", 404);
}
assertValidRequestOnIndices(existentAliasWithWildcard);

deleteAlias();
}

private void assertErrorMessages(String[] indices, String errorMessage, int statusCode) throws IOException {
protected void assertErrorMessages(String[] indices, String errorMessage, int statusCode) throws IOException {
for (String indexName : indices) {
assertErrorMessage(indexName, errorMessage + "[" + clusterSpecificIndexName(indexName) + "]", statusCode);
}
Expand All @@ -116,7 +120,7 @@ protected String clusterSpecificIndexName(String indexName) {
return indexName;
}

private void assertErrorMessage(String indexName, String errorMessage, int statusCode) throws IOException {
protected void assertErrorMessage(String indexName, String errorMessage, int statusCode) throws IOException {
var specificName = clusterSpecificIndexName(indexName);
final var request = createRequest(specificName);
ResponseException exc = expectThrows(ResponseException.class, () -> client().performRequest(request));
Expand All @@ -138,7 +142,7 @@ private Request createRequest(String indexName) throws IOException {
return request;
}

private void assertValidRequestOnIndices(String[] indices) throws IOException {
protected void assertValidRequestOnIndices(String[] indices) throws IOException {
for (String indexName : indices) {
final var request = createRequest(clusterSpecificIndexName(indexName));
Response response = client().performRequest(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,34 +506,47 @@ public void testSearchesAgainstNonMatchingIndicesWithSkipUnavailableTrue() {

// since cluster-a is skip_unavailable=true and at least one cluster has matching indices, no error is thrown
{
// TODO solve in follow-on PR which does skip_unavailable handling at execution time
// String q = Strings.format("FROM %s,cluster-a:nomatch,cluster-a:%s*", localIndex, remote1Index);
// try (EsqlQueryResponse resp = runQuery(q, requestIncludeMeta)) {
// assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1));
// EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
// assertThat(executionInfo.isCrossClusterSearch(), is(true));
// assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
// assertExpectedClustersForMissingIndicesTests(executionInfo, List.of(
// // local cluster is never marked as SKIPPED even when no matching indices - just marked as 0 shards searched
// new ExpectedCluster(REMOTE_CLUSTER_1, "nomatch", EsqlExecutionInfo.Cluster.Status.SKIPPED, 0),
// new ExpectedCluster(REMOTE_CLUSTER_1, "*", EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, remote2NumShards)
// ));
// }

// TODO: handle LIMIT 0 for this case in follow-on PR
// String limit0 = q + " | LIMIT 0";
// try (EsqlQueryResponse resp = runQuery(limit0, requestIncludeMeta)) {
// assertThat(resp.columns().size(), greaterThanOrEqualTo(1));
// assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(0));
// EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
// assertThat(executionInfo.isCrossClusterSearch(), is(true));
// assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
// assertExpectedClustersForMissingIndicesTests(executionInfo, List.of(
// // local cluster is never marked as SKIPPED even when no matching indices - just marked as 0 shards searched
// new ExpectedCluster(LOCAL_CLUSTER, localIndex, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, 0),
// new ExpectedCluster(REMOTE_CLUSTER_1, "nomatch," + remote1Index + "*", EsqlExecutionInfo.Cluster.Status.SKIPPED, 0)
// ));
// }
String q = Strings.format("FROM %s,cluster-a:nomatch,cluster-a:%s*", localIndex, remote1Index);
try (EsqlQueryResponse resp = runQuery(q, requestIncludeMeta)) {
assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1));
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertThat(executionInfo.isCrossClusterSearch(), is(true));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertExpectedClustersForMissingIndicesTests(
executionInfo,
List.of(
new ExpectedCluster(LOCAL_CLUSTER, localIndex, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, localNumShards),
new ExpectedCluster(
REMOTE_CLUSTER_1,
"nomatch," + remote1Index + "*",
EsqlExecutionInfo.Cluster.Status.PARTIAL,
0
)
)
);
}

String limit0 = q + " | LIMIT 0";
try (EsqlQueryResponse resp = runQuery(limit0, requestIncludeMeta)) {
assertThat(resp.columns().size(), greaterThanOrEqualTo(1));
assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(0));
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertThat(executionInfo.isCrossClusterSearch(), is(true));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
assertExpectedClustersForMissingIndicesTests(
executionInfo,
List.of(
new ExpectedCluster(LOCAL_CLUSTER, localIndex, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, 0),
new ExpectedCluster(
REMOTE_CLUSTER_1,
"nomatch," + remote1Index + "*",
// TODO: this probably should be PARTIAL instead
EsqlExecutionInfo.Cluster.Status.SUCCESSFUL,
0
)
)
);
}
}

// tests with three clusters ---
Expand Down Expand Up @@ -844,12 +857,15 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));

Set<String> expectedClusterAliases = expected.stream().map(c -> c.clusterAlias()).collect(Collectors.toSet());
Set<String> expectedClusterAliases = expected.stream().map(ExpectedCluster::clusterAlias).collect(Collectors.toSet());
assertThat(executionInfo.clusterAliases(), equalTo(expectedClusterAliases));

for (ExpectedCluster expectedCluster : expected) {
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(expectedCluster.clusterAlias());
String msg = cluster.getClusterAlias();
if (msg.equals(LOCAL_CLUSTER)) {
msg = "(local)";
}
assertThat(msg, cluster.getIndexExpression(), equalTo(expectedCluster.indexExpression()));
assertThat(msg, cluster.getStatus(), equalTo(expectedCluster.status()));
assertThat(msg, cluster.getTook().millis(), greaterThanOrEqualTo(0L));
Expand All @@ -865,6 +881,10 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu
assertThat(msg, cluster.getFailures().get(0).getCause(), instanceOf(VerificationException.class));
String expectedMsg = "Unknown index [" + expectedCluster.indexExpression() + "]";
assertThat(msg, cluster.getFailures().get(0).getCause().getMessage(), containsString(expectedMsg));
} else if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL) {
assertThat(msg, cluster.getSuccessfulShards(), equalTo(0));
assertThat(msg, cluster.getSkippedShards(), equalTo(expectedCluster.totalShards()));
assertThat(msg, cluster.getFailures().size(), equalTo(1));
}
// currently failed shards is always zero - change this once we start allowing partial data for individual shard failures
assertThat(msg, cluster.getFailedShards(), equalTo(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,17 @@ public void markEndQuery() {
overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
}

/**
 * Reports the time elapsed between the recorded query start and now, without storing it.
 *
 * @return the elapsed time in nanosecond resolution, or a zero {@link TimeValue} when
 *         {@code relativeStartNanos} has not been set
 */
public TimeValue tookSoFar() {
    if (relativeStartNanos != null) {
        final long elapsedNanos = System.nanoTime() - relativeStartNanos;
        return new TimeValue(elapsedNanos, TimeUnit.NANOSECONDS);
    }
    return new TimeValue(0);
}

// for testing only - use markEndQuery in production code
void overallTook(TimeValue took) {
this.overallTook = took;
Expand Down
Loading