Skip to content

Commit f92aa7a

Browse files
Use REMOTE_CLUSTER_PROFILE for linked project connections when CPS is enabled (#135964)
Currently the presence of credentials for a linked project alias determines if the RCS2.0 REMOTE_CLUSTER_PROFILE is used for the transport profile. For CPS we want to use the REMOTE_CLUSTER_PROFILE, but will not be using the credentials manager. This PR adds a check to see if CPS is enabled and if so the REMOTE_CLUSTER_PROFILE is used when constructing RemoteClusterConnection instances. Resolves: ES-13109
1 parent b4cd892 commit f92aa7a

File tree

8 files changed

+76
-64
lines changed

8 files changed

+76
-64
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,21 @@ public final class RemoteClusterConnection implements Closeable {
6060
* @param credentialsManager object to lookup remote cluster credentials by cluster alias. If a cluster is protected by a credential,
6161
* i.e. it has a credential configured via secure setting.
6262
* This means the remote cluster uses the advances RCS model (as opposed to the basic model).
63+
* @param crossProjectEnabled True if cross-project search is enabled, false otherwise.
6364
*/
6465
RemoteClusterConnection(
6566
LinkedProjectConfig config,
6667
TransportService transportService,
67-
RemoteClusterCredentialsManager credentialsManager
68+
RemoteClusterCredentialsManager credentialsManager,
69+
boolean crossProjectEnabled
6870
) {
6971
this.transportService = transportService;
7072
this.clusterAlias = config.linkedProjectAlias();
7173
ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(
7274
config,
73-
credentialsManager.hasCredentials(clusterAlias)
75+
crossProjectEnabled || credentialsManager.hasCredentials(clusterAlias)
76+
? RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE
77+
: TransportSettings.DEFAULT_PROFILE
7478
);
7579
this.remoteConnectionManager = new RemoteConnectionManager(
7680
clusterAlias,
@@ -217,7 +221,7 @@ public RemoteConnectionInfo getConnectionInfo() {
217221
connectionStrategy.getModeInfo(),
218222
initialConnectionTimeout,
219223
skipUnavailable,
220-
REMOTE_CLUSTER_PROFILE.equals(remoteConnectionManager.getConnectionProfile().getTransportProfile())
224+
remoteConnectionManager.getCredentialsManager().hasCredentials(clusterAlias)
221225
);
222226
}
223227

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ public synchronized void updateRemoteCluster(
371371

372372
if (remote == null) {
373373
// this is a new cluster we have to add a new representation
374-
remote = new RemoteClusterConnection(config, transportService, remoteClusterCredentialsManager);
374+
remote = new RemoteClusterConnection(config, transportService, remoteClusterCredentialsManager, crossProjectEnabled);
375375
connectionMap.put(clusterAlias, remote);
376376
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.CONNECTED));
377377
} else if (forceRebuild || remote.shouldRebuildConnection(config)) {
@@ -382,7 +382,7 @@ public synchronized void updateRemoteCluster(
382382
logger.warn("project [" + projectId + "] failed to close remote cluster connections for cluster: " + clusterAlias, e);
383383
}
384384
connectionMap.remove(clusterAlias);
385-
remote = new RemoteClusterConnection(config, transportService, remoteClusterCredentialsManager);
385+
remote = new RemoteClusterConnection(config, transportService, remoteClusterCredentialsManager, crossProjectEnabled);
386386
connectionMap.put(clusterAlias, remote);
387387
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED));
388388
} else {

server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,7 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
9595
connectionManager.addListener(this);
9696
}
9797

