diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java index 52fc7bab42f80..e9550c0628897 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java @@ -240,11 +240,14 @@ ThreadPool getThreadPool() { /** * Waits for persistent tasks to comply with a given predicate, then call back the listener accordingly. * - * @param predicate the predicate to evaluate + * @param projectId the project that the persistent tasks are associated with + * @param predicate the predicate to evaluate, must be able to handle {@code null} input which means either the project + * does not exist or persistent tasks for the project do not exist * @param timeout a timeout for waiting * @param listener the callback listener */ public void waitForPersistentTasksCondition( + final ProjectId projectId, final Predicate predicate, final @Nullable TimeValue timeout, final ActionListener listener @@ -264,7 +267,15 @@ public void onClusterServiceClose() { public void onTimeout(TimeValue timeout) { listener.onFailure(new IllegalStateException("Timed out when waiting for persistent tasks after " + timeout)); } - }, clusterState -> predicate.test(PersistentTasksCustomMetadata.get(clusterState.metadata().getDefaultProject())), timeout, logger); + }, clusterState -> { + final var project = clusterState.metadata().projects().get(projectId); + if (project == null) { + logger.debug("project [{}] not found while waiting for persistent tasks condition", projectId); + return predicate.test(null); + } else { + return predicate.test(PersistentTasksCustomMetadata.get(project)); + } + }, timeout, logger); } public interface WaitForPersistentTaskListener

