Skip to content

Commit b752e61

Browse files
ywangdgeorgewallace
authored andcommitted
Require project-id for waitForPersistentTasksCondition (elastic#124180)
The method should be called with an explicit project-id to access persistent tasks from the right project. This PR does that. The callsites are updated by using the default project-id for the timebeing. They work for single project deployments but should eventually be updated for multi-project setup. Relates: ES-11039
1 parent ef37ad3 commit b752e61

File tree

8 files changed

+69
-25
lines changed

8 files changed

+69
-25
lines changed

server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,11 +240,14 @@ ThreadPool getThreadPool() {
240240
/**
241241
* Waits for persistent tasks to comply with a given predicate, then call back the listener accordingly.
242242
*
243-
* @param predicate the predicate to evaluate
243+
* @param projectId the project that the persistent tasks are associated with
244+
* @param predicate the predicate to evaluate, must be able to handle {@code null} input which means either the project
245+
* does not exist or persistent tasks for the project do not exist
244246
* @param timeout a timeout for waiting
245247
* @param listener the callback listener
246248
*/
247249
public void waitForPersistentTasksCondition(
250+
final ProjectId projectId,
248251
final Predicate<PersistentTasksCustomMetadata> predicate,
249252
final @Nullable TimeValue timeout,
250253
final ActionListener<Boolean> listener
@@ -264,7 +267,15 @@ public void onClusterServiceClose() {
264267
public void onTimeout(TimeValue timeout) {
265268
listener.onFailure(new IllegalStateException("Timed out when waiting for persistent tasks after " + timeout));
266269
}
267-
}, clusterState -> predicate.test(PersistentTasksCustomMetadata.get(clusterState.metadata().getDefaultProject())), timeout, logger);
270+
}, clusterState -> {
271+
final var project = clusterState.metadata().projects().get(projectId);
272+
if (project == null) {
273+
logger.debug("project [{}] not found while waiting for persistent tasks condition", projectId);
274+
return predicate.test(null);
275+
} else {
276+
return predicate.test(PersistentTasksCustomMetadata.get(project));
277+
}
278+
}, timeout, logger);
268279
}
269280

