Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
Expand All @@ -35,15 +34,7 @@ public TransportRemoteInfoAction(TransportService transportService, ActionFilter

@Override
protected void doExecute(Task task, RemoteInfoRequest remoteInfoRequest, ActionListener<RemoteInfoResponse> listener) {
if (remoteClusterService.isEnabled() == false) {
throw new IllegalArgumentException(
"node ["
+ remoteClusterService.getLocalNode().getName()
+ "] does not have the ["
+ DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName()
+ "] role"
);
}
remoteClusterService.ensureClientIsEnabled();
listener.onResponse(new RemoteInfoResponse(remoteClusterService.getRemoteConnectionInfos().collect(toList())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ protected RemoteClusterAware(Settings settings) {
this.isRemoteClusterClientEnabled = DiscoveryNode.isRemoteClusterClient(settings);
}

protected String getNodeName() {
return nodeName;
}

/**
* Returns remote clusters that are enabled in these settings
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,11 @@ public final class RemoteClusterService extends RemoteClusterAware

public static final String REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME = "cluster:internal/remote_cluster/handshake";

private final boolean enabled;
private final boolean isRemoteClusterClient;
private final boolean isSearchNode;
private final boolean isStateless;
private final boolean remoteClusterServerEnabled;

public boolean isEnabled() {
return enabled;
}

public boolean isRemoteClusterServerEnabled() {
return remoteClusterServerEnabled;
}
Expand All @@ -166,7 +164,9 @@ public boolean isRemoteClusterServerEnabled() {
@FixForMultiProject(description = "Inject the ProjectResolver instance.")
RemoteClusterService(Settings settings, TransportService transportService) {
super(settings);
this.enabled = DiscoveryNode.isRemoteClusterClient(settings);
this.isRemoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
this.isSearchNode = DiscoveryNode.hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE);
this.isStateless = DiscoveryNode.isStateless(settings);
this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
this.transportService = transportService;
this.projectResolver = DefaultProjectResolver.INSTANCE;
Expand All @@ -179,10 +179,6 @@ public boolean isRemoteClusterServerEnabled() {
}
}

public DiscoveryNode getLocalNode() {
return transportService.getLocalNode();
}

/**
* Group indices by cluster alias mapped to OriginalIndices for that cluster.
* @param remoteClusterNames Set of configured remote cluster names.
Expand Down Expand Up @@ -335,11 +331,7 @@ public void maybeEnsureConnectedAndGetConnection(
}

public RemoteClusterConnection getRemoteClusterConnection(String cluster) {
if (enabled == false) {
throw new IllegalArgumentException(
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
);
}
ensureClientIsEnabled();
@FixForMultiProject(description = "Verify all callers will have the proper context set for resolving the origin project ID.")
RemoteClusterConnection connection = getConnectionsMapForCurrentProject().get(cluster);
if (connection == null) {
Expand Down Expand Up @@ -595,11 +587,7 @@ public RemoteClusterServerInfo info() {
* function on success.
*/
public void collectNodes(Set<String> clusters, ActionListener<BiFunction<String, String, DiscoveryNode>> listener) {
if (enabled == false) {
throw new IllegalArgumentException(
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
);
}
ensureClientIsEnabled();
@FixForMultiProject(description = "Analyze usages and determine if the project ID must be provided.")
final var projectConnectionsMap = getConnectionsMapForCurrentProject();
final var connectionsMap = new HashMap<String, RemoteClusterConnection>();
Expand Down Expand Up @@ -662,11 +650,7 @@ public RemoteClusterClient getRemoteClusterClient(
Executor responseExecutor,
DisconnectedStrategy disconnectedStrategy
) {
if (transportService.getRemoteClusterService().isEnabled() == false) {
throw new IllegalArgumentException(
"this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
);
}
ensureClientIsEnabled();
if (transportService.getRemoteClusterService().getRegisteredRemoteClusterNames().contains(clusterAlias) == false) {
throw new NoSuchRemoteClusterException(clusterAlias);
}
Expand All @@ -677,6 +661,27 @@ public RemoteClusterClient getRemoteClusterClient(
});
}

/**
* Verifies this node is configured to support linked project client operations.
* @throws IllegalArgumentException If this node is not configured to support client operations.
*/
public void ensureClientIsEnabled() {
if (isRemoteClusterClient == false) {
throw new IllegalArgumentException(
"node [" + getNodeName() + "] does not have the [" + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + "] role"
);
}
if (isStateless && isSearchNode == false) {
throw new IllegalArgumentException(
"node ["
+ getNodeName()
+ "] must have the ["
+ DiscoveryNodeRole.SEARCH_ROLE.roleName()
+ "] role in stateless environments to use linked project client features"
);
}
}

static void registerRemoteClusterHandshakeRequestHandler(TransportService transportService) {
transportService.registerRequestHandler(
REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -215,7 +216,10 @@ public void testEnsureWeReconnect() throws Exception {
}

public void testRemoteClusterServiceNotEnabled() {
final Settings settings = removeRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE));
final Settings settings = Settings.builder()
.put(removeRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)))
.put(Node.NODE_NAME_SETTING.getKey(), "node-1")
.build();
try (
MockTransportService service = MockTransportService.createNewService(
settings,
Expand All @@ -236,7 +240,7 @@ public void testRemoteClusterServiceNotEnabled() {
randomFrom(RemoteClusterService.DisconnectedStrategy.values())
)
);
assertThat(e.getMessage(), equalTo("this node does not have the remote_cluster_client role"));
assertThat(e.getMessage(), equalTo("node [node-1] does not have the [remote_cluster_client] role"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ public void testGroupIndicesWithoutRemoteClusterClientRole() throws Exception {
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE)
);
try (RemoteClusterService service = new RemoteClusterService(settings, null)) {
assertFalse(service.isEnabled());
expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled);
assertFalse(hasRegisteredClusters(service));
final IllegalArgumentException error = expectThrows(
IllegalArgumentException.class,
Expand Down Expand Up @@ -1383,7 +1383,10 @@ public void testSkipUnavailable() {
}

public void testRemoteClusterServiceNotEnabledGetRemoteClusterConnection() {
final Settings settings = removeRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE));
final Settings settings = Settings.builder()
.put(removeRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)))
.put(Node.NODE_NAME_SETTING.getKey(), "node-1")
.build();
try (
MockTransportService service = MockTransportService.createNewService(
settings,
Expand All @@ -1399,12 +1402,57 @@ public void testRemoteClusterServiceNotEnabledGetRemoteClusterConnection() {
IllegalArgumentException.class,
() -> service.getRemoteClusterService().getRemoteClusterConnection("test")
);
assertThat(e.getMessage(), equalTo("this node does not have the remote_cluster_client role"));
assertThat(e.getMessage(), equalTo("node [node-1] does not have the [remote_cluster_client] role"));
}
}

public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException {
final var nodeNameSettings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "node-1").build();

// Shouldn't throw when the remote cluster client role is enabled.
final var settingsWithRemoteClusterClientRole = Settings.builder()
.put(nodeNameSettings)
.put(onlyRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)))
.build();
try (RemoteClusterService service = new RemoteClusterService(settingsWithRemoteClusterClientRole, null)) {
service.ensureClientIsEnabled();
}

// Expect throws when missing the remote cluster client role.
final var settingsWithoutRemoteClusterClientRole = Settings.builder()
.put(nodeNameSettings)
.put(removeRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)))
.build();
try (RemoteClusterService service = new RemoteClusterService(settingsWithoutRemoteClusterClientRole, null)) {
expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled);
}