extends ActionListener> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 128a2ac347311..c1fb7f10f6aea 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -19,12 +19,14 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.TimeValue; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.injection.guice.Inject; @@ -642,7 +644,12 @@ void waitForJobClosed( ActionListener listener, Set movedJobs ) { - persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetadata -> { + @FixForMultiProject + final var projectId = Metadata.DEFAULT_PROJECT_ID; + persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasksCustomMetadata -> { + if (persistentTasksCustomMetadata == null) { + return true; + } for (PersistentTasksCustomMetadata.PersistentTask originalPersistentTask : waitForCloseRequest.persistentTasks) { String originalPersistentTaskId = originalPersistentTask.getId(); PersistentTasksCustomMetadata.PersistentTask currentPersistentTask = persistentTasksCustomMetadata.getTask( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java index ce5679eb7c945..a937d310a47f1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java @@ -170,11 +170,19 @@ protected void upgradeModeSuccessfullyChanged( isolateDatafeeds(tasksCustomMetadata, isolateDatafeedListener); } else { logger.info("Disabling upgrade mode, must wait for tasks to not have AWAITING_UPGRADE assignment"); + @FixForMultiProject + final var projectId = Metadata.DEFAULT_PROJECT_ID; persistentTasksService.waitForPersistentTasksCondition( // Wait for jobs, datafeeds and analytics not to be "Awaiting upgrade" - persistentTasksCustomMetadata -> persistentTasksCustomMetadata.tasks() - .stream() - .noneMatch(t -> ML_TASK_NAMES.contains(t.getTaskName()) && t.getAssignment().equals(AWAITING_UPGRADE)), + projectId, + persistentTasksCustomMetadata -> { + if (persistentTasksCustomMetadata == null) { + return true; + } + return persistentTasksCustomMetadata.tasks() + .stream() + .noneMatch(t -> ML_TASK_NAMES.contains(t.getTaskName()) && t.getAssignment().equals(AWAITING_UPGRADE)); + }, request.ackTimeout(), ActionListener.wrap(r -> { logger.info("Done waiting for tasks to be out of AWAITING_UPGRADE"); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java index 2ff9036316b19..23dd122ebad04 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java @@ -17,12 +17,14 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; @@ -427,15 +429,17 @@ void waitForTaskRemoved( StopDataFrameAnalyticsAction.Response response, ActionListener listener ) { - persistentTasksService.waitForPersistentTasksCondition( - persistentTasks -> persistentTasks.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, t -> taskIds.contains(t.getId())) - .isEmpty(), - request.getTimeout(), - ActionListener.wrap(booleanResponse -> { - auditor.info(request.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_STOPPED); - listener.onResponse(response); - }, listener::onFailure) - ); + @FixForMultiProject + final var projectId = Metadata.DEFAULT_PROJECT_ID; + persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasks -> { + if (persistentTasks == null) { + return true; + } + return persistentTasks.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, t -> taskIds.contains(t.getId())).isEmpty(); + }, request.getTimeout(), ActionListener.wrap(booleanResponse -> { + auditor.info(request.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_STOPPED); + listener.onResponse(response); + }, listener::onFailure)); } // Visible for testing diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index f58f1ca201014..34bdfc82ac971 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -19,12 +19,14 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.TimeValue; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.injection.guice.Inject; @@ -501,7 +503,12 @@ void waitForDatafeedStopped( ActionListener listener, Set movedDatafeeds ) { - persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetadata -> { + @FixForMultiProject + final var projectId = Metadata.DEFAULT_PROJECT_ID; + persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasksCustomMetadata -> { + if (persistentTasksCustomMetadata == null) { + return true; + } for (PersistentTasksCustomMetadata.PersistentTask originalPersistentTask : datafeedPersistentTasks) { String originalPersistentTaskId = originalPersistentTask.getId(); PersistentTasksCustomMetadata.PersistentTask currentPersistentTask = persistentTasksCustomMetadata.getTask( diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeAction.java index ea1422c90f801..975c342637ee1 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeAction.java @@ -187,12 +187,15 @@ private boolean isTransformTask(PersistentTasksCustomMetadata.PersistentTask private void waitForTransformsToRestart(SetUpgradeModeActionRequest request, ActionListener listener) { logger.info("Disabling upgrade mode for Transforms, must wait for tasks to not have AWAITING_UPGRADE assignment"); - persistentTasksService.waitForPersistentTasksCondition( - persistentTasksCustomMetadata -> persistentTasksCustomMetadata.tasks() + @FixForMultiProject + final var projectId = Metadata.DEFAULT_PROJECT_ID; + persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasksCustomMetadata -> { + if (persistentTasksCustomMetadata == null) { + return true; + } + return persistentTasksCustomMetadata.tasks() .stream() - .noneMatch(t -> isTransformTask(t) && t.getAssignment().equals(AWAITING_UPGRADE)), - request.ackTimeout(), - listener.delegateFailureAndWrap((d, r) -> d.onResponse(AcknowledgedResponse.TRUE)) - ); + .noneMatch(t -> isTransformTask(t) && t.getAssignment().equals(AWAITING_UPGRADE)); + }, request.ackTimeout(), listener.delegateFailureAndWrap((d, r) -> d.onResponse(AcknowledgedResponse.TRUE))); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java index 6855da53def7e..2eb544744856c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java @@ -20,11 +20,13 @@ import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.TimeValue; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.index.IndexNotFoundException; @@ -388,7 +390,9 @@ private void waitForTransformStopped( // This map is accessed in the predicate and the listener callbacks final Map exceptions = new ConcurrentHashMap<>(); - persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetadata -> { + @FixForMultiProject + final var projectId = Metadata.DEFAULT_PROJECT_ID; + persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasksCustomMetadata -> { if (persistentTasksCustomMetadata == null) { return true; } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeActionTests.java index 675720f27b153..33fad76117a53 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeActionTests.java @@ -193,10 +193,10 @@ private ClusterState stateWithTransformTask() { public void testDisableUpgradeMode() throws InterruptedException { doAnswer(ans -> { - ActionListener listener = ans.getArgument(2); + ActionListener listener = ans.getArgument(3); listener.onResponse(true); return null; - }).when(persistentTasksService).waitForPersistentTasksCondition(any(), any(), any()); + }).when(persistentTasksService).waitForPersistentTasksCondition(any(), any(), any(), any()); upgradeModeSuccessfullyChanged(new SetUpgradeModeActionRequest(false), stateWithTransformTask(), assertNoFailureListener(r -> { assertThat(r, is(AcknowledgedResponse.TRUE)); verify(clusterService, never()).submitUnbatchedStateUpdateTask(eq("unassign persistent task from any node"), any());