270281
public interface WaitForPersistentTaskListener<P extends PersistentTaskParams> extends ActionListener<PersistentTask<P>> {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
import org.elasticsearch.action.support.tasks.TransportTasksAction;
2020
import org.elasticsearch.client.internal.Client;
2121
import org.elasticsearch.cluster.ClusterState;
22+
import org.elasticsearch.cluster.metadata.Metadata;
2223
import org.elasticsearch.cluster.node.DiscoveryNodes;
2324
import org.elasticsearch.cluster.service.ClusterService;
2425
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2526
import org.elasticsearch.common.util.concurrent.AtomicArray;
2627
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2728
import org.elasticsearch.common.util.concurrent.EsExecutors;
29+
import org.elasticsearch.core.FixForMultiProject;
2830
import org.elasticsearch.core.TimeValue;
2931
import org.elasticsearch.discovery.MasterNotDiscoveredException;
3032
import org.elasticsearch.injection.guice.Inject;
@@ -642,7 +644,12 @@ void waitForJobClosed(
642644
ActionListener<CloseJobAction.Response> listener,
643645
Set<String> movedJobs
644646
) {
645-
persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetadata -> {
647+
@FixForMultiProject
648+
final var projectId = Metadata.DEFAULT_PROJECT_ID;
649+
persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasksCustomMetadata -> {
650+
if (persistentTasksCustomMetadata == null) {
651+
return true;
652+
}
646653
for (PersistentTasksCustomMetadata.PersistentTask<?> originalPersistentTask : waitForCloseRequest.persistentTasks) {
647654
String originalPersistentTaskId = originalPersistentTask.getId();
648655
PersistentTasksCustomMetadata.PersistentTask<?> currentPersistentTask = persistentTasksCustomMetadata.getTask(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,11 +170,19 @@ protected void upgradeModeSuccessfullyChanged(
170170
isolateDatafeeds(tasksCustomMetadata, isolateDatafeedListener);
171171
} else {
172172
logger.info("Disabling upgrade mode, must wait for tasks to not have AWAITING_UPGRADE assignment");
173+
@FixForMultiProject
174+
final var projectId = Metadata.DEFAULT_PROJECT_ID;
173175
persistentTasksService.waitForPersistentTasksCondition(
174176
// Wait for jobs, datafeeds and analytics not to be "Awaiting upgrade"
175-
persistentTasksCustomMetadata -> persistentTasksCustomMetadata.tasks()
176-
.stream()
177-
.noneMatch(t -> ML_TASK_NAMES.contains(t.getTaskName()) && t.getAssignment().equals(AWAITING_UPGRADE)),
177+
projectId,
178+
persistentTasksCustomMetadata -> {
179+
if (persistentTasksCustomMetadata == null) {
180+
return true;
181+
}
182+
return persistentTasksCustomMetadata.tasks()
183+
.stream()
184+
.noneMatch(t -> ML_TASK_NAMES.contains(t.getTaskName()) && t.getAssignment().equals(AWAITING_UPGRADE));
185+
},
178186
request.ackTimeout(),
179187
ActionListener.wrap(r -> {
180188
logger.info("Done waiting for tasks to be out of AWAITING_UPGRADE");

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
import org.elasticsearch.action.support.ActionFilters;
1818
import org.elasticsearch.action.support.tasks.TransportTasksAction;
1919
import org.elasticsearch.cluster.ClusterState;
20+
import org.elasticsearch.cluster.metadata.Metadata;
2021
import org.elasticsearch.cluster.node.DiscoveryNode;
2122
import org.elasticsearch.cluster.node.DiscoveryNodes;
2223
import org.elasticsearch.cluster.service.ClusterService;
2324
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2425
import org.elasticsearch.common.util.concurrent.AtomicArray;
2526
import org.elasticsearch.common.util.concurrent.EsExecutors;
27+
import org.elasticsearch.core.FixForMultiProject;
2628
import org.elasticsearch.discovery.MasterNotDiscoveredException;
2729
import org.elasticsearch.injection.guice.Inject;
2830
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
@@ -427,15 +429,17 @@ void waitForTaskRemoved(
427429
StopDataFrameAnalyticsAction.Response response,
428430
ActionListener<StopDataFrameAnalyticsAction.Response> listener
429431
) {
430-
persistentTasksService.waitForPersistentTasksCondition(
431-
persistentTasks -> persistentTasks.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, t -> taskIds.contains(t.getId()))
432-
.isEmpty(),
433-
request.getTimeout(),
434-
ActionListener.wrap(booleanResponse -> {
435-
auditor.info(request.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_STOPPED);
436-
listener.onResponse(response);
437-
}, listener::onFailure)
438-
);
432+
@FixForMultiProject
433+
final var projectId = Metadata.DEFAULT_PROJECT_ID;
434+
persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasks -> {
435+
if (persistentTasks == null) {
436+
return true;
437+
}
438+
return persistentTasks.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, t -> taskIds.contains(t.getId())).isEmpty();
439+
}, request.getTimeout(), ActionListener.wrap(booleanResponse -> {
440+
auditor.info(request.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_STOPPED);
441+
listener.onResponse(response);
442+
}, listener::onFailure));
439443
}
440444

441445
// Visible for testing

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
import org.elasticsearch.client.internal.Client;
2020
import org.elasticsearch.client.internal.OriginSettingClient;
2121
import org.elasticsearch.cluster.ClusterState;
22+
import org.elasticsearch.cluster.metadata.Metadata;
2223
import org.elasticsearch.cluster.node.DiscoveryNodes;
2324
import org.elasticsearch.cluster.service.ClusterService;
2425
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2526
import org.elasticsearch.common.util.concurrent.AtomicArray;
2627
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2728
import org.elasticsearch.common.util.concurrent.EsExecutors;
29+
import org.elasticsearch.core.FixForMultiProject;
2830
import org.elasticsearch.core.TimeValue;
2931
import org.elasticsearch.discovery.MasterNotDiscoveredException;
3032
import org.elasticsearch.injection.guice.Inject;
@@ -501,7 +503,12 @@ void waitForDatafeedStopped(
501503
ActionListener<StopDatafeedAction.Response> listener,
502504
Set<String> movedDatafeeds
503505
) {
504-
persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetadata -> {
506+
@FixForMultiProject
507+
final var projectId = Metadata.DEFAULT_PROJECT_ID;
508+
persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasksCustomMetadata -> {
509+
if (persistentTasksCustomMetadata == null) {
510+
return true;
511+
}
505512
for (PersistentTasksCustomMetadata.PersistentTask<?> originalPersistentTask : datafeedPersistentTasks) {
506513
String originalPersistentTaskId = originalPersistentTask.getId();
507514
PersistentTasksCustomMetadata.PersistentTask<?> currentPersistentTask = persistentTasksCustomMetadata.getTask(

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeAction.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,15 @@ private boolean isTransformTask(PersistentTasksCustomMetadata.PersistentTask<?>
187187

188188
private void waitForTransformsToRestart(SetUpgradeModeActionRequest request, ActionListener<AcknowledgedResponse> listener) {
189189
logger.info("Disabling upgrade mode for Transforms, must wait for tasks to not have AWAITING_UPGRADE assignment");
190-
persistentTasksService.waitForPersistentTasksCondition(
191-
persistentTasksCustomMetadata -> persistentTasksCustomMetadata.tasks()
190+
@FixForMultiProject
191+
final var projectId = Metadata.DEFAULT_PROJECT_ID;
192+
persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasksCustomMetadata -> {
193+
if (persistentTasksCustomMetadata == null) {
194+
return true;
195+
}
196+
return persistentTasksCustomMetadata.tasks()
192197
.stream()
193-
.noneMatch(t -> isTransformTask(t) && t.getAssignment().equals(AWAITING_UPGRADE)),
194-
request.ackTimeout(),
195-
listener.delegateFailureAndWrap((d, r) -> d.onResponse(AcknowledgedResponse.TRUE))
196-
);
198+
.noneMatch(t -> isTransformTask(t) && t.getAssignment().equals(AWAITING_UPGRADE));
199+
}, request.ackTimeout(), listener.delegateFailureAndWrap((d, r) -> d.onResponse(AcknowledgedResponse.TRUE)));
197200
}
198201
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020
import org.elasticsearch.action.support.GroupedActionListener;
2121
import org.elasticsearch.action.support.tasks.TransportTasksAction;
2222
import org.elasticsearch.cluster.ClusterState;
23+
import org.elasticsearch.cluster.metadata.Metadata;
2324
import org.elasticsearch.cluster.node.DiscoveryNodes;
2425
import org.elasticsearch.cluster.service.ClusterService;
2526
import org.elasticsearch.common.Strings;
2627
import org.elasticsearch.common.logging.LoggerMessageFormat;
2728
import org.elasticsearch.common.util.concurrent.EsExecutors;
29+
import org.elasticsearch.core.FixForMultiProject;
2830
import org.elasticsearch.core.TimeValue;
2931
import org.elasticsearch.discovery.MasterNotDiscoveredException;
3032
import org.elasticsearch.index.IndexNotFoundException;
@@ -388,7 +390,9 @@ private void waitForTransformStopped(
388390
// This map is accessed in the predicate and the listener callbacks
389391
final Map<String, ElasticsearchException> exceptions = new ConcurrentHashMap<>();
390392

391-
persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetadata -> {
393+
@FixForMultiProject
394+
final var projectId = Metadata.DEFAULT_PROJECT_ID;
395+
persistentTasksService.waitForPersistentTasksCondition(projectId, persistentTasksCustomMetadata -> {
392396
if (persistentTasksCustomMetadata == null) {
393397
return true;
394398
}

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeActionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,10 +193,10 @@ private ClusterState stateWithTransformTask() {
193193

194194
public void testDisableUpgradeMode() throws InterruptedException {
195195
doAnswer(ans -> {
196-
ActionListener<Boolean> listener = ans.getArgument(2);
196+
ActionListener<Boolean> listener = ans.getArgument(3);
197197
listener.onResponse(true);
198198
return null;
199-
}).when(persistentTasksService).waitForPersistentTasksCondition(any(), any(), any());
199+
}).when(persistentTasksService).waitForPersistentTasksCondition(any(), any(), any(), any());
200200
upgradeModeSuccessfullyChanged(new SetUpgradeModeActionRequest(false), stateWithTransformTask(), assertNoFailureListener(r -> {
201201
assertThat(r, is(AcknowledgedResponse.TRUE));
202202
verify(clusterService, never()).submitUnbatchedStateUpdateTask(eq("unassign persistent task from any node"), any());

0 commit comments

Comments
 (0)