Skip to content

Commit c0160ed

Browse files
CPS search should not use skip_unavailable (#132927)
CPS S2D4: `skip_unavailable` should not be used to determine if an error should/shouldn't be fatal for _search. Instead, we must now rely on `allow_partial_search_results`.
1 parent 84443ee commit c0160ed

File tree

9 files changed

+219
-50
lines changed

9 files changed

+219
-50
lines changed
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.search.ccs;
11+
12+
import org.elasticsearch.action.search.SearchRequest;
13+
import org.elasticsearch.action.search.SearchResponse;
14+
import org.elasticsearch.action.search.TransportSearchAction;
15+
import org.elasticsearch.common.settings.Setting;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.util.CollectionUtils;
18+
import org.elasticsearch.plugins.ClusterPlugin;
19+
import org.elasticsearch.plugins.Plugin;
20+
import org.elasticsearch.test.AbstractMultiClustersTestCase;
21+
import org.elasticsearch.transport.ConnectTransportException;
22+
import org.elasticsearch.transport.RemoteTransportException;
23+
import org.hamcrest.Matchers;
24+
25+
import java.util.Collection;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.concurrent.ExecutionException;
29+
30+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
31+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
32+
33+
// TODO: Move this test to the Serverless repo once the IT framework is ready there.
34+
public class CpsDoesNotUseSkipUnavailableIT extends AbstractMultiClustersTestCase {
35+
private static final String LINKED_CLUSTER_1 = "cluster-a";
36+
37+
public static class CpsPlugin extends Plugin implements ClusterPlugin {
38+
@Override
39+
public List<Setting<?>> getSettings() {
40+
return List.of(CpsEnableSetting);
41+
}
42+
}
43+
44+
private static final Setting<String> CpsEnableSetting = Setting.simpleString(
45+
"serverless.cross_project.enabled",
46+
Setting.Property.NodeScope
47+
);
48+
49+
@Override
50+
protected List<String> remoteClusterAlias() {
51+
return List.of(LINKED_CLUSTER_1);
52+
}
53+
54+
@Override
55+
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
56+
return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), CpsPlugin.class);
57+
}
58+
59+
@Override
60+
protected Settings nodeSettings() {
61+
return Settings.builder().put(super.nodeSettings()).put("serverless.cross_project.enabled", "true").build();
62+
}
63+
64+
@Override
65+
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
66+
// Setting skip_unavailable=false results in a fatal error when the linked cluster is not available.
67+
return Map.of(LINKED_CLUSTER_1, false);
68+
}
69+
70+
public void testCpsShouldNotUseSkipUnavailable() throws Exception {
71+
// Add some dummy data to prove we are communicating fine with the remote.
72+
assertAcked(client(LINKED_CLUSTER_1).admin().indices().prepareCreate("test-index"));
73+
client(LINKED_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get();
74+
client(LINKED_CLUSTER_1).admin().indices().prepareRefresh("test-index").get();
75+
76+
// Shut down the linked cluster we'd be targeting in the search.
77+
try {
78+
cluster(LINKED_CLUSTER_1).close();
79+
} catch (Exception e) {
80+
throw new AssertionError(e);
81+
}
82+
83+
/*
84+
* Under normal circumstances, we should get a fatal error for when skip_unavailable=false for a linked cluster
85+
* and that cluster is targeted in a search op. However, in CPS environment, setting allow_partial_search_results=true
86+
* should not result in a fatal error.
87+
*/
88+
{
89+
var searchRequest = getSearchRequest(true);
90+
searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
91+
assertResponse(client().execute(TransportSearchAction.TYPE, searchRequest), result -> {
92+
var originCluster = result.getClusters().getCluster(LOCAL_CLUSTER);
93+
assertThat(originCluster.getStatus(), Matchers.is(SearchResponse.Cluster.Status.SUCCESSFUL));
94+
95+
var linkedCluster = result.getClusters().getCluster(LINKED_CLUSTER_1);
96+
assertThat(linkedCluster.getStatus(), Matchers.is(SearchResponse.Cluster.Status.SKIPPED));
97+
98+
var linkedClusterFailures = result.getClusters().getCluster(LINKED_CLUSTER_1).getFailures();
99+
assertThat(linkedClusterFailures.size(), Matchers.is(1));
100+
// Failure is something along the lines of shard failure and is caused by a connection error.
101+
assertThat(
102+
linkedClusterFailures.getFirst().getCause(),
103+
Matchers.anyOf(
104+
Matchers.instanceOf(RemoteTransportException.class),
105+
Matchers.instanceOf(ConnectTransportException.class)
106+
)
107+
);
108+
});
109+
}
110+
111+
/*
112+
* Previously, we did not get a fatal error even when skip_unavailable=false for the linked cluster.
113+
* Now, we disable partial results and expect a fatal error. This proves that in CPS environment,
114+
* search uses allow_partial_search_results and not skip_unavailable.
115+
*/
116+
{
117+
var searchRequest = getSearchRequest(false);
118+
searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
119+
var ae = expectThrows(AssertionError.class, () -> safeGet(client().execute(TransportSearchAction.TYPE, searchRequest)));
120+
assertThat(ae.getCause(), Matchers.instanceOf(ExecutionException.class));
121+
assertThat(
122+
ae.getCause().getCause(),
123+
Matchers.anyOf(Matchers.instanceOf(RemoteTransportException.class), Matchers.instanceOf(ConnectTransportException.class))
124+
);
125+
}
126+
}
127+
128+
private SearchRequest getSearchRequest(boolean allowPartialResults) {
129+
// Include both origin and linked cluster in the search op.
130+
var searchRequest = new SearchRequest("*", "*:*");
131+
searchRequest.allowPartialSearchResults(allowPartialResults);
132+
return searchRequest;
133+
}
134+
}