98-
static ConnectionProfile buildConnectionProfile(LinkedProjectConfig config, boolean credentialsProtected) {
99-
final String transportProfile = credentialsProtected
100-
? RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE
101-
: TransportSettings.DEFAULT_PROFILE;
102-
98+
static ConnectionProfile buildConnectionProfile(LinkedProjectConfig config, String transportProfile) {
10399
ConnectionProfile.Builder builder = new ConnectionProfile.Builder().setConnectTimeout(config.transportConnectTimeout())
104100
.setHandshakeTimeout(config.transportConnectTimeout())
105101
.setCompressionEnabled(config.connectionCompression())

server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
6161
private final Settings settings = Settings.builder().put(modeKey, "proxy").build();
6262
private final ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(
6363
RemoteClusterSettings.toConfig("cluster", settings),
64-
false
64+
TransportSettings.DEFAULT_PROFILE
6565
);
6666
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
6767

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 22 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -237,13 +237,8 @@ public void run() {
237237
AtomicReference<Exception> exceptionReference = new AtomicReference<>();
238238
String clusterAlias = "test-cluster";
239239
Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode));
240-
try (
241-
RemoteClusterConnection connection = new RemoteClusterConnection(
242-
RemoteClusterSettings.toConfig(clusterAlias, settings),
243-
service,
244-
randomFrom(RemoteClusterCredentialsManager.EMPTY, buildCredentialsManager(clusterAlias))
245-
)
246-
) {
240+
try (RemoteClusterConnection connection = createConnection(clusterAlias, settings, service, randomBoolean())) {
241+
247242
ActionListener<Void> listener = ActionListener.wrap(x -> {
248243
listenerCalled.countDown();
249244
fail("expected exception");
@@ -313,13 +308,7 @@ public void testCloseWhileConcurrentlyConnecting() throws IOException, Interrupt
313308
service.acceptIncomingRequests();
314309
String clusterAlias = "test-cluster";
315310
Settings settings = buildRandomSettings(clusterAlias, seedNodes);
316-
try (
317-
RemoteClusterConnection connection = new RemoteClusterConnection(
318-
RemoteClusterSettings.toConfig(clusterAlias, settings),
319-
service,
320-
RemoteClusterCredentialsManager.EMPTY
321-
)
322-
) {
311+
try (RemoteClusterConnection connection = createConnection(clusterAlias, settings, service, false)) {
323312
int numThreads = randomIntBetween(4, 10);
324313
Thread[] threads = new Thread[numThreads];
325314
CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
@@ -466,13 +455,7 @@ private void doTestGetConnectionInfo(boolean hasClusterCredentials) throws Excep
466455
);
467456
settings = Settings.builder().put(settings).setSecureSettings(secureSettings).build();
468457
}
469-
try (
470-
RemoteClusterConnection connection = new RemoteClusterConnection(
471-
RemoteClusterSettings.toConfig(clusterAlias, settings),
472-
service,
473-
hasClusterCredentials ? buildCredentialsManager(clusterAlias) : RemoteClusterCredentialsManager.EMPTY
474-
)
475-
) {
458+
try (RemoteClusterConnection connection = createConnection(clusterAlias, settings, service, hasClusterCredentials)) {
476459
// test no nodes connected
477460
RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo());
478461
assertNotNull(remoteConnectionInfo);
@@ -662,13 +645,7 @@ private void doTestCollectNodes(boolean hasClusterCredentials) throws Exception
662645
settings = Settings.builder().put(settings).setSecureSettings(secureSettings).build();
663646
}
664647

665-
try (
666-
RemoteClusterConnection connection = new RemoteClusterConnection(
667-
RemoteClusterSettings.toConfig(clusterAlias, settings),
668-
service,
669-
hasClusterCredentials ? buildCredentialsManager(clusterAlias) : RemoteClusterCredentialsManager.EMPTY
670-
)
671-
) {
648+
try (RemoteClusterConnection connection = createConnection(clusterAlias, settings, service, hasClusterCredentials)) {
672649
CountDownLatch responseLatch = new CountDownLatch(1);
673650
AtomicReference<Function<String, DiscoveryNode>> reference = new AtomicReference<>();
674651
AtomicReference<Exception> failReference = new AtomicReference<>();
@@ -718,13 +695,7 @@ public void testNoChannelsExceptREG() throws Exception {
718695
String clusterAlias = "test-cluster";
719696
Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode));
720697

721-
try (
722-
RemoteClusterConnection connection = new RemoteClusterConnection(
723-
RemoteClusterSettings.toConfig(clusterAlias, settings),
724-
service,
725-
RemoteClusterCredentialsManager.EMPTY
726-
)
727-
) {
698+
try (RemoteClusterConnection connection = createConnection(clusterAlias, settings, service, false)) {
728699
PlainActionFuture<Void> plainActionFuture = new PlainActionFuture<>();
729700
connection.ensureConnected(plainActionFuture);
730701
plainActionFuture.get(10, TimeUnit.SECONDS);
@@ -790,13 +761,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted
790761

791762
String clusterAlias = "test-cluster";
792763
Settings settings = buildRandomSettings(clusterAlias, seedNodes);
793-
try (
794-
RemoteClusterConnection connection = new RemoteClusterConnection(
795-
RemoteClusterSettings.toConfig(clusterAlias, settings),
796-
service,
797-
randomFrom(RemoteClusterCredentialsManager.EMPTY, buildCredentialsManager(clusterAlias))
798-
)
799-
) {
764+
try (RemoteClusterConnection connection = createConnection(clusterAlias, settings, service, randomBoolean())) {
800765
final int numGetThreads = randomIntBetween(4, 10);
801766
final Thread[] getThreads = new Thread[numGetThreads];
802767
final int numModifyingThreads = randomIntBetween(4, 10);
@@ -890,13 +855,7 @@ public void testGetConnection() throws Exception {
890855
service.acceptIncomingRequests();
891856
String clusterAlias = "test-cluster";
892857
Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode));
893-
try (
894-
RemoteClusterConnection connection = new RemoteClusterConnection(
895-
RemoteClusterSettings.toConfig(clusterAlias, settings),
896-
service,
897-
RemoteClusterCredentialsManager.EMPTY
898-
)
899-
) {
858+
try (RemoteClusterConnection connection = createConnection(clusterAlias, settings, service, false)) {
900859
safeAwait(listener -> connection.ensureConnected(listener.map(x -> null)));
901860
for (int i = 0; i < 10; i++) {
902861
// always a direct connection as the remote node is already connected
@@ -953,4 +912,18 @@ private static RemoteClusterCredentialsManager buildCredentialsManager(String cl
953912
builder.setSecureSettings(secureSettings);
954913
return new RemoteClusterCredentialsManager(builder.build());
955914
}
915+
916+
private RemoteClusterConnection createConnection(
917+
String alias,
918+
Settings settings,
919+
TransportService transportService,
920+
boolean hasCredentials
921+
) {
922+
return new RemoteClusterConnection(
923+
RemoteClusterSettings.toConfig(alias, settings),
924+
transportService,
925+
hasCredentials ? buildCredentialsManager(alias) : RemoteClusterCredentialsManager.EMPTY,
926+
false
927+
);
928+
}
956929
}

server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1782,6 +1782,36 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite
17821782
}
17831783
}
17841784

1785+
public void testCorrectTransportProfileUsedWhenCPSEnabled() throws IOException {
1786+
final var versionInfo = VersionInformation.CURRENT;
1787+
final var transportVers = TransportVersion.current();
1788+
final var knownNodes = new CopyOnWriteArrayList<DiscoveryNode>();
1789+
final var linkedTransportServiceSettings = Settings.builder()
1790+
.put(RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED.getKey(), "true")
1791+
.put(RemoteClusterPortSettings.PORT.getKey(), "0")
1792+
.build();
1793+
1794+
try (var seedTransport = startTransport("seed_node", knownNodes, versionInfo, transportVers, linkedTransportServiceSettings)) {
1795+
knownNodes.add(seedTransport.getLocalNode());
1796+
final var settings = Settings.builder()
1797+
.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalNode().getAddress().toString())
1798+
.put("serverless.cross_project.enabled", true)
1799+
.build();
1800+
try (var transportService = MockTransportService.createNewService(settings, versionInfo, transportVers, threadPool)) {
1801+
transportService.start();
1802+
transportService.acceptIncomingRequests();
1803+
try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) {
1804+
initializeRemoteClusters(service);
1805+
assertTrue(hasRegisteredClusters(service));
1806+
assertConnectionHasProfile(
1807+
service.getRemoteClusterConnection("cluster1"),
1808+
RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE
1809+
);
1810+
}
1811+
}
1812+
}
1813+
}
1814+
17851815
private static void assertConnectionHasProfile(RemoteClusterConnection remoteClusterConnection, String expectedConnectionProfile) {
17861816
assertThat(
17871817
remoteClusterConnection.getConnectionManager().getConnectionProfile().getTransportProfile(),

server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,10 @@ public void testCorrectChannelNumber() {
143143
for (RemoteConnectionStrategy.ConnectionStrategy strategy : RemoteConnectionStrategy.ConnectionStrategy.values()) {
144144
String settingKey = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey();
145145
Settings proxySettings = Settings.builder().put(settingKey, strategy.name()).build();
146-
ConnectionProfile proxyProfile = buildConnectionProfile(toConfig(clusterAlias, proxySettings), randomBoolean());
146+
ConnectionProfile proxyProfile = buildConnectionProfile(
147+
toConfig(clusterAlias, proxySettings),
148+
randomBoolean() ? RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE : TransportSettings.DEFAULT_PROFILE
149+
);
147150
assertEquals(
148151
"Incorrect number of channels for " + strategy.name(),
149152
strategy.getNumberOfChannels(),
@@ -157,7 +160,10 @@ public void testTransportProfile() {
157160

158161
// New rcs connection with credentials
159162
for (RemoteConnectionStrategy.ConnectionStrategy strategy : RemoteConnectionStrategy.ConnectionStrategy.values()) {
160-
ConnectionProfile profile = buildConnectionProfile(toConfig(clusterAlias, Settings.EMPTY), true);
163+
ConnectionProfile profile = buildConnectionProfile(
164+
toConfig(clusterAlias, Settings.EMPTY),
165+
RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE
166+
);
161167
assertEquals(
162168
"Incorrect transport profile for " + strategy.name(),
163169
RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE,
@@ -167,7 +173,7 @@ public void testTransportProfile() {
167173

168174
// Legacy ones without credentials
169175
for (RemoteConnectionStrategy.ConnectionStrategy strategy : RemoteConnectionStrategy.ConnectionStrategy.values()) {
170-
ConnectionProfile profile = buildConnectionProfile(toConfig(clusterAlias, Settings.EMPTY), false);
176+
ConnectionProfile profile = buildConnectionProfile(toConfig(clusterAlias, Settings.EMPTY), TransportSettings.DEFAULT_PROFILE);
171177
assertEquals(
172178
"Incorrect transport profile for " + strategy.name(),
173179
TransportSettings.DEFAULT_PROFILE,

server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ public void setUp() throws Exception {
9393
builder.setSecureSettings(secureSettings);
9494
}
9595
clientSettings = builder.build();
96-
profile = RemoteConnectionStrategy.buildConnectionProfile(toConfig(clusterAlias, clientSettings), hasClusterCredentials);
96+
profile = RemoteConnectionStrategy.buildConnectionProfile(
97+
toConfig(clusterAlias, clientSettings),
98+
hasClusterCredentials ? RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE : TransportSettings.DEFAULT_PROFILE
99+
);
97100
}
98101

99102
@Override

0 commit comments

Comments
 (0)