// Expect throws when missing search node role when stateless is enabled.
final var statelessEnabledSettingsOnNonSearchNode = Settings.builder()
.put(nodeNameSettings)
.put(onlyRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)))
.put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true)
.build();
try (RemoteClusterService service = new RemoteClusterService(statelessEnabledSettingsOnNonSearchNode, null)) {
expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled);
}

// Shouldn't throw when stateless is enabled on a search node.
final var statelessEnabledOnSearchNodeSettings = Settings.builder()
.put(nodeNameSettings)
.put(onlyRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE, DiscoveryNodeRole.SEARCH_ROLE)))
.put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true)
.build();
try (RemoteClusterService service = new RemoteClusterService(statelessEnabledOnSearchNodeSettings, null)) {
service.ensureClientIsEnabled();
}
}

public void testRemoteClusterServiceNotEnabledGetCollectNodes() {
final Settings settings = removeRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE));
final Settings settings = Settings.builder()
.put(removeRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)))
.put(Node.NODE_NAME_SETTING.getKey(), "node-1")
.build();
try (
MockTransportService service = MockTransportService.createNewService(
settings,
Expand All @@ -1420,7 +1468,7 @@ public void testRemoteClusterServiceNotEnabledGetCollectNodes() {
IllegalArgumentException.class,
() -> service.getRemoteClusterService().collectNodes(Set.of(), ActionListener.noop())
);
assertThat(e.getMessage(), equalTo("this node does not have the remote_cluster_client role"));
assertThat(e.getMessage(), equalTo("node [node-1] does not have the [remote_cluster_client] role"));
}
}

Expand Down