server/src/main/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveClusterAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ protected void doExecuteForked(Task task, ResolveClusterActionRequest request, A
174174
resolveClusterTask.ensureNotCancelled();
175175
String clusterAlias = remoteIndices.getKey();
176176
OriginalIndices originalIndices = remoteIndices.getValue();
177-
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
177+
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias).orElse(true);
178178
RemoteClusterClient remoteClusterClient = remoteClusterService.getRemoteClusterClient(
179179
clusterAlias,
180180
searchCoordinationExecutor,

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ private void doExecuteForked(
332332
);
333333

334334
boolean ensureConnected = forceConnectTimeoutSecs != null
335-
|| transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false;
335+
|| transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias).orElse(true) == false;
336336
transportService.getRemoteClusterService()
337337
.maybeEnsureConnectedAndGetConnection(clusterAlias, ensureConnected, connectionListener);
338338
}

server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -508,14 +508,16 @@ public static final class Clusters implements ToXContentFragment, Writeable {
508508
* @param localIndices The localIndices to be searched - null if no local indices are to be searched
509509
* @param remoteClusterIndices mapping of clusterAlias -> OriginalIndices for each remote cluster
510510
* @param ccsMinimizeRoundtrips whether minimizing roundtrips for the CCS
511-
* @param skipUnavailablePredicate given a cluster alias, returns true if that cluster is skip_unavailable=true
512-
* and false otherwise
511+
* @param skipOnFailurePredicate given a cluster alias, returns true if that cluster is marked as skippable
512+
* and false otherwise. For a cluster to be considered as skippable, either
513+
* we should be in CPS environment and allow_partial_results=true, or,
514+
* skip_unavailable=true.
513515
*/
514516
public Clusters(
515517
@Nullable OriginalIndices localIndices,
516518
Map<String, OriginalIndices> remoteClusterIndices,
517519
boolean ccsMinimizeRoundtrips,
518-
Predicate<String> skipUnavailablePredicate
520+
Predicate<String> skipOnFailurePredicate
519521
) {
520522
assert remoteClusterIndices.size() > 0 : "At least one remote cluster must be passed into this Cluster constructor";
521523
this.total = remoteClusterIndices.size() + (localIndices == null ? 0 : 1);
@@ -531,8 +533,8 @@ public Clusters(
531533
}
532534
for (Map.Entry<String, OriginalIndices> remote : remoteClusterIndices.entrySet()) {
533535
String clusterAlias = remote.getKey();
534-
boolean skipUnavailable = skipUnavailablePredicate.test(clusterAlias);
535-
Cluster c = new Cluster(clusterAlias, String.join(",", remote.getValue().indices()), skipUnavailable);
536+
boolean skipOnFailure = skipOnFailurePredicate.test(clusterAlias);
537+
Cluster c = new Cluster(clusterAlias, String.join(",", remote.getValue().indices()), skipOnFailure);
536538
m.put(clusterAlias, c);
537539
}
538540
this.clusterInfo = m;

0 commit comments

Comments
 (0)