Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected void doExecuteForked(Task task, ResolveClusterActionRequest request, A
resolveClusterTask.ensureNotCancelled();
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias).orElse(true) == false;
RemoteClusterClient remoteClusterClient = remoteClusterService.getRemoteClusterClient(
clusterAlias,
searchCoordinationExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ private void doExecuteForked(
);

boolean ensureConnected = forceConnectTimeoutSecs != null
|| transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false;
|| transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias).orElse(true) == false;
transportService.getRemoteClusterService()
.maybeEnsureConnectedAndGetConnection(clusterAlias, ensureConnected, connectionListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -160,7 +161,7 @@ public boolean isRemoteClusterServerEnabled() {
private final Map<ProjectId, Map<String, RemoteClusterConnection>> remoteClusters;
private final RemoteClusterCredentialsManager remoteClusterCredentialsManager;
private final ProjectResolver projectResolver;
private final boolean inSkippableContext;
private final boolean isCpsEnabled;

@FixForMultiProject(description = "Inject the ProjectResolver instance.")
RemoteClusterService(Settings settings, TransportService transportService) {
Expand All @@ -182,7 +183,7 @@ public boolean isRemoteClusterServerEnabled() {
* TODO: This is not the right way to check if we're in CPS context and is more of a temporary measure since
* the functionality to do it the right way is not yet ready -- replace this code when it's ready.
*/
this.inSkippableContext = settings.getAsBoolean("serverless.cross_project.enabled", false);
this.isCpsEnabled = settings.getAsBoolean("serverless.cross_project.enabled", false);
}

/**
Expand Down Expand Up @@ -293,22 +294,28 @@ void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
}

/**
* Returns whether the cluster identified by the provided alias is configured to be skipped when unavailable
* Returns whether the cluster identified by the provided alias is configured to be skipped when unavailable.
* @param clusterAlias Name of the cluster
* @return A boolean optional that denotes if the cluster is configured to be skipped. In CPS-like environment,
* it returns an empty value where we default/fall back to true.
*/
public boolean isSkipUnavailable(String clusterAlias) {
return getRemoteClusterConnection(clusterAlias).isSkipUnavailable();
public Optional<Boolean> isSkipUnavailable(String clusterAlias) {
if (isCpsEnabled) {
return Optional.empty();
} else {
return Optional.of(getRemoteClusterConnection(clusterAlias).isSkipUnavailable());
}
}

/**
* Returns whether we're in a skippable context. Skippable context is true when either in CPS environment
* or skip_unavailable is set to true for the specified cluster.
* Signifies if an error can be skipped for the specified cluster based on skip_unavailable, or,
* allow_partial_search_results if in CPS-like environment.
* @param clusterAlias Name of the cluster
* @param allowPartialSearchResults If partial results can be served for the search request.
* @return boolean
*/
public boolean shouldSkipOnFailure(String clusterAlias, Boolean allowPartialSearchResults) {
return (inSkippableContext && (allowPartialSearchResults != null && allowPartialSearchResults))
|| getRemoteClusterConnection(clusterAlias).isSkipUnavailable();
return isSkipUnavailable(clusterAlias).orElseGet(() -> allowPartialSearchResults != null && allowPartialSearchResults);
}

public Transport.Connection getConnection(String cluster) {
Expand Down Expand Up @@ -675,7 +682,9 @@ public RemoteClusterClient getRemoteClusterClient(
return new RemoteClusterAwareClient(transportService, clusterAlias, responseExecutor, switch (disconnectedStrategy) {
case RECONNECT_IF_DISCONNECTED -> true;
case FAIL_IF_DISCONNECTED -> false;
case RECONNECT_UNLESS_SKIP_UNAVAILABLE -> transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false;
case RECONNECT_UNLESS_SKIP_UNAVAILABLE -> transportService.getRemoteClusterService()
.isSkipUnavailable(clusterAlias)
.orElse(true) == false;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1370,15 +1370,15 @@ public void testSkipUnavailable() {
service.start();
service.acceptIncomingRequests();

assertTrue(service.getRemoteClusterService().isSkipUnavailable("cluster1"));
assertTrue(service.getRemoteClusterService().isSkipUnavailable("cluster1").orElse(true));

if (randomBoolean()) {
updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", false);
assertFalse(service.getRemoteClusterService().isSkipUnavailable("cluster1"));
assertFalse(service.getRemoteClusterService().isSkipUnavailable("cluster1").orElse(true));
}

updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", true);
assertTrue(service.getRemoteClusterService().isSkipUnavailable("cluster1"));
assertTrue(service.getRemoteClusterService().isSkipUnavailable("cluster1").orElse(true));
}
}
}
Expand Down