Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.ConnectTransportException;

import java.util.Collection;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;

// TODO: Move this test to the Serverless repo once the IT framework is ready there.
public class EsqlCpsDoesNotUseSkipUnavailableIT extends AbstractCrossClusterTestCase {

public static class CpsPlugin extends Plugin implements ClusterPlugin {
@Override
public List<Setting<?>> getSettings() {
return List.of(CpsEnableSetting);
}
}

private static final Setting<String> CpsEnableSetting = Setting.simpleString(
"serverless.cross_project.enabled",
Setting.Property.NodeScope
);

@Override
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER_1);
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), CpsPlugin.class);
}

@Override
protected Settings nodeSettings() {
return Settings.builder().put(super.nodeSettings()).put("serverless.cross_project.enabled", "true").build();
}

@Override
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
// Setting skip_unavailable=false results in a fatal error when the linked cluster is not available.
return Map.of(REMOTE_CLUSTER_1, false);
}

public void testCpsShouldNotUseSkipUnavailable() throws Exception {
// Add some dummy data to prove we are communicating fine with the remote.
assertAcked(client(REMOTE_CLUSTER_1).admin().indices().prepareCreate("test-index"));
client(REMOTE_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get();
client(REMOTE_CLUSTER_1).admin().indices().prepareRefresh("test-index").get();

// Shut down the linked cluster we'd be targeting in the search.
try {
cluster(REMOTE_CLUSTER_1).close();
} catch (Exception e) {
throw new AssertionError(e);
}

/*
* Under normal circumstances, we should get a fatal error for when skip_unavailable=false for a linked cluster
* and that cluster is targeted in a search op. However, in CPS environment, setting allow_partial_search_results=true
* should not result in a fatal error.
*/

EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM *,*:* | limit 10");
request.allowPartialResults(true);
try (EsqlQueryResponse response = runQuery(request)) {
assertThat(response.isPartial(), is(true));
EsqlExecutionInfo info = response.getExecutionInfo();
assertThat(info.getCluster(REMOTE_CLUSTER_1).getStatus(), is(EsqlExecutionInfo.Cluster.Status.SKIPPED));
}

request = new EsqlQueryRequest().query("FROM *,*:* | limit 10");
try (EsqlQueryResponse response = runQuery(request)) {
fail("a fatal error should be thrown since allow_partial_results=false");
} catch (Exception e) {
assertThat(e, instanceOf(ConnectTransportException.class));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ protected void doResolvePolicies(
}

final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, executionInfo, listener.map(lookupResponses -> {
final EnrichResolution enrichResolution = new EnrichResolution();
final Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>();
for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
Expand Down Expand Up @@ -304,6 +304,7 @@ private void lookupPolicies(
Collection<String> remoteClusters,
boolean includeLocal,
Collection<UnresolvedPolicy> unresolvedPolicies,
EsqlExecutionInfo executionInfo,
ActionListener<Map<String, LookupResponse>> listener
) {
final Map<String, LookupResponse> lookupResponses = ConcurrentCollections.newConcurrentMap();
Expand All @@ -315,8 +316,9 @@ private void lookupPolicies(
// remote clusters
if (remotePolicies.isEmpty() == false) {
for (String cluster : remoteClusters) {
boolean skipOnFailure = executionInfo.shouldSkipOnFailure(cluster);
ActionListener<LookupResponse> lookupListener = refs.acquire(resp -> lookupResponses.put(cluster, resp));
getRemoteConnection(cluster, new ActionListener<Transport.Connection>() {
getRemoteConnection(cluster, skipOnFailure == false, new ActionListener<Transport.Connection>() {
@Override
public void onResponse(Transport.Connection connection) {
transportService.sendRequest(
Expand All @@ -325,7 +327,7 @@ public void onResponse(Transport.Connection connection) {
new LookupRequest(cluster, remotePolicies),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, cluster, l)),
lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, skipOnFailure, l)),
LookupResponse::new,
threadPool.executor(ThreadPool.Names.SEARCH)
)
Expand All @@ -334,7 +336,7 @@ public void onResponse(Transport.Connection connection) {

@Override
public void onFailure(Exception e) {
failIfSkipUnavailableFalse(e, cluster, lookupListener);
failIfSkipUnavailableFalse(e, skipOnFailure, lookupListener);
}
});
}
Expand All @@ -359,8 +361,8 @@ public void onFailure(Exception e) {
}
}

private void failIfSkipUnavailableFalse(Exception e, String cluster, ActionListener<LookupResponse> lookupListener) {
if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster).orElse(true)) {
private void failIfSkipUnavailableFalse(Exception e, boolean skipOnFailure, ActionListener<LookupResponse> lookupListener) {
if (ExceptionsHelper.isRemoteUnavailableException(e) && skipOnFailure) {
lookupListener.onResponse(new LookupResponse(e));
} else {
lookupListener.onFailure(e);
Expand Down Expand Up @@ -480,11 +482,7 @@ protected Map<String, EnrichPolicy> availablePolicies() {
return projectResolver.getProjectMetadata(clusterService.state()).custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY).getPolicies();
}

protected void getRemoteConnection(String cluster, ActionListener<Transport.Connection> listener) {
remoteClusterService.maybeEnsureConnectedAndGetConnection(
cluster,
remoteClusterService.isSkipUnavailable(cluster).orElse(true) == false,
listener
);
protected void getRemoteConnection(String cluster, boolean ensureConnected, ActionListener<Transport.Connection> listener) {
remoteClusterService.maybeEnsureConnectedAndGetConnection(cluster, ensureConnected, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,11 @@ private EsqlExecutionInfo createEsqlExecutionInfo(EsqlQueryRequest request) {
// include_ccs_metadata is considered only if include_execution_metadata is not set
includeCcsMetadata = Boolean.TRUE.equals(request.includeCCSMetadata());
}
return new EsqlExecutionInfo(clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias).orElse(true), includeCcsMetadata);
Boolean allowPartialResults = request.allowPartialResults() != null ? request.allowPartialResults() : defaultAllowPartialResults;
return new EsqlExecutionInfo(
clusterAlias -> remoteClusterService.shouldSkipOnFailure(clusterAlias, allowPartialResults),
includeCcsMetadata
);
}

private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ EnrichResolution resolvePolicies(Collection<String> clusters, Collection<Unresol
}

@Override
protected void getRemoteConnection(String remoteCluster, ActionListener<Transport.Connection> listener) {
protected void getRemoteConnection(String remoteCluster, boolean ensureConnected, ActionListener<Transport.Connection> listener) {
assertThat("Must only called on the local cluster", cluster, equalTo(LOCAL_CLUSTER_GROUP_KEY));
listener.onResponse(transports.get("").getConnection(transports.get(remoteCluster).getLocalNode()));
}
Expand Down