Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.search.ccs;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.hamcrest.Matchers;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;

// TODO: Move this test to the Serverless repo once the IT framework is ready there.
public class CpsDoesNotUseSkipUnavailableIT extends AbstractMultiClustersTestCase {
private static final String LINKED_CLUSTER_1 = "cluster-a";

public static class CpsPlugin extends Plugin implements ClusterPlugin {
@Override
public List<Setting<?>> getSettings() {
return List.of(CpsEnableSetting);
}
}

private static final Setting<String> CpsEnableSetting = Setting.simpleString(
"serverless.cross_project.enabled",
Setting.Property.NodeScope
);

@Override
protected List<String> remoteClusterAlias() {
return List.of(LINKED_CLUSTER_1);
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), CpsPlugin.class);
}

@Override
protected Settings nodeSettings() {
return Settings.builder().put(super.nodeSettings()).put("serverless.cross_project.enabled", "true").build();
}

@Override
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
// Setting skip_unavailable=false results in a fatal error when the linked cluster is not available.
return Map.of(LINKED_CLUSTER_1, false);
}

public void testCpsShouldNotUseSkipUnavailable() throws Exception {
// Add some dummy data to prove we are communicating fine with the remote.
assertAcked(client(LINKED_CLUSTER_1).admin().indices().prepareCreate("test-index"));
client(LINKED_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get();
client(LINKED_CLUSTER_1).admin().indices().prepareRefresh("test-index").get();

// Shut down the linked cluster we'd be targeting in the search.
try {
cluster(LINKED_CLUSTER_1).close();
} catch (Exception e) {
throw new AssertionError(e);
}

/*
* Under normal circumstances, we should get a fatal error for when skip_unavailable=false for a linked cluster
* and that cluster is targeted in a search op. However, in CPS environment, setting allow_partial_search_results=true
* should not result in a fatal error.
*/
{
var searchRequest = getSearchRequest(true);
searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
assertResponse(client().execute(TransportSearchAction.TYPE, searchRequest), result -> {
var originCluster = result.getClusters().getCluster(LOCAL_CLUSTER);
assertThat(originCluster.getStatus(), Matchers.is(SearchResponse.Cluster.Status.SUCCESSFUL));

var linkedCluster = result.getClusters().getCluster(LINKED_CLUSTER_1);
assertThat(linkedCluster.getStatus(), Matchers.is(SearchResponse.Cluster.Status.SKIPPED));

var linkedClusterFailures = result.getClusters().getCluster(LINKED_CLUSTER_1).getFailures();
assertThat(linkedClusterFailures.size(), Matchers.is(1));
// Failure is something along the lines of shard failure and is caused by a connection error.
assertThat(
linkedClusterFailures.getFirst().getCause(),
Matchers.anyOf(
Matchers.instanceOf(RemoteTransportException.class),
Matchers.instanceOf(ConnectTransportException.class)
)
);
});
}

/*
* Previously, we did not get a fatal error even when skip_unavailable=false for the linked cluster.
* Now, we disable partial results and expect a fatal error. This proves that in CPS environment,
* search uses allow_partial_search_results and not skip_unavailable.
*/
{
var searchRequest = getSearchRequest(false);
searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
var ae = expectThrows(AssertionError.class, () -> safeGet(client().execute(TransportSearchAction.TYPE, searchRequest)));
assertThat(ae.getCause(), Matchers.instanceOf(ExecutionException.class));
assertThat(
ae.getCause().getCause(),
Matchers.anyOf(Matchers.instanceOf(RemoteTransportException.class), Matchers.instanceOf(ConnectTransportException.class))
);
}
}

private SearchRequest getSearchRequest(boolean allowPartialResults) {
// Include both origin and linked cluster in the search op.
var searchRequest = new SearchRequest("*", "*:*");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just *:* or even cluster-a:*? I understand only the remote part is being tested?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no specific reason. It's just an assurance that the search proceeds fine -- local cluster is marked as successful and the other is either skipped or there's a fatal error.

searchRequest.allowPartialSearchResults(allowPartialResults);
return searchRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected void doExecuteForked(Task task, ResolveClusterActionRequest request, A
resolveClusterTask.ensureNotCancelled();
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias).orElse(true);
RemoteClusterClient remoteClusterClient = remoteClusterService.getRemoteClusterClient(
clusterAlias,
searchCoordinationExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ private void doExecuteForked(
);

boolean ensureConnected = forceConnectTimeoutSecs != null
|| transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false;
|| transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias).orElse(true) == false;
transportService.getRemoteClusterService()
.maybeEnsureConnectedAndGetConnection(clusterAlias, ensureConnected, connectionListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,14 +508,16 @@ public static final class Clusters implements ToXContentFragment, Writeable {
* @param localIndices The localIndices to be searched - null if no local indices are to be searched
* @param remoteClusterIndices mapping of clusterAlias -> OriginalIndices for each remote cluster
* @param ccsMinimizeRoundtrips whether minimizing roundtrips for the CCS
* @param skipUnavailablePredicate given a cluster alias, returns true if that cluster is skip_unavailable=true
* and false otherwise
* @param skipOnFailurePredicate given a cluster alias, returns true if that cluster is marked as skippable
* and false otherwise. For a cluster to be considered as skippable, either
* we should be in CPS environment and allow_partial_results=true, or,
* skip_unavailable=true.
*/
public Clusters(
@Nullable OriginalIndices localIndices,
Map<String, OriginalIndices> remoteClusterIndices,
boolean ccsMinimizeRoundtrips,
Predicate<String> skipUnavailablePredicate
Predicate<String> skipOnFailurePredicate
) {
assert remoteClusterIndices.size() > 0 : "At least one remote cluster must be passed into this Cluster constructor";
this.total = remoteClusterIndices.size() + (localIndices == null ? 0 : 1);
Expand All @@ -531,8 +533,8 @@ public Clusters(
}
for (Map.Entry<String, OriginalIndices> remote : remoteClusterIndices.entrySet()) {
String clusterAlias = remote.getKey();
boolean skipUnavailable = skipUnavailablePredicate.test(clusterAlias);
Cluster c = new Cluster(clusterAlias, String.join(",", remote.getValue().indices()), skipUnavailable);
boolean skipOnFailure = skipOnFailurePredicate.test(clusterAlias);
Cluster c = new Cluster(clusterAlias, String.join(",", remote.getValue().indices()), skipOnFailure);
m.put(clusterAlias, c);
}
this.clusterInfo = m;
Expand Down
Loading