diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 3dfbe960a8e89..c909c9a25e5b8 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -1121,7 +1121,8 @@ public Map queryFields() { settingsModule.getClusterSettings(), taskManager, telemetryProvider.getTracer(), - nodeEnvironment.nodeId() + nodeEnvironment.nodeId(), + projectResolver ); final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry()); final SearchTransportService searchTransportService = new SearchTransportService( diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index 23382b714ccfe..5f1dc11ea16e1 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -42,6 +42,7 @@ import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ClusterConnectionManager; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportService; @@ -119,9 +120,20 @@ TransportService newTransportService( ClusterSettings clusterSettings, TaskManager taskManager, Tracer tracer, - String nodeId + String nodeId, + ProjectResolver projectResolver ) { - return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskManager); + return new TransportService( + settings, + transport, + threadPool, + interceptor, + localNodeFactory, + clusterSettings, + new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()), + taskManager, + projectResolver + ); } HttpServerTransport newHttpTransport(PluginsService pluginsService, NetworkModule networkModule) { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 8249264a9bfbb..ac3c8a74af37c 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; -import org.elasticsearch.cluster.project.DefaultProjectResolver; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; @@ -85,15 +84,14 @@ public boolean isRemoteClusterServerEnabled() { private final ProjectResolver projectResolver; private final boolean canUseSkipUnavailable; - @FixForMultiProject(description = "Inject the ProjectResolver instance.") - RemoteClusterService(Settings settings, TransportService transportService) { + RemoteClusterService(Settings settings, TransportService transportService, ProjectResolver projectResolver) { super(settings); this.isRemoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); this.isSearchNode = DiscoveryNode.hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE); this.isStateless = DiscoveryNode.isStateless(settings); this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings); this.transportService = transportService; - this.projectResolver = DefaultProjectResolver.INSTANCE; + this.projectResolver = projectResolver; this.remoteClusters = projectResolver.supportsMultipleProjects() ? ConcurrentCollections.newConcurrentMap() : Map.of(ProjectId.DEFAULT, ConcurrentCollections.newConcurrentMap()); @@ -694,9 +692,9 @@ private Map getConnectionsMapForCurrentProject( /** * Returns the map of connections for the given {@link ProjectId}. */ + @FixForMultiProject(description = "Assert ProjectId.DEFAULT should not be used in multi-project environment") private Map getConnectionsMapForProject(ProjectId projectId) { if (projectResolver.supportsMultipleProjects()) { - assert ProjectId.DEFAULT.equals(projectId) == false : "The default project ID should not be used in multi-project environment"; return remoteClusters.computeIfAbsent(projectId, unused -> ConcurrentCollections.newConcurrentMap()); } assert ProjectId.DEFAULT.equals(projectId) : "Only the default project ID should be used when multiple projects are not supported"; diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index e44291eacbc06..086bef3093dc6 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -18,6 +18,8 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.DefaultProjectResolver; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.common.ReferenceDocs; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -232,7 +234,7 @@ public TransportService( ); } - // NOTE: Only for use in tests + // Public access for tests. public TransportService( Settings settings, Transport transport, @@ -249,12 +251,11 @@ public TransportService( transportInterceptor, localNodeFactory, clusterSettings, - new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()), new TaskManager(settings, threadPool, taskHeaders) ); } - @SuppressWarnings("this-escape") + // Public access for tests. public TransportService( Settings settings, Transport transport, @@ -264,6 +265,31 @@ public TransportService( @Nullable ClusterSettings clusterSettings, ConnectionManager connectionManager, TaskManager taskManger + ) { + this( + settings, + transport, + threadPool, + transportInterceptor, + localNodeFactory, + clusterSettings, + connectionManager, + taskManger, + DefaultProjectResolver.INSTANCE + ); + } + + @SuppressWarnings("this-escape") + public TransportService( + Settings settings, + Transport transport, + ThreadPool threadPool, + TransportInterceptor transportInterceptor, + Function localNodeFactory, + @Nullable ClusterSettings clusterSettings, + ConnectionManager connectionManager, + TaskManager taskManger, + ProjectResolver projectResolver ) { this.transport = transport; transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings)); @@ -278,7 +304,7 @@ public TransportService( this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); this.enableStackOverflowAvoidance = ENABLE_STACK_OVERFLOW_AVOIDANCE.get(settings); - remoteClusterService = new RemoteClusterService(settings, this); + remoteClusterService = new RemoteClusterService(settings, this, projectResolver); responseHandlers = transport.getResponseHandlers(); if (clusterSettings != null) { clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 93091b8a5139b..c4fd4b20ac48c 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.VersionInformation; +import org.elasticsearch.cluster.project.DefaultProjectResolver; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; @@ -72,6 +73,10 @@ public void tearDown() throws Exception { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } + private RemoteClusterService createRemoteClusterService(final Settings settings, final MockTransportService transportService) { + return new RemoteClusterService(settings, transportService, DefaultProjectResolver.INSTANCE); + } + private MockTransportService startTransport( String id, List knownNodes, @@ -166,7 +171,7 @@ public void testGroupClusterIndices() throws IOException { Settings.Builder builder = Settings.builder(); builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); - try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); assertTrue(hasRegisteredClusters(service)); @@ -380,7 +385,7 @@ public void testGroupIndices() throws IOException { Settings.Builder builder = Settings.builder(); builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); - try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); assertTrue(hasRegisteredClusters(service)); @@ -443,7 +448,7 @@ public void testGroupIndicesWithoutRemoteClusterClientRole() throws Exception { Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "node-1").build(), Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE) ); - try (RemoteClusterService service = new RemoteClusterService(settings, null)) { + try (RemoteClusterService service = createRemoteClusterService(settings, null)) { expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled); assertFalse(hasRegisteredClusters(service)); final IllegalArgumentException error = expectThrows( @@ -487,7 +492,7 @@ public void testIncrementallyAddClusters() throws IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { + try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); assertFalse(hasRegisteredClusters(service)); @@ -563,7 +568,7 @@ public void testDefaultPingSchedule() throws IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { + try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); assertTrue(hasRegisteredClusters(service)); @@ -627,7 +632,7 @@ public void testCustomPingSchedule() throws IOException { TimeValue pingSchedule2 = // randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); - try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { service.initializeRemoteClusters(); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); @@ -666,7 +671,7 @@ public void testChangeSettings() throws Exception { transportService.acceptIncomingRequests(); Settings.Builder builder = Settings.builder(); builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { service.initializeRemoteClusters(); RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); Settings.Builder settingsChange = Settings.builder(); @@ -751,7 +756,7 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { + try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); assertFalse(hasRegisteredClusters(service)); @@ -841,7 +846,7 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { + try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); assertFalse(hasRegisteredClusters(service)); @@ -936,7 +941,7 @@ public void testCollectNodes() throws InterruptedException, IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { + try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); assertFalse(hasRegisteredClusters(service)); @@ -1090,7 +1095,7 @@ public void testCollectNodesConcurrentWithSettingsChanges() throws IOException { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = new RemoteClusterService(createSettings("cluster_1", seedList), transportService)) { + try (RemoteClusterService service = createRemoteClusterService(createSettings("cluster_1", seedList), transportService)) { service.initializeRemoteClusters(); assertTrue(hasRegisteredClusters(service)); final var numTasks = between(3, 5); @@ -1277,7 +1282,7 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception { final Settings.Builder builder = Settings.builder(); builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString())); - try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); assertTrue(hasRegisteredClusters(service)); @@ -1420,7 +1425,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(nodeNameSettings) .put(onlyRole(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)) .build(); - try (RemoteClusterService service = new RemoteClusterService(settingsWithRemoteClusterClientRole, null)) { + try (RemoteClusterService service = createRemoteClusterService(settingsWithRemoteClusterClientRole, null)) { service.ensureClientIsEnabled(); } @@ -1429,7 +1434,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(nodeNameSettings) .put(removeRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE))) .build(); - try (RemoteClusterService service = new RemoteClusterService(settingsWithoutRemoteClusterClientRole, null)) { + try (RemoteClusterService service = createRemoteClusterService(settingsWithoutRemoteClusterClientRole, null)) { final var exception = expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled); assertThat(exception.getMessage(), equalTo("node [node-1] does not have the [remote_cluster_client] role")); } @@ -1440,7 +1445,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(onlyRole(DiscoveryNodeRole.INDEX_ROLE)) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = new RemoteClusterService(statelessEnabledSettingsOnNonSearchNode, null)) { + try (RemoteClusterService service = createRemoteClusterService(statelessEnabledSettingsOnNonSearchNode, null)) { final var exception = expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled); assertThat( exception.getMessage(), @@ -1457,7 +1462,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(onlyRole(DiscoveryNodeRole.SEARCH_ROLE)) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = new RemoteClusterService(statelessEnabledOnSearchNodeSettings, null)) { + try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnSearchNodeSettings, null)) { service.ensureClientIsEnabled(); } final var statelessEnabledOnRemoteClusterClientSettings = Settings.builder() @@ -1465,7 +1470,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(onlyRole(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = new RemoteClusterService(statelessEnabledOnRemoteClusterClientSettings, null)) { + try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnRemoteClusterClientSettings, null)) { service.ensureClientIsEnabled(); } final var statelessEnabledOnSearchNodeAndRemoteClusterClientSettings = Settings.builder() @@ -1473,7 +1478,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE))) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = new RemoteClusterService(statelessEnabledOnSearchNodeAndRemoteClusterClientSettings, null)) { + try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnSearchNodeAndRemoteClusterClientSettings, null)) { service.ensureClientIsEnabled(); } } @@ -1544,7 +1549,7 @@ public void testUseDifferentTransportProfileForCredentialsProtectedRemoteCluster }); transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { + try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { service.initializeRemoteClusters(); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -1618,7 +1623,7 @@ public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfi transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { + try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { service.initializeRemoteClusters(); final Settings clusterSettings = buildRemoteClusterSettings("cluster_1", discoNode.getAddress().toString()); @@ -1707,7 +1712,7 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite alias -> alias.equals(goodCluster) || alias.equals(badCluster), () -> randomAlphaOfLength(10) ); - try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { + try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { service.initializeRemoteClusters(); final Settings cluster1Settings = buildRemoteClusterSettings(goodCluster, c1DiscoNode.getAddress().toString()); @@ -1793,7 +1798,7 @@ public void testLogsConnectionResult() throws IOException { try ( var remote = startTransport("remote", List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY); var local = startTransport("local", List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY); - var remoteClusterService = new RemoteClusterService(Settings.EMPTY, local) + var remoteClusterService = createRemoteClusterService(Settings.EMPTY, local) ) { var clusterSettings = ClusterSettings.createBuiltInClusterSettings(); remoteClusterService.listenForUpdates(clusterSettings); diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index ce6ac76d43e3a..a6ab3699eb206 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -173,7 +173,8 @@ protected TransportService newTransportService( ClusterSettings clusterSettings, TaskManager taskManager, Tracer tracer, - String nodeId + String nodeId, + ProjectResolver projectResolver ) { // we use the MockTransportService.TestPlugin class as a marker to create a network @@ -191,7 +192,8 @@ protected TransportService newTransportService( clusterSettings, taskManager, tracer, - nodeId + nodeId, + projectResolver ); } else { return new MockTransportService( diff --git a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle index 61a43a4b4e5d1..9f52077199ca5 100644 --- a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle +++ b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle @@ -73,6 +73,9 @@ tasks.named("yamlRestTest").configure { '^reindex/60_wait_for_active_shards/can override wait_for_active_shards', // <- Requires a single shard '^reindex/90_remote/*', '^reindex/95_parent_join/Reindex from remote*', + + // Linked project configuration via ClusterSettings currently does not support multi-project + '^cluster.stats/30_ccs_stats/cross-cluster search stats search', ]; if (buildParams.snapshotBuild == false) { blacklist += [];