diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamTransportAction.java index 00a846bf7eb9a..804553416c0f1 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamTransportAction.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; import org.elasticsearch.injection.guice.Inject; @@ -22,27 +24,31 @@ public class CancelReindexDataStreamTransportAction extends HandledTransportAction { private final PersistentTasksService persistentTasksService; + private final ProjectResolver projectResolver; @Inject public CancelReindexDataStreamTransportAction( TransportService transportService, ActionFilters actionFilters, - PersistentTasksService persistentTasksService + PersistentTasksService persistentTasksService, + ProjectResolver projectResolver ) { super(CancelReindexDataStreamAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); this.persistentTasksService = persistentTasksService; + this.projectResolver = projectResolver; } @Override protected void doExecute(Task task, Request request, ActionListener listener) { String index = request.getIndex(); + ProjectId projectId = projectResolver.getProjectId(); String persistentTaskId = ReindexDataStreamAction.TASK_ID_PREFIX + index; /* * This removes the persistent task from the cluster state and results in the running task being cancelled (but not removed from * the task manager). The running task is removed from the task manager in ReindexDataStreamTask::onCancelled, which is called as * as result of this. */ - persistentTasksService.sendRemoveRequest(persistentTaskId, TimeValue.MAX_VALUE, new ActionListener<>() { + persistentTasksService.sendProjectRemoveRequest(projectId, persistentTaskId, TimeValue.MAX_VALUE, new ActionListener<>() { @Override public void onResponse(PersistentTasksCustomMetadata.PersistentTask persistentTask) { listener.onResponse(AcknowledgedResponse.TRUE); diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CopyLifecycleIndexMetadataTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CopyLifecycleIndexMetadataTransportAction.java index 7b87e444f9048..231b01619a503 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CopyLifecycleIndexMetadataTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CopyLifecycleIndexMetadataTransportAction.java @@ -7,8 +7,6 @@ package org.elasticsearch.xpack.migrate.action; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -22,7 +20,9 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.LifecycleExecutionState; -import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; @@ -36,20 +36,22 @@ import org.elasticsearch.transport.TransportService; import java.util.HashMap; +import java.util.Objects; public class CopyLifecycleIndexMetadataTransportAction extends TransportMasterNodeAction< CopyLifecycleIndexMetadataAction.Request, AcknowledgedResponse> { - private static final Logger logger = LogManager.getLogger(CopyLifecycleIndexMetadataTransportAction.class); private final ClusterStateTaskExecutor executor; private final MasterServiceTaskQueue taskQueue; + private final ProjectResolver projectResolver; @Inject public CopyLifecycleIndexMetadataTransportAction( TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters + ActionFilters actionFilters, + ProjectResolver projectResolver ) { super( CopyLifecycleIndexMetadataAction.NAME, @@ -63,11 +65,14 @@ public CopyLifecycleIndexMetadataTransportAction( ); this.executor = new SimpleBatchedAckListenerTaskExecutor<>() { @Override - public Tuple executeTask(UpdateIndexMetadataTask task, ClusterState clusterState) { - return new Tuple<>(applyUpdate(clusterState, task), task); + public Tuple executeTask(UpdateIndexMetadataTask task, ClusterState state) { + var projectMetadata = state.metadata().getProject(task.projectId); + var updatedMetadata = applyUpdate(projectMetadata, task); + return new Tuple<>(ClusterState.builder(state).putProjectMetadata(updatedMetadata).build(), task); } }; this.taskQueue = clusterService.createTaskQueue("migrate-copy-index-metadata", Priority.NORMAL, this.executor); + this.projectResolver = projectResolver; } @Override @@ -79,7 +84,13 @@ protected void masterOperation( ) { taskQueue.submitTask( "migrate-copy-index-metadata", - new UpdateIndexMetadataTask(request.sourceIndex(), request.destIndex(), request.ackTimeout(), listener), + new UpdateIndexMetadataTask( + projectResolver.getProjectId(), + request.sourceIndex(), + request.destIndex(), + request.ackTimeout(), + listener + ), request.masterNodeTimeout() ); } @@ -89,13 +100,15 @@ protected ClusterBlockException checkBlock(CopyLifecycleIndexMetadataAction.Requ return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } - private static ClusterState applyUpdate(ClusterState state, UpdateIndexMetadataTask updateTask) { + private static ProjectMetadata applyUpdate(ProjectMetadata projectMetadata, UpdateIndexMetadataTask updateTask) { + assert projectMetadata != null && updateTask != null; + assert Objects.equals(updateTask.projectId, projectMetadata.id()); - IndexMetadata sourceMetadata = state.metadata().getProject().index(updateTask.sourceIndex); + IndexMetadata sourceMetadata = projectMetadata.index(updateTask.sourceIndex); if (sourceMetadata == null) { throw new IndexNotFoundException(updateTask.sourceIndex); } - IndexMetadata destMetadata = state.metadata().getProject().index(updateTask.destIndex); + IndexMetadata destMetadata = projectMetadata.index(updateTask.destIndex); if (destMetadata == null) { throw new IndexNotFoundException(updateTask.destIndex); } @@ -113,19 +126,26 @@ private static ClusterState applyUpdate(ClusterState state, UpdateIndexMetadataT // creation date updates settings so must increment settings version .settingsVersion(destMetadata.getSettingsVersion() + 1); - var indices = new HashMap<>(state.metadata().getProject().indices()); + var indices = new HashMap<>(projectMetadata.indices()); indices.put(updateTask.destIndex, newDestMetadata.build()); - Metadata newMetadata = Metadata.builder(state.metadata()).indices(indices).build(); - return ClusterState.builder(state).metadata(newMetadata).build(); + return ProjectMetadata.builder(projectMetadata).indices(indices).build(); } static class UpdateIndexMetadataTask extends AckedBatchedClusterStateUpdateTask { + private final ProjectId projectId; private final String sourceIndex; private final String destIndex; - UpdateIndexMetadataTask(String sourceIndex, String destIndex, TimeValue ackTimeout, ActionListener listener) { + UpdateIndexMetadataTask( + ProjectId projectId, + String sourceIndex, + String destIndex, + TimeValue ackTimeout, + ActionListener listener + ) { super(ackTimeout, listener); + this.projectId = projectId; this.sourceIndex = sourceIndex; this.destIndex = destIndex; } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java index 81d44c188d915..73bea4ec8d923 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -46,6 +47,7 @@ public class CreateIndexFromSourceTransportAction extends HandledTransportAction private final ClusterService clusterService; private final Client client; private final IndexScopedSettings indexScopedSettings; + private final ProjectResolver projectResolver; private static final Set INDEX_BLOCK_SETTINGS = Set.of( IndexMetadata.SETTING_READ_ONLY, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, @@ -60,7 +62,8 @@ public CreateIndexFromSourceTransportAction( ClusterService clusterService, ActionFilters actionFilters, Client client, - IndexScopedSettings indexScopedSettings + IndexScopedSettings indexScopedSettings, + ProjectResolver projectResolver ) { super( CreateIndexFromSourceAction.NAME, @@ -73,12 +76,13 @@ public CreateIndexFromSourceTransportAction( this.clusterService = clusterService; this.client = client; this.indexScopedSettings = indexScopedSettings; + this.projectResolver = projectResolver; } @Override protected void doExecute(Task task, CreateIndexFromSourceAction.Request request, ActionListener listener) { - IndexMetadata sourceIndex = clusterService.state().getMetadata().getProject().index(request.sourceIndex()); + IndexMetadata sourceIndex = projectResolver.getProjectMetadata(clusterService.state()).index(request.sourceIndex()); if (sourceIndex == null) { listener.onFailure(new IndexNotFoundException(request.sourceIndex())); diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusTransportAction.java index db6dedb4bccc1..532e6e5b808e1 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusTransportAction.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Strings; @@ -48,18 +49,21 @@ public class GetMigrationReindexStatusTransportAction extends HandledTransportAc private final ClusterService clusterService; private final TransportService transportService; private final Client client; + private final ProjectResolver projectResolver; @Inject public GetMigrationReindexStatusTransportAction( ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, - Client client + Client client, + ProjectResolver projectResolver ) { super(GetMigrationReindexStatusAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); this.clusterService = clusterService; this.transportService = transportService; this.client = client; + this.projectResolver = projectResolver; } @Override @@ -67,7 +71,7 @@ protected void doExecute(Task task, Request request, ActionListener li String index = request.getIndex(); String persistentTaskId = ReindexDataStreamAction.TASK_ID_PREFIX + index; PersistentTasksCustomMetadata.PersistentTask persistentTask = PersistentTasksCustomMetadata.getTaskWithId( - clusterService.state(), + projectResolver.getProjectMetadata(clusterService.state()), persistentTaskId ); if (persistentTask == null) { diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java index ada10ad83fa63..81e747331a251 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -40,6 +40,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Setting; @@ -108,6 +109,7 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio private final ClusterService clusterService; private final Client client; private final TransportService transportService; + private final ProjectResolver projectResolver; /* * The following is incremented in order to keep track of the current round-robin position for ingest nodes that we send sliced requests * to. We bound its random starting value to less than or equal to 2 ^ 30 (the default is Integer.MAX_VALUE or 2 ^ 31 - 1) only so that @@ -121,7 +123,8 @@ public ReindexDataStreamIndexTransportAction( TransportService transportService, ClusterService clusterService, ActionFilters actionFilters, - Client client + Client client, + ProjectResolver projectResolver ) { super( ReindexDataStreamIndexAction.NAME, @@ -134,6 +137,7 @@ public ReindexDataStreamIndexTransportAction( this.clusterService = clusterService; this.client = client; this.transportService = transportService; + this.projectResolver = projectResolver; } @Override @@ -142,11 +146,11 @@ protected void doExecute( ReindexDataStreamIndexAction.Request request, ActionListener listener ) { - var project = clusterService.state().projectState(); + var projectMetadata = projectResolver.getProjectMetadata(clusterService.state()); var sourceIndexName = request.getSourceIndex(); var destIndexName = generateDestIndexName(sourceIndexName); TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); - IndexMetadata sourceIndex = project.metadata().index(sourceIndexName); + IndexMetadata sourceIndex = projectMetadata.index(sourceIndexName); if (sourceIndex == null) { listener.onFailure(new ResourceNotFoundException("source index [{}] does not exist", sourceIndexName)); return; @@ -154,7 +158,7 @@ protected void doExecute( Settings settingsBefore = sourceIndex.getSettings(); - var hasOldVersion = DeprecatedIndexPredicate.getReindexRequiredPredicate(project.metadata(), false, true); + var hasOldVersion = DeprecatedIndexPredicate.getReindexRequiredPredicate(projectMetadata, false, true); if (hasOldVersion.test(sourceIndex.getIndex()) == false) { logger.warn( "Migrating index [{}] with version [{}] is unnecessary as its version is not before [{}]", diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java index cbeb31a6b89e6..bcb7af6b1f228 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java @@ -15,7 +15,8 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.DataStream; -import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.core.TimeValue; import org.elasticsearch.injection.guice.Inject; @@ -41,6 +42,7 @@ public class ReindexDataStreamTransportAction extends HandledTransportAction listener) { String sourceDataStreamName = request.getSourceDataStream(); - Metadata metadata = clusterService.state().metadata(); - DataStream dataStream = metadata.getProject().dataStreams().get(sourceDataStreamName); + final var projectMetadata = projectResolver.getProjectMetadata(clusterService.state()); + DataStream dataStream = projectMetadata == null ? null : projectMetadata.dataStreams().get(sourceDataStreamName); if (dataStream == null) { listener.onFailure(new ResourceNotFoundException("Data stream named [{}] does not exist", sourceDataStreamName)); return; @@ -76,7 +80,7 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList int totalIndices = dataStream.getIndices().size(); int totalIndicesToBeUpgraded = (int) dataStream.getIndices() .stream() - .filter(getReindexRequiredPredicate(metadata.getProject(), false, dataStream.isSystem())) + .filter(getReindexRequiredPredicate(projectMetadata, false, dataStream.isSystem())) .count(); ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams( sourceDataStreamName, @@ -85,11 +89,11 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList totalIndicesToBeUpgraded, ClientHelper.getPersistableSafeSecurityHeaders(transportService.getThreadPool().getThreadContext(), clusterService.state()) ); - String persistentTaskId = getPersistentTaskId(sourceDataStreamName); - final var persistentTask = PersistentTasksCustomMetadata.getTaskWithId(clusterService.state(), persistentTaskId); - + final var projectId = projectMetadata.id(); + final var persistentTaskId = getPersistentTaskId(sourceDataStreamName); + final var persistentTask = PersistentTasksCustomMetadata.getTaskWithId(projectMetadata, persistentTaskId); if (persistentTask == null) { - startTask(listener, persistentTaskId, params); + startTask(projectId, listener, persistentTaskId, params); } else { GetMigrationReindexStatusAction.Request statusRequest = new GetMigrationReindexStatusAction.Request(sourceDataStreamName); statusRequest.setParentTask(task.getParentTaskId()); @@ -106,7 +110,7 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList CancelReindexDataStreamAction.INSTANCE, cancelRequest, getListener.delegateFailureAndWrap( - (cancelListener, cancelResponse) -> startTask(cancelListener, persistentTaskId, params) + (cancelListener, cancelResponse) -> startTask(projectId, cancelListener, persistentTaskId, params) ) ); }) @@ -115,8 +119,14 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList } - private void startTask(ActionListener listener, String persistentTaskId, ReindexDataStreamTaskParams params) { - persistentTasksService.sendStartRequest( + private void startTask( + ProjectId projectId, + ActionListener listener, + String persistentTaskId, + ReindexDataStreamTaskParams params + ) { + persistentTasksService.sendProjectStartRequest( + projectId, persistentTaskId, ReindexDataStreamTask.TASK_NAME, params, diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index 0949f1084355b..050551ec759ae 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -28,6 +28,8 @@ import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamAction; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -70,12 +72,14 @@ public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExec private final Client client; private final ClusterService clusterService; private final ThreadPool threadPool; + private final ProjectResolver projectResolver; public ReindexDataStreamPersistentTaskExecutor(Client client, ClusterService clusterService, String taskName, ThreadPool threadPool) { super(taskName, threadPool.generic()); this.client = client; this.clusterService = clusterService; this.threadPool = threadPool; + this.projectResolver = client.projectResolver(); } @Override @@ -87,8 +91,10 @@ protected ReindexDataStreamTask createTask( PersistentTasksCustomMetadata.PersistentTask taskInProgress, Map headers ) { + ProjectId projectId = projectResolver.getProjectId(); ReindexDataStreamTaskParams params = taskInProgress.getParams(); return new ReindexDataStreamTask( + projectId, clusterService, params.startTime(), params.totalIndices(), @@ -125,7 +131,7 @@ protected void nodeOperation( if (dataStreamInfos.size() == 1) { DataStream dataStream = dataStreamInfos.getFirst().getDataStream(); boolean includeSystem = dataStream.isSystem(); - if (getReindexRequiredPredicate(clusterService.state().metadata().getProject(), false, includeSystem).test( + if (getReindexRequiredPredicate(projectResolver.getProjectMetadata(clusterService.state()), false, includeSystem).test( dataStream.getWriteIndex() )) { RolloverRequest rolloverRequest = new RolloverRequest(sourceDataStream, null); @@ -174,7 +180,7 @@ private void reindexIndices( ) { List indices = dataStream.getIndices(); List indicesToBeReindexed = indices.stream() - .filter(getReindexRequiredPredicate(clusterService.state().metadata().getProject(), false, dataStream.isSystem())) + .filter(getReindexRequiredPredicate(projectResolver.getProjectMetadata(clusterService.state()), false, dataStream.isSystem())) .toList(); final ReindexDataStreamPersistentTaskState updatedState; if (params.totalIndices() != totalIndicesInDataStream @@ -337,7 +343,7 @@ private TimeValue updateCompletionTimeAndGetTimeToLive( @Nullable ReindexDataStreamPersistentTaskState state ) { PersistentTasksCustomMetadata.PersistentTask persistentTask = PersistentTasksCustomMetadata.getTaskWithId( - clusterService.state(), + projectResolver.getProjectMetadata(clusterService.state()), reindexDataStreamTask.getPersistentTaskId() ); if (persistentTask == null) { diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java index 996ac936af8b2..cbce7f2d86e24 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.migrate.task; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.core.TimeValue; @@ -26,6 +27,7 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask { public static final String TASK_NAME = "reindex-data-stream"; + private final ProjectId projectId; private final ClusterService clusterService; private final long persistentTaskStartTime; private final int initialTotalIndices; @@ -39,6 +41,7 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask { @SuppressWarnings("this-escape") public ReindexDataStreamTask( + ProjectId projectId, ClusterService clusterService, long persistentTaskStartTime, int initialTotalIndices, @@ -51,6 +54,7 @@ public ReindexDataStreamTask( Map headers ) { super(id, type, action, description, parentTask, headers); + this.projectId = projectId; this.clusterService = clusterService; this.persistentTaskStartTime = persistentTaskStartTime; this.initialTotalIndices = initialTotalIndices; @@ -69,10 +73,10 @@ public ReindexDataStreamTask( public ReindexDataStreamStatus getStatus() { int totalIndices = initialTotalIndices; int totalIndicesToBeUpgraded = initialTotalIndicesToBeUpgraded; - PersistentTasksCustomMetadata.PersistentTask persistentTask = PersistentTasksCustomMetadata.getTaskWithId( - clusterService.state(), - getPersistentTaskId() - ); + final var projectMetadata = clusterService.state().metadata().getProject(projectId); + PersistentTasksCustomMetadata.PersistentTask persistentTask = projectMetadata == null + ? null + : PersistentTasksCustomMetadata.getTaskWithId(projectMetadata, getPersistentTaskId()); boolean isComplete; if (persistentTask != null) { ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState(); @@ -129,11 +133,10 @@ public void incrementInProgressIndicesCount(String index) { } private boolean isCompleteInClusterState() { - PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state() - .getMetadata() - .getProject() - .custom(PersistentTasksCustomMetadata.TYPE); - PersistentTasksCustomMetadata.PersistentTask persistentTask = persistentTasksCustomMetadata.getTask(getPersistentTaskId()); + final var projectMetadata = clusterService.state().metadata().getProject(projectId); + PersistentTasksCustomMetadata.PersistentTask persistentTask = projectMetadata == null + ? null + : PersistentTasksCustomMetadata.getTaskWithId(projectMetadata, getPersistentTaskId()); if (persistentTask != null) { ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState(); if (state != null) { diff --git a/x-pack/qa/multi-project/xpack-rest-tests-with-multiple-projects/build.gradle b/x-pack/qa/multi-project/xpack-rest-tests-with-multiple-projects/build.gradle index 436767ee1bd16..e9a1395abb782 100644 --- a/x-pack/qa/multi-project/xpack-rest-tests-with-multiple-projects/build.gradle +++ b/x-pack/qa/multi-project/xpack-rest-tests-with-multiple-projects/build.gradle @@ -39,9 +39,6 @@ tasks.named("yamlRestTest").configure { '^health/10_usage/*', '^ilm/80_health/*', '^logsdb/10_usage/*', - '^migrate/10_reindex/*', - '^migrate/20_reindex_status/*', - '^migrate/30_create_from/*', '^migration/10_get_feature_upgrade_status/*', '^migration/20_post_feature_upgrade/*', '^ml/3rd_party_deployment/*',