Skip to content

Commit 5abf805

Browse files
CPS search should not use skip_unavailable
1 parent 7a34391 commit 5abf805

File tree

3 files changed

+53
-34
lines changed

3 files changed

+53
-34
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -508,14 +508,16 @@ public static final class Clusters implements ToXContentFragment, Writeable {
508508
* @param localIndices The localIndices to be searched - null if no local indices are to be searched
509509
* @param remoteClusterIndices mapping of clusterAlias -> OriginalIndices for each remote cluster
510510
* @param ccsMinimizeRoundtrips whether minimizing roundtrips for the CCS
511-
* @param skipUnavailablePredicate given a cluster alias, returns true if that cluster is skip_unavailable=true
512-
* and false otherwise
511+
* @param skipOnFailurePredicate given a cluster alias, returns true if that cluster is marked as skippable
512+
* and false otherwise. For a cluster to be considered as skippable, either
513+
* we should be in a CPS environment with allow_partial_search_results=true, or,
514+
* skip_unavailable=true.
513515
*/
514516
public Clusters(
515517
@Nullable OriginalIndices localIndices,
516518
Map<String, OriginalIndices> remoteClusterIndices,
517519
boolean ccsMinimizeRoundtrips,
518-
Predicate<String> skipUnavailablePredicate
520+
Predicate<String> skipOnFailurePredicate
519521
) {
520522
assert remoteClusterIndices.size() > 0 : "At least one remote cluster must be passed into this Cluster constructor";
521523
this.total = remoteClusterIndices.size() + (localIndices == null ? 0 : 1);
@@ -531,8 +533,8 @@ public Clusters(
531533
}
532534
for (Map.Entry<String, OriginalIndices> remote : remoteClusterIndices.entrySet()) {
533535
String clusterAlias = remote.getKey();
534-
boolean skipUnavailable = skipUnavailablePredicate.test(clusterAlias);
535-
Cluster c = new Cluster(clusterAlias, String.join(",", remote.getValue().indices()), skipUnavailable);
536+
boolean skipOnFailure = skipOnFailurePredicate.test(clusterAlias);
537+
Cluster c = new Cluster(clusterAlias, String.join(",", remote.getValue().indices()), skipOnFailure);
536538
m.put(clusterAlias, c);
537539
}
538540
this.clusterInfo = m;

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ void executeRequest(
422422
resolvedIndices.getLocalIndices(),
423423
resolvedIndices.getRemoteClusterIndices(),
424424
true,
425-
remoteClusterService::isSkipUnavailable
425+
(clusterAlias) -> remoteClusterService.shouldSkipOnFailure(clusterAlias, rewritten.allowPartialSearchResults())
426426
);
427427
if (resolvedIndices.getLocalIndices() == null) {
428428
// Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local shards)
@@ -458,7 +458,7 @@ void executeRequest(
458458
resolvedIndices.getLocalIndices(),
459459
resolvedIndices.getRemoteClusterIndices(),
460460
false,
461-
remoteClusterService::isSkipUnavailable
461+
(clusterAlias) -> remoteClusterService.shouldSkipOnFailure(clusterAlias, rewritten.allowPartialSearchResults())
462462
);
463463

464464
// TODO: pass parentTaskId
@@ -697,7 +697,7 @@ static void ccsRemoteReduce(
697697
// and we directly perform final reduction in the remote cluster
698698
Map.Entry<String, OriginalIndices> entry = resolvedIndices.getRemoteClusterIndices().entrySet().iterator().next();
699699
String clusterAlias = entry.getKey();
700-
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
700+
boolean shouldSkipOnFailure = remoteClusterService.shouldSkipOnFailure(clusterAlias, searchRequest.allowPartialSearchResults());
701701
OriginalIndices indices = entry.getValue();
702702
SearchRequest ccsSearchRequest = SearchRequest.subSearchRequest(
703703
parentTaskId,
@@ -713,7 +713,7 @@ static void ccsRemoteReduce(
713713
@Override
714714
public void onResponse(SearchResponse searchResponse) {
715715
// overwrite the existing cluster entry with the updated one
716-
ccsClusterInfoUpdate(searchResponse, clusters, clusterAlias, skipUnavailable);
716+
ccsClusterInfoUpdate(searchResponse, clusters, clusterAlias, shouldSkipOnFailure);
717717
Map<String, SearchProfileShardResult> profileResults = searchResponse.getProfileResults();
718718
SearchProfileResults profile = profileResults == null || profileResults.isEmpty()
719719
? null
@@ -744,9 +744,9 @@ public void onResponse(SearchResponse searchResponse) {
744744
@Override
745745
public void onFailure(Exception e) {
746746
ShardSearchFailure failure = new ShardSearchFailure(e);
747-
logCCSError(failure, clusterAlias, skipUnavailable);
748-
ccsClusterInfoUpdate(failure, clusters, clusterAlias, skipUnavailable);
749-
if (skipUnavailable) {
747+
logCCSError(failure, clusterAlias, shouldSkipOnFailure);
748+
ccsClusterInfoUpdate(failure, clusters, clusterAlias, shouldSkipOnFailure);
749+
if (shouldSkipOnFailure) {
750750
ActionListener.respondAndRelease(listener, SearchResponse.empty(timeProvider::buildTookInMillis, clusters));
751751
} else {
752752
listener.onFailure(wrapRemoteClusterFailure(clusterAlias, e));
@@ -768,7 +768,7 @@ public void onFailure(Exception e) {
768768

769769
remoteClusterService.maybeEnsureConnectedAndGetConnection(
770770
clusterAlias,
771-
shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable),
771+
shouldEstablishConnection(forceConnectTimeoutSecs, shouldSkipOnFailure),
772772
connectionListener
773773
);
774774
} else {
@@ -785,7 +785,10 @@ public void onFailure(Exception e) {
785785
final CountDown countDown = new CountDown(totalClusters);
786786
for (Map.Entry<String, OriginalIndices> entry : resolvedIndices.getRemoteClusterIndices().entrySet()) {
787787
String clusterAlias = entry.getKey();
788-
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
788+
boolean shouldSkipOnFailure = remoteClusterService.shouldSkipOnFailure(
789+
clusterAlias,
790+
searchRequest.allowPartialSearchResults()
791+
);
789792
OriginalIndices indices = entry.getValue();
790793
SearchRequest ccsSearchRequest = SearchRequest.subSearchRequest(
791794
parentTaskId,
@@ -797,7 +800,7 @@ public void onFailure(Exception e) {
797800
);
798801
ActionListener<SearchResponse> ccsListener = createCCSListener(
799802
clusterAlias,
800-
skipUnavailable,
803+
shouldSkipOnFailure,
801804
countDown,
802805
exceptions,
803806
searchResponseMerger,
@@ -826,7 +829,7 @@ public void onFailure(Exception e) {
826829

827830
remoteClusterService.maybeEnsureConnectedAndGetConnection(
828831
clusterAlias,
829-
shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable),
832+
shouldEstablishConnection(forceConnectTimeoutSecs, shouldSkipOnFailure),
830833
connectionListener
831834
);
832835
}
@@ -903,10 +906,10 @@ static void collectSearchShards(
903906
final AtomicReference<Exception> exceptions = new AtomicReference<>();
904907
for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
905908
final String clusterAlias = entry.getKey();
906-
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
909+
boolean shouldSkipOnFailure = remoteClusterService.shouldSkipOnFailure(clusterAlias, allowPartialResults);
907910
CCSActionListener<SearchShardsResponse, Map<String, SearchShardsResponse>> singleListener = new CCSActionListener<>(
908911
clusterAlias,
909-
skipUnavailable,
912+
shouldSkipOnFailure,
910913
responsesCountDown,
911914
exceptions,
912915
clusters,
@@ -975,7 +978,7 @@ Map<String, SearchShardsResponse> createFinalResponse() {
975978

976979
remoteClusterService.maybeEnsureConnectedAndGetConnection(
977980
clusterAlias,
978-
shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable),
981+
shouldEstablishConnection(forceConnectTimeoutSecs, shouldSkipOnFailure),
979982
connectionListener
980983
);
981984
}
@@ -986,7 +989,7 @@ Map<String, SearchShardsResponse> createFinalResponse() {
986989
*/
987990
private static ActionListener<SearchResponse> createCCSListener(
988991
String clusterAlias,
989-
boolean skipUnavailable,
992+
boolean shouldSkipOnFailure,
990993
CountDown countDown,
991994
AtomicReference<Exception> exceptions,
992995
SearchResponseMerger searchResponseMerger,
@@ -996,15 +999,15 @@ private static ActionListener<SearchResponse> createCCSListener(
996999
) {
9971000
return new CCSActionListener<>(
9981001
clusterAlias,
999-
skipUnavailable,
1002+
shouldSkipOnFailure,
10001003
countDown,
10011004
exceptions,
10021005
clusters,
10031006
ActionListener.releaseAfter(originalListener, searchResponseMerger)
10041007
) {
10051008
@Override
10061009
void innerOnResponse(SearchResponse searchResponse) {
1007-
ccsClusterInfoUpdate(searchResponse, clusters, clusterAlias, skipUnavailable);
1010+
ccsClusterInfoUpdate(searchResponse, clusters, clusterAlias, shouldSkipOnFailure);
10081011
searchResponseMerger.add(searchResponse);
10091012
progressListener.notifyClusterResponseMinimizeRoundtrips(clusterAlias, searchResponse);
10101013
}
@@ -1029,11 +1032,11 @@ static void ccsClusterInfoUpdate(
10291032
ShardSearchFailure failure,
10301033
SearchResponse.Clusters clusters,
10311034
String clusterAlias,
1032-
boolean skipUnavailable
1035+
boolean shouldSkipOnFailure
10331036
) {
10341037
clusters.swapCluster(clusterAlias, (k, v) -> {
10351038
SearchResponse.Cluster.Status status;
1036-
if (skipUnavailable) {
1039+
if (shouldSkipOnFailure) {
10371040
status = SearchResponse.Cluster.Status.SKIPPED;
10381041
} else {
10391042
status = SearchResponse.Cluster.Status.FAILED;
@@ -1056,7 +1059,7 @@ private static void ccsClusterInfoUpdate(
10561059
SearchResponse searchResponse,
10571060
SearchResponse.Clusters clusters,
10581061
String clusterAlias,
1059-
boolean skipUnavailable
1062+
boolean shouldSkipOnFailure
10601063
) {
10611064
/*
10621065
* Cluster Status logic:
@@ -1070,7 +1073,7 @@ private static void ccsClusterInfoUpdate(
10701073
SearchResponse.Cluster.Status status;
10711074
int totalShards = searchResponse.getTotalShards();
10721075
if (totalShards > 0 && searchResponse.getFailedShards() >= totalShards) {
1073-
if (skipUnavailable) {
1076+
if (shouldSkipOnFailure) {
10741077
status = SearchResponse.Cluster.Status.SKIPPED;
10751078
} else {
10761079
status = SearchResponse.Cluster.Status.FAILED;
@@ -1762,7 +1765,7 @@ private static void failIfOverShardCountLimit(ClusterService clusterService, int
17621765
*/
17631766
abstract static class CCSActionListener<Response, FinalResponse> implements ActionListener<Response> {
17641767
protected final String clusterAlias;
1765-
protected final boolean skipUnavailable;
1768+
protected final boolean skipOnFailure;
17661769
private final CountDown countDown;
17671770
private final AtomicReference<Exception> exceptions;
17681771
protected final SearchResponse.Clusters clusters;
@@ -1773,14 +1776,14 @@ abstract static class CCSActionListener<Response, FinalResponse> implements Acti
17731776
*/
17741777
CCSActionListener(
17751778
String clusterAlias,
1776-
boolean skipUnavailable,
1779+
boolean skipOnFailure,
17771780
CountDown countDown,
17781781
AtomicReference<Exception> exceptions,
17791782
SearchResponse.Clusters clusters,
17801783
ActionListener<FinalResponse> originalListener
17811784
) {
17821785
this.clusterAlias = clusterAlias;
1783-
this.skipUnavailable = skipUnavailable;
1786+
this.skipOnFailure = skipOnFailure;
17841787
this.countDown = countDown;
17851788
this.exceptions = exceptions;
17861789
this.clusters = clusters;
@@ -1801,9 +1804,9 @@ public final void onResponse(Response response) {
18011804
@Override
18021805
public final void onFailure(Exception e) {
18031806
ShardSearchFailure f = new ShardSearchFailure(e);
1804-
logCCSError(f, clusterAlias, skipUnavailable);
1807+
logCCSError(f, clusterAlias, skipOnFailure);
18051808
SearchResponse.Cluster cluster = clusters.getCluster(clusterAlias);
1806-
if (skipUnavailable && ExceptionsHelper.isTaskCancelledException(e) == false) {
1809+
if (skipOnFailure && ExceptionsHelper.isTaskCancelledException(e) == false) {
18071810
if (cluster != null) {
18081811
ccsClusterInfoUpdate(f, clusters, clusterAlias, true);
18091812
}
@@ -1859,9 +1862,9 @@ protected void releaseResponse(FinalResponse response) {}
18591862
* causes of shard failures.
18601863
* @param f ShardSearchFailure to log
18611864
* @param clusterAlias cluster on which the failure occurred
1862-
* @param skipUnavailable the skip_unavailable setting of the cluster with the search error
1865+
* @param shouldSkipOnFailure whether the cluster with the search error is skippable on failure (skip_unavailable=true, or CPS with allow_partial_search_results=true)
18631866
*/
1864-
private static void logCCSError(ShardSearchFailure f, String clusterAlias, boolean skipUnavailable) {
1867+
private static void logCCSError(ShardSearchFailure f, String clusterAlias, boolean shouldSkipOnFailure) {
18651868
String errorInfo;
18661869
try {
18671870
errorInfo = Strings.toString(f.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS));
@@ -1872,7 +1875,7 @@ private static void logCCSError(ShardSearchFailure f, String clusterAlias, boole
18721875
logger.debug(
18731876
"CCS remote cluster failure. Cluster [{}]. skip_unavailable: [{}]. Error: {}",
18741877
clusterAlias,
1875-
skipUnavailable,
1878+
shouldSkipOnFailure,
18761879
errorInfo
18771880
);
18781881
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ public boolean isRemoteClusterServerEnabled() {
160160
private final Map<ProjectId, Map<String, RemoteClusterConnection>> remoteClusters;
161161
private final RemoteClusterCredentialsManager remoteClusterCredentialsManager;
162162
private final ProjectResolver projectResolver;
163+
private final boolean inSkippableContext;
163164

164165
@FixForMultiProject(description = "Inject the ProjectResolver instance.")
165166
RemoteClusterService(Settings settings, TransportService transportService) {
@@ -177,6 +178,7 @@ public boolean isRemoteClusterServerEnabled() {
177178
if (remoteClusterServerEnabled) {
178179
registerRemoteClusterHandshakeRequestHandler(transportService);
179180
}
181+
this.inSkippableContext = settings.getAsBoolean("serverless.cross_project.enabled", false);
180182
}
181183

182184
/**
@@ -293,6 +295,18 @@ public boolean isSkipUnavailable(String clusterAlias) {
293295
return getRemoteClusterConnection(clusterAlias).isSkipUnavailable();
294296
}
295297

298+
/**
299+
* Returns whether a failure on the specified cluster should be skipped. This is true when either in a CPS environment with allow_partial_search_results=true,
300+
* or skip_unavailable is set to true for the specified cluster.
301+
* @param clusterAlias Name of the cluster
302+
* @param allowPartialSearchResults If partial results can be served for the search request; null is treated as false.
303+
* @return true if the cluster should be skipped on failure, false otherwise
304+
*/
305+
public boolean shouldSkipOnFailure(String clusterAlias, Boolean allowPartialSearchResults) {
306+
return (inSkippableContext && (allowPartialSearchResults != null && allowPartialSearchResults))
307+
|| getRemoteClusterConnection(clusterAlias).isSkipUnavailable();
308+
}
309+
296310
public Transport.Connection getConnection(String cluster) {
297311
return getRemoteClusterConnection(cluster).getConnection();
298312
}

0 commit comments

Comments
 (0)