diff --git a/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java b/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java index 0a7451702ec66..fa1ca4e211526 100644 --- a/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java @@ -16,12 +16,14 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; @@ -136,7 +138,8 @@ protected HealthNode createTask( public PersistentTasksCustomMetadata.Assignment getAssignment( HealthNodeTaskParams params, Collection candidateNodes, - ClusterState clusterState + ClusterState clusterState, + @Nullable ProjectId projectId ) { DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData); if (discoveryNode == null) { diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index ceeb1a4e27f1b..f3a25caf79bb6 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -171,9 +171,9 @@ public ClusterState execute(ClusterState currentState) { assert (projectId == null && taskExecutor.scope() == PersistentTasksExecutor.Scope.CLUSTER) || (projectId != null && taskExecutor.scope() == PersistentTasksExecutor.Scope.PROJECT) : "inconsistent project-id [" + projectId + "] and task scope [" + taskExecutor.scope() + "]"; - taskExecutor.validate(taskParams, currentState); + taskExecutor.validate(taskParams, currentState, projectId); - Assignment assignment = createAssignment(taskName, taskParams, currentState); + Assignment assignment = createAssignment(taskName, taskParams, currentState, projectId); logger.debug("creating {} persistent task [{}] with assignment [{}]", taskTypeString(projectId), taskName, assignment); return builder.addTask(taskId, taskName, taskParams, assignment).buildAndUpdate(currentState, projectId); } @@ -449,7 +449,8 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) private Assignment createAssignment( final String taskName, final Params taskParams, - final ClusterState currentState + final ClusterState currentState, + @Nullable final ProjectId projectId ) { PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); @@ -468,7 +469,7 @@ private Assignment createAssignment( // Task assignment should not rely on node order Randomness.shuffle(candidateNodes); - final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState); + final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState, projectId); assert assignment != null : "getAssignment() should always return an Assignment object, containing a node or a reason why not"; assert (assignment.getExecutorNode() == null || currentState.metadata().nodeShutdowns().contains(assignment.getExecutorNode()) == false) @@ -540,8 +541,8 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) * persistent tasks changed. */ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { - final List allTasks = PersistentTasks.getAllTasks(event.state()).map(Tuple::v2).toList(); - if (allTasks.isEmpty()) { + var projectIdToTasksIterator = PersistentTasks.getAllTasks(event.state()).iterator(); + if (projectIdToTasksIterator.hasNext() == false) { return false; } @@ -553,10 +554,16 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { || event.metadataChanged() || masterChanged) { - for (PersistentTasks tasks : allTasks) { - for (PersistentTask task : tasks.tasks()) { + while (projectIdToTasksIterator.hasNext()) { + var projectIdToTasks = projectIdToTasksIterator.next(); + for (PersistentTask task : projectIdToTasks.v2().tasks()) { if (needsReassignment(task.getAssignment(), event.state().nodes())) { - Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state()); + Assignment assignment = createAssignment( + task.getTaskName(), + task.getParams(), + event.state(), + projectIdToTasks.v1() + ); if (Objects.equals(assignment, task.getAssignment()) == false) { return true; } @@ -602,7 +609,7 @@ private ClusterState reassignClusterOrSingleProjectTasks(@Nullable final Project // We need to check if removed nodes were running any of the tasks and reassign them for (PersistentTask task : tasks.tasks()) { if (needsReassignment(task.getAssignment(), nodes)) { - Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState); + Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState, projectId); if (Objects.equals(assignment, task.getAssignment()) == false) { logger.trace( "reassigning {} task {} from node {} to node {}", diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index b58ef7523bf99..2ed5d7f925f5f 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -10,6 +10,7 @@ package org.elasticsearch.persistent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; @@ -63,7 +64,12 @@ public Scope scope() { *

* The default implementation returns the least loaded data node from amongst the collection of candidate nodes */ - public Assignment getAssignment(Params params, Collection candidateNodes, ClusterState clusterState) { + public Assignment getAssignment( + Params params, + Collection candidateNodes, + ClusterState clusterState, + @Nullable ProjectId projectId + ) { DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData); if (discoveryNode == null) { return NO_NODE_FOUND; @@ -105,7 +111,7 @@ protected DiscoveryNode selectLeastLoadedNode( *

* Throws an exception if the supplied params cannot be executed on the cluster in the current state. */ - public void validate(Params params, ClusterState clusterState) {} + public void validate(Params params, ClusterState clusterState, @Nullable ProjectId projectId) {} /** * Creates a AllocatedPersistentTask for communicating with task manager diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index b79f2f6517189..71e4d96ab575a 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -1087,7 +1088,12 @@ public Scope scope() { } @Override - public Assignment getAssignment(P params, Collection candidateNodes, ClusterState clusterState) { + public Assignment getAssignment( + P params, + Collection candidateNodes, + ClusterState clusterState, + ProjectId projectId + ) { return fn.apply(params, candidateNodes, clusterState); } diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index e3189de94b1a6..14b40f53a759a 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -25,6 +25,7 @@ import org.elasticsearch.client.internal.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -326,12 +327,17 @@ public static void setNonClusterStateCondition(boolean nonClusterStateCondition) } @Override - public Assignment getAssignment(TestParams params, Collection candidateNodes, ClusterState clusterState) { + public Assignment getAssignment( + TestParams params, + Collection candidateNodes, + ClusterState clusterState, + ProjectId projectId + ) { if (nonClusterStateCondition == false) { return new Assignment(null, "non cluster state condition prevents assignment"); } if (params == null || params.getExecutorNodeAttr() == null) { - return super.getAssignment(params, candidateNodes, clusterState); + return super.getAssignment(params, candidateNodes, clusterState, projectId); } else { DiscoveryNode executorNode = selectLeastLoadedNode( clusterState, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 029ea6dcd6871..9303588c0a511 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -43,6 +44,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; @@ -118,7 +120,7 @@ public ShardFollowTasksExecutor(Client client, ThreadPool threadPool, ClusterSer } @Override - public void validate(ShardFollowTask params, ClusterState clusterState) { + public void validate(ShardFollowTask params, ClusterState clusterState, @Nullable ProjectId projectId) { final IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getFollowShardId().getIndex()); final ShardRouting primaryShard = routingTable.shard(params.getFollowShardId().id()).primaryShard(); if (primaryShard.active() == false) { @@ -131,8 +133,9 @@ public void validate(ShardFollowTask params, ClusterState clusterState) { @Override public Assignment getAssignment( final ShardFollowTask params, - Collection candidateNodes, - final ClusterState clusterState + final Collection candidateNodes, + final ClusterState clusterState, + @Nullable final ProjectId projectId ) { final DiscoveryNode node = selectLeastLoadedNode( clusterState, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutorAssignmentTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutorAssignmentTests.java index 630aab4c78f43..86f4be7cedfc0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutorAssignmentTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutorAssignmentTests.java @@ -93,7 +93,8 @@ private void runAssignmentTest( final Assignment assignment = executor.getAssignment( mock(ShardFollowTask.class), clusterStateBuilder.nodes().getAllNodes(), - clusterStateBuilder.build() + clusterStateBuilder.build(), + null ); consumer.accept(theSpecial, assignment); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMetadata.java index 86651fe241b3d..aa34a63506e40 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMetadata.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; @@ -209,6 +210,9 @@ public TransformMetadata build() { } } + /** + * @deprecated use {@link #transformMetadata(ClusterState, ProjectId)} + */ @Deprecated(forRemoval = true) public static TransformMetadata getTransformMetadata(ClusterState state) { TransformMetadata TransformMetadata = (state == null) ? null : state.metadata().getSingleProjectCustom(TYPE); @@ -218,6 +222,17 @@ public static TransformMetadata getTransformMetadata(ClusterState state) { return TransformMetadata; } + public static TransformMetadata transformMetadata(@Nullable ClusterState state, @Nullable ProjectId projectId) { + if (state == null || projectId == null) { + return EMPTY_METADATA; + } + TransformMetadata transformMetadata = state.metadata().getProject(projectId).custom(TYPE); + if (transformMetadata == null) { + return EMPTY_METADATA; + } + return transformMetadata; + } + public static boolean upgradeMode(ClusterState state) { return getTransformMetadata(state).upgradeMode(); } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java index 3f16535908430..a2db21b2828f6 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java @@ -22,12 +22,14 @@ import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; import org.elasticsearch.index.shard.ShardId; @@ -113,7 +115,7 @@ protected AllocatedPersistentTask createTask( } @Override - public void validate(DownsampleShardTaskParams params, ClusterState clusterState) { + public void validate(DownsampleShardTaskParams params, ClusterState clusterState, @Nullable ProjectId projectId) { // This is just a pre-check, but doesn't prevent from avoiding from aborting the task when source index disappeared // after initial creation of the persistent task. var indexShardRouting = findShardRoutingTable(params.shardId(), clusterState); @@ -126,7 +128,8 @@ public void validate(DownsampleShardTaskParams params, ClusterState clusterState public PersistentTasksCustomMetadata.Assignment getAssignment( final DownsampleShardTaskParams params, final Collection candidateNodes, - final ClusterState clusterState + final ClusterState clusterState, + @Nullable final ProjectId projectId ) { // NOTE: downsampling works by running a task per each shard of the source index. // Here we make sure we assign the task to the actual node holding the shard identified by diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java index c132912da133a..361bf6a4bee4b 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java @@ -87,7 +87,7 @@ public void testGetAssignment() { Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY ); - var result = executor.getAssignment(params, Set.of(node), clusterState); + var result = executor.getAssignment(params, Set.of(node), clusterState, null); assertThat(result.getExecutorNode(), equalTo(node.getId())); } @@ -119,7 +119,7 @@ public void testGetAssignmentMissingIndex() { Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY ); - var result = executor.getAssignment(params, Set.of(node), clusterState); + var result = executor.getAssignment(params, Set.of(node), clusterState, null); assertThat(result.getExecutorNode(), equalTo(node.getId())); assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task")); } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java index e15a1d36bdb9f..a266eec5e9436 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java @@ -9,10 +9,12 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskParams; @@ -89,7 +91,8 @@ protected AllocatedPersistentTask createTask( public PersistentTasksCustomMetadata.Assignment getAssignment( SystemIndexMigrationTaskParams params, Collection candidateNodes, - ClusterState clusterState + ClusterState clusterState, + @Nullable ProjectId projectId ) { // This should select from master-eligible nodes because we already require all master-eligible nodes to have all plugins installed. // However, due to a misunderstanding, this code as-written needs to run on the master node in particular. This is not a fundamental diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index a15a733cac6c7..bbbb9af800241 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -30,6 +31,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.injection.guice.Inject; @@ -693,7 +695,8 @@ protected AllocatedPersistentTask createTask( public PersistentTasksCustomMetadata.Assignment getAssignment( TaskParams params, Collection candidateNodes, - @SuppressWarnings("HiddenField") ClusterState clusterState + @SuppressWarnings("HiddenField") ClusterState clusterState, + @Nullable ProjectId projectId ) { boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed(); Optional optionalAssignment = getPotentialAssignment( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index f45c92d3466c6..7195a02885379 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -497,7 +498,8 @@ public StartDatafeedPersistentTasksExecutor( public PersistentTasksCustomMetadata.Assignment getAssignment( StartDatafeedAction.DatafeedParams params, Collection candidateNodes, - ClusterState clusterState + ClusterState clusterState, + @Nullable ProjectId projectId ) { return new DatafeedNodeSelector( clusterState, @@ -510,7 +512,7 @@ public PersistentTasksCustomMetadata.Assignment getAssignment( } @Override - public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) { + public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clusterState, @Nullable ProjectId projectId) { new DatafeedNodeSelector( clusterState, resolver, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java index 42f722e330a19..eaf2c72ef4da3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java @@ -15,9 +15,11 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; @@ -91,7 +93,8 @@ public SnapshotUpgradeTaskExecutor( public PersistentTasksCustomMetadata.Assignment getAssignment( SnapshotUpgradeTaskParams params, Collection candidateNodes, - ClusterState clusterState + ClusterState clusterState, + @Nullable ProjectId projectId ) { boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed(); Optional optionalAssignment = getPotentialAssignment( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java index 0e517b63f6f60..0e8a40f3e978b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java @@ -17,9 +17,11 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.license.XPackLicenseState; @@ -121,7 +123,12 @@ public OpenJobPersistentTasksExecutor( } @Override - public Assignment getAssignment(OpenJobAction.JobParams params, Collection candidateNodes, ClusterState clusterState) { + public Assignment getAssignment( + OpenJobAction.JobParams params, + Collection candidateNodes, + ClusterState clusterState, + @Nullable ProjectId projectId + ) { Job job = params.getJob(); // If the task parameters do not have a job field then the job // was first opened on a pre v6.6 node and has not been migrated @@ -210,13 +217,13 @@ static void validateJobAndId(String jobId, Job job) { } @Override - public void validate(OpenJobAction.JobParams params, ClusterState clusterState) { + public void validate(OpenJobAction.JobParams params, ClusterState clusterState, @Nullable ProjectId projectId) { final Job job = params.getJob(); final String jobId = params.getJobId(); validateJobAndId(jobId, job); // If we already know that we can't find an ml node because all ml nodes are running at capacity or // simply because there are no ml nodes in the cluster then we fail quickly here: - PersistentTasksCustomMetadata.Assignment assignment = getAssignment(params, clusterState.nodes().getAllNodes(), clusterState); + var assignment = getAssignment(params, clusterState.nodes().getAllNodes(), clusterState, projectId); if (assignment.equals(AWAITING_UPGRADE)) { throw makeCurrentlyBeingUpgradedException(logger, params.getJobId()); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java index 33fae40f80db6..fed463aa2b49b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java @@ -62,7 +62,7 @@ public void testGetAssignment_UpgradeModeIsEnabled() { .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(true).build())) .build(); - Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState); + Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState, null); assertThat(assignment.getExecutorNode(), is(nullValue())); assertThat(assignment.getExplanation(), is(equalTo("persistent task cannot be assigned while upgrade mode is enabled."))); } @@ -75,7 +75,7 @@ public void testGetAssignment_NoNodes() { .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build())) .build(); - Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState); + Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState, null); assertThat(assignment.getExecutorNode(), is(nullValue())); assertThat(assignment.getExplanation(), is(emptyString())); } @@ -94,7 +94,7 @@ public void testGetAssignment_NoMlNodes() { ) .build(); - Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState); + Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState, null); assertThat(assignment.getExecutorNode(), is(nullValue())); assertThat( assignment.getExplanation(), @@ -116,7 +116,7 @@ public void testGetAssignment_MlNodeIsNewerThanTheMlJobButTheAssignmentSuceeds() .nodes(DiscoveryNodes.builder().add(createNode(0, true, Version.V_7_10_0, MlConfigVersion.V_7_10_0))) .build(); - Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState); + Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState, null); assertThat(assignment.getExecutorNode(), is(equalTo("_node_id0"))); assertThat(assignment.getExplanation(), is(emptyString())); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java index d88e1235241d8..d1111b9c456d9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java @@ -173,7 +173,7 @@ public void testGetAssignment_GivenUnavailableIndicesWithLazyNode() { assertEquals( "Not opening [unavailable_index_with_lazy_node], " + "because not all primary shards are active for the following indices [.ml-state]", - executor.getAssignment(params, csBuilder.nodes().getAllNodes(), csBuilder.build()).getExplanation() + executor.getAssignment(params, csBuilder.nodes().getAllNodes(), csBuilder.build(), null).getExplanation() ); } @@ -195,7 +195,8 @@ public void testGetAssignment_GivenLazyJobAndNoGlobalLazyNodes() { PersistentTasksCustomMetadata.Assignment assignment = executor.getAssignment( params, csBuilder.nodes().getAllNodes(), - csBuilder.build() + csBuilder.build(), + null ); assertNotNull(assignment); assertNull(assignment.getExecutorNode()); @@ -216,7 +217,8 @@ public void testGetAssignment_GivenResetInProgress() { PersistentTasksCustomMetadata.Assignment assignment = executor.getAssignment( params, csBuilder.nodes().getAllNodes(), - csBuilder.build() + csBuilder.build(), + null ); assertNotNull(assignment); assertNull(assignment.getExecutorNode()); diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java index 784f1c1fbe23e..6796501245fd3 100644 --- a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -169,10 +170,11 @@ protected TaskExecutor(Client client, ClusterService clusterService, ThreadPool public PersistentTasksCustomMetadata.Assignment getAssignment( TestTaskParams params, Collection candidateNodes, - ClusterState clusterState + ClusterState clusterState, + ProjectId projectId ) { candidates.set(candidateNodes); - return super.getAssignment(params, candidateNodes, clusterState); + return super.getAssignment(params, candidateNodes, clusterState, projectId); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index b7bd434194b80..c97440b8e27a8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; @@ -116,7 +117,8 @@ public TransformPersistentTasksExecutor( public PersistentTasksCustomMetadata.Assignment getAssignment( TransformTaskParams params, Collection candidateNodes, - ClusterState clusterState + ClusterState clusterState, + @Nullable ProjectId projectId ) { /* Note: * @@ -125,14 +127,14 @@ public PersistentTasksCustomMetadata.Assignment getAssignment( * * Operations on the transform node happen in {@link #nodeOperation()} */ - var transformMetadata = TransformMetadata.getTransformMetadata(clusterState); + var transformMetadata = TransformMetadata.transformMetadata(clusterState, projectId); if (transformMetadata.upgradeMode()) { return AWAITING_UPGRADE; } if (transformMetadata.resetMode()) { return RESET_IN_PROGRESS; } - List unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState, resolver); + List unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState, resolver, projectId); if (unavailableIndices.size() != 0) { String reason = "Not starting transform [" + params.getId() @@ -178,7 +180,17 @@ public PersistentTasksCustomMetadata.Assignment getAssignment( return new PersistentTasksCustomMetadata.Assignment(discoveryNode.getId(), ""); } - static List verifyIndicesPrimaryShardsAreActive(ClusterState clusterState, IndexNameExpressionResolver resolver) { + static List verifyIndicesPrimaryShardsAreActive( + ClusterState clusterState, + IndexNameExpressionResolver resolver, + @Nullable ProjectId projectId + ) { + // if the projectId doesn't exist, we will get an empty routing table which will have no indices + if (projectId == null) { + return List.of(); + } + + var projectRoutingTable = clusterState.routingTable(projectId); String[] indices = resolver.concreteIndexNames( clusterState, IndicesOptions.lenientExpandOpen(), @@ -187,7 +199,7 @@ static List verifyIndicesPrimaryShardsAreActive(ClusterState clusterStat ); List unavailableIndices = new ArrayList<>(indices.length); for (String index : indices) { - IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index); + IndexRoutingTable routingTable = projectRoutingTable.index(index); if (routingTable == null || routingTable.allPrimaryShardsActive() == false || routingTable.readyForSearch() == false) { unavailableIndices.add(index); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformMetadataTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformMetadataTests.java index 108bbab85935e..8f2008dab55db 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformMetadataTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformMetadataTests.java @@ -7,11 +7,17 @@ package org.elasticsearch.xpack.transform; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.transform.TransformMetadata; +import static org.hamcrest.Matchers.equalTo; + public class TransformMetadataTests extends AbstractChunkedSerializingTestCase { @Override @@ -35,4 +41,39 @@ protected TransformMetadata mutateInstance(TransformMetadata instance) { .upgradeMode(instance.upgradeMode() == false) .build(); } + + public void testTransformMetadataFromClusterState() { + var expectedTransformMetadata = new TransformMetadata.Builder().resetMode(true).upgradeMode(true).build(); + var projectId = randomUniqueProjectId(); + var clusterState = ClusterState.builder(new ClusterName("_name")) + .metadata( + Metadata.builder().put(ProjectMetadata.builder(projectId).putCustom(TransformMetadata.TYPE, expectedTransformMetadata)) + ) + .build(); + + assertThat(TransformMetadata.transformMetadata(clusterState, projectId), equalTo(expectedTransformMetadata)); + assertThat(TransformMetadata.getTransformMetadata(clusterState), equalTo(expectedTransformMetadata)); + } + + public void testTransformMetadataFromMissingClusterState() { + assertThat(TransformMetadata.transformMetadata(null, randomUniqueProjectId()), equalTo(TransformMetadata.EMPTY_METADATA)); + assertThat(TransformMetadata.getTransformMetadata(null), equalTo(TransformMetadata.EMPTY_METADATA)); + } + + public void testTransformMetadataFromMissingProjectId() { + assertThat( + TransformMetadata.transformMetadata(ClusterState.builder(new ClusterName("_name")).build(), null), + equalTo(TransformMetadata.EMPTY_METADATA) + ); + } + + public void testTransformMetadataWhenAbsentFromClusterState() { + var projectId = randomUniqueProjectId(); + var clusterState = ClusterState.builder(new ClusterName("_name")) + .metadata(Metadata.builder().put(ProjectMetadata.builder(projectId))) + .build(); + + assertThat(TransformMetadata.transformMetadata(clusterState, projectId), equalTo(TransformMetadata.EMPTY_METADATA)); + assertThat(TransformMetadata.getTransformMetadata(clusterState), equalTo(TransformMetadata.EMPTY_METADATA)); + } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index fa509143f9ba9..2546d00382660 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -12,10 +12,14 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; @@ -83,6 +87,7 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase { private static ThreadPool threadPool; private TransformConfigAutoMigration autoMigration; + private ProjectId projectId; @BeforeClass public static void setUpThreadPool() { @@ -106,13 +111,15 @@ public static void tearDownThreadPool() { } @Before - public void initMocks() { + public void setUp() throws Exception { + super.setUp(); autoMigration = mock(); doAnswer(ans -> { ActionListener listener = ans.getArgument(1); listener.onResponse(ans.getArgument(0)); return null; }).when(autoMigration).migrateAndSave(any(), any()); + projectId = randomUniqueProjectId(); } public void testNodeVersionAssignment() { @@ -124,7 +131,8 @@ public void testNodeVersionAssignment() { executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.CURRENT, null, true), cs.nodes().getAllNodes(), - cs + cs, + projectId ).getExecutorNode(), equalTo("current-data-node-with-1-tasks") ); @@ -132,7 +140,8 @@ public void testNodeVersionAssignment() { executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.CURRENT, null, false), cs.nodes().getAllNodes(), - cs + cs, + projectId ).getExecutorNode(), equalTo("current-data-node-with-0-tasks-transform-remote-disabled") ); @@ -140,7 +149,8 @@ public void testNodeVersionAssignment() { executor.getAssignment( new TransformTaskParams("new-old-task-id", TransformConfigVersion.V_7_7_0, null, true), cs.nodes().getAllNodes(), - cs + cs, + projectId ).getExecutorNode(), equalTo("past-data-node-1") ); @@ -154,7 +164,8 @@ public void testNodeAssignmentProblems() { Assignment assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.CURRENT, null, false), List.of(), - cs + cs, + projectId ); assertNull(assignment.getExecutorNode()); assertThat( @@ -173,7 +184,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.CURRENT, null, false), List.of(), - cs + cs, + projectId ); assertNull(assignment.getExecutorNode()); assertThat( @@ -189,7 +201,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.CURRENT, null, false), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNull(assignment.getExecutorNode()); assertThat( @@ -205,7 +218,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.CURRENT, null, false), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNotNull(assignment.getExecutorNode()); assertThat(assignment.getExecutorNode(), equalTo("dedicated-transform-node")); @@ -218,7 +232,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.V_8_0_0, null, false), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNull(assignment.getExecutorNode()); assertThat( @@ -235,7 +250,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.V_7_5_0, null, false), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNotNull(assignment.getExecutorNode()); assertThat(assignment.getExecutorNode(), equalTo("past-data-node-1")); @@ -248,7 +264,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.V_7_5_0, null, true), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNull(assignment.getExecutorNode()); assertThat( @@ -264,7 +281,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.CURRENT, null, false), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNotNull(assignment.getExecutorNode()); assertThat(assignment.getExecutorNode(), equalTo("current-data-node-with-0-tasks-transform-remote-disabled")); @@ -277,7 +295,8 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.V_7_5_0, null, true), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNull(assignment.getExecutorNode()); assertThat( @@ -299,29 +318,30 @@ public void testNodeAssignmentProblems() { assignment = executor.getAssignment( new TransformTaskParams("new-task-id", TransformConfigVersion.V_7_5_0, null, true), cs.nodes().getAllNodes(), - cs + cs, + projectId ); assertNotNull(assignment.getExecutorNode()); assertThat(assignment.getExecutorNode(), equalTo("past-data-node-1")); } public void testVerifyIndicesPrimaryShardsAreActive() { - Metadata.Builder metadata = Metadata.builder(); + Metadata.Builder metadata = metadataWithProject(); RoutingTable.Builder routingTable = RoutingTable.builder(); addIndices(metadata, routingTable); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); - csBuilder.routingTable(routingTable.build()); + csBuilder.putRoutingTable(projectId, routingTable.build()); csBuilder.metadata(metadata); ClusterState cs = csBuilder.build(); assertEquals( 0, - TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(cs, TestIndexNameExpressionResolver.newInstance()).size() + TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(cs, indexNameExpressionResolver(), projectId).size() ); metadata = Metadata.builder(cs.metadata()); - routingTable = new RoutingTable.Builder(cs.routingTable()); + routingTable = new RoutingTable.Builder(cs.routingTable(projectId)); String indexToRemove = TransformInternalIndexConstants.LATEST_INDEX_NAME; if (randomBoolean()) { routingTable.remove(indexToRemove); @@ -342,11 +362,12 @@ public void testVerifyIndicesPrimaryShardsAreActive() { } csBuilder = ClusterState.builder(cs); - csBuilder.routingTable(routingTable.build()); + csBuilder.putRoutingTable(projectId, routingTable.build()); csBuilder.metadata(metadata); List result = TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive( csBuilder.build(), - TestIndexNameExpressionResolver.newInstance() + indexNameExpressionResolver(), + projectId ); assertEquals(1, result.size()); assertEquals(indexToRemove, result.get(0)); @@ -441,7 +462,7 @@ private void addIndices(Metadata.Builder metadata, RoutingTable.Builder routingT for (String indexName : indices) { IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName); indexMetadata.settings(indexSettings(IndexVersion.current(), 1, 0).put(IndexMetadata.SETTING_INDEX_UUID, "_uuid")); - metadata.put(indexMetadata); + metadata.getProject(projectId).put(indexMetadata); Index index = new Index(indexName, "_uuid"); ShardId shardId = new ShardId(index, 0); ShardRouting shardRouting = ShardRouting.newUnassigned( @@ -556,7 +577,7 @@ private DiscoveryNodes.Builder buildNodes( } private ClusterState buildClusterState(DiscoveryNodes.Builder nodes) { - Metadata.Builder metadata = Metadata.builder().clusterUUID("cluster-uuid"); + Metadata.Builder metadata = metadataWithProject().clusterUUID("cluster-uuid"); RoutingTable.Builder routingTable = RoutingTable.builder(); addIndices(metadata, routingTable); PersistentTasksCustomMetadata.Builder pTasksBuilder = PersistentTasksCustomMetadata.builder() @@ -580,15 +601,19 @@ private ClusterState buildClusterState(DiscoveryNodes.Builder nodes) { ); PersistentTasksCustomMetadata pTasks = pTasksBuilder.build(); - metadata.putCustom(PersistentTasksCustomMetadata.TYPE, pTasks); + metadata.getProject(projectId).putCustom(PersistentTasksCustomMetadata.TYPE, pTasks); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).nodes(nodes); - csBuilder.routingTable(routingTable.build()); + csBuilder.putRoutingTable(projectId, routingTable.build()); csBuilder.metadata(metadata); return csBuilder.build(); } + private Metadata.Builder metadataWithProject() { + return Metadata.builder().put(ProjectMetadata.builder(projectId)); + } + private TransformPersistentTasksExecutor buildTaskExecutor() { var transformServices = transformServices( new InMemoryTransformConfigManager(), @@ -622,11 +647,15 @@ private TransformPersistentTasksExecutor buildTaskExecutor(TransformServices tra clusterService(), Settings.EMPTY, new DefaultTransformExtension(), - TestIndexNameExpressionResolver.newInstance(), + indexNameExpressionResolver(), autoMigration ); } + private IndexNameExpressionResolver indexNameExpressionResolver() { + return TestIndexNameExpressionResolver.newInstance(TestProjectResolvers.singleProjectOnly(projectId)); + } + private ClusterService clusterService() { var clusterService = mock(ClusterService.class); var cSettings = new ClusterSettings(Settings.EMPTY, Set.of(Transform.NUM_FAILURE_RETRIES_SETTING));