Skip to content

Commit 493288f

Browse files
committed
Specify generic types and check project removal
1 parent 7038b73 commit 493288f

File tree

10 files changed

+44
-45
lines changed

10 files changed

+44
-45
lines changed

server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ public void testPersistentActionStatusUpdate() throws Exception {
290290
);
291291

292292
int finalI = i;
293-
WaitForPersistentTaskFuture<?> future1 = new WaitForPersistentTaskFuture<>();
293+
WaitForPersistentTaskFuture<TestParams> future1 = new WaitForPersistentTaskFuture<>();
294294
waitForPersistentTaskCondition(
295295
persistentTasksService,
296296
taskId,
@@ -304,7 +304,7 @@ public void testPersistentActionStatusUpdate() throws Exception {
304304
assertThat(future1.get().getId(), equalTo(taskId));
305305
}
306306

307-
WaitForPersistentTaskFuture<?> future1 = new WaitForPersistentTaskFuture<>();
307+
WaitForPersistentTaskFuture<TestParams> future1 = new WaitForPersistentTaskFuture<>();
308308
waitForPersistentTaskCondition(persistentTasksService, taskId, task -> false, TimeValue.timeValueMillis(10), future1);
309309

310310
assertFutureThrows(future1, IllegalStateException.class, "timed out after 10ms");
@@ -318,7 +318,7 @@ public void testPersistentActionStatusUpdate() throws Exception {
318318
);
319319

320320
// Wait for the task to disappear
321-
WaitForPersistentTaskFuture<?> future2 = new WaitForPersistentTaskFuture<>();
321+
WaitForPersistentTaskFuture<TestParams> future2 = new WaitForPersistentTaskFuture<>();
322322
waitForPersistentTaskCondition(persistentTasksService, taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2);
323323

324324
logger.info("Completing the running task");
@@ -522,9 +522,9 @@ public void testAbortLocally() throws Exception {
522522
private void waitForPersistentTaskCondition(
523523
PersistentTasksService persistentTasksService,
524524
String taskId,
525-
Predicate<PersistentTask<?>> predicate,
525+
Predicate<PersistentTask<TestParams>> predicate,
526526
@Nullable TimeValue timeout,
527-
WaitForPersistentTaskListener<?> listener
527+
WaitForPersistentTaskListener<TestParams> listener
528528
) throws Exception {
529529
if (scope == PersistentTasksExecutor.Scope.CLUSTER) {
530530
@FixForMultiProject(description = "can be replaced if PersistentTasksService supports waiting for cluster task conditions")

server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,10 @@ public long getAllocationId() {
105105
* @param timeout a timeout for waiting
106106
* @param listener the callback listener
107107
*/
108-
public void waitForPersistentTask(
109-
final Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> predicate,
108+
public <P extends PersistentTaskParams> void waitForPersistentTask(
109+
final Predicate<PersistentTasksCustomMetadata.PersistentTask<P>> predicate,
110110
final @Nullable TimeValue timeout,
111-
final PersistentTasksService.WaitForPersistentTaskListener<?> listener
111+
final PersistentTasksService.WaitForPersistentTaskListener<P> listener
112112
) {
113113
persistentTasksService.waitForPersistentTaskCondition(persistentTaskId, predicate, timeout, listener);
114114
}

server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,11 @@ private <Req extends ActionRequest, Resp extends PersistentTaskResponse> void ex
173173
* @param listener the callback listener
174174
*/
175175
@Deprecated(forRemoval = true)
176-
public void waitForPersistentTaskCondition(
176+
public <P extends PersistentTaskParams> void waitForPersistentTaskCondition(
177177
final String taskId,
178-
final Predicate<PersistentTask<?>> predicate,
178+
final Predicate<PersistentTask<P>> predicate,
179179
final @Nullable TimeValue timeout,
180-
final WaitForPersistentTaskListener<?> listener
180+
final WaitForPersistentTaskListener<P> listener
181181
) {
182182
final var projectId = clusterService.state().metadata().getProject().id();
183183
waitForPersistentTaskCondition(projectId, taskId, predicate, timeout, listener);
@@ -192,17 +192,19 @@ public void waitForPersistentTaskCondition(
192192
* @param timeout a timeout for waiting
193193
* @param listener the callback listener
194194
*/
195-
public void waitForPersistentTaskCondition(
195+
public <P extends PersistentTaskParams> void waitForPersistentTaskCondition(
196196
final ProjectId projectId,
197197
final String taskId,
198-
final Predicate<PersistentTask<?>> predicate,
198+
final Predicate<PersistentTask<P>> predicate,
199199
final @Nullable TimeValue timeout,
200-
final WaitForPersistentTaskListener<?> listener
200+
final WaitForPersistentTaskListener<P> listener
201201
) {
202202
ClusterStateObserver.waitForState(clusterService, threadPool.getThreadContext(), new ClusterStateObserver.Listener() {
203203
@Override
204204
public void onNewClusterState(ClusterState state) {
205-
listener.onResponse(PersistentTasksCustomMetadata.getTaskWithId(state.metadata().getProject(projectId), taskId));
205+
final var project = state.metadata().projects().get(projectId);
206+
final PersistentTask<P> task = project == null ? null : PersistentTasksCustomMetadata.getTaskWithId(project, taskId);
207+
listener.onResponse(task);
206208
}
207209

208210
@Override
@@ -214,13 +216,11 @@ public void onClusterServiceClose() {
214216
public void onTimeout(TimeValue timeout) {
215217
listener.onTimeout(timeout);
216218
}
217-
},
218-
clusterState -> predicate.test(
219-
PersistentTasksCustomMetadata.getTaskWithId(clusterState.metadata().getProject(projectId), taskId)
220-
),
221-
timeout,
222-
logger
223-
);
219+
}, clusterState -> {
220+
final var project = clusterState.metadata().projects().get(projectId);
221+
final PersistentTask<P> task = project == null ? null : PersistentTasksCustomMetadata.getTaskWithId(project, taskId);
222+
return predicate.test(task);
223+
}, timeout, logger);
224224
}
225225

226226
// visible for testing

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@
6666
import org.elasticsearch.index.shard.ShardId;
6767
import org.elasticsearch.indices.IndicesService;
6868
import org.elasticsearch.injection.guice.Inject;
69-
import org.elasticsearch.persistent.PersistentTaskParams;
7069
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
7170
import org.elasticsearch.persistent.PersistentTasksService;
7271
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
@@ -505,18 +504,18 @@ private void performShardDownsampling(
505504
dimensionFields,
506505
shardId
507506
);
508-
Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> predicate = runningTask -> {
507+
Predicate<PersistentTasksCustomMetadata.PersistentTask<DownsampleShardTaskParams>> predicate = runningTask -> {
509508
if (runningTask == null) {
510509
// NOTE: don't need to wait if the persistent task completed and was removed
511510
return true;
512511
}
513512
DownsampleShardPersistentTaskState runningPersistentTaskState = (DownsampleShardPersistentTaskState) runningTask.getState();
514513
return runningPersistentTaskState != null && runningPersistentTaskState.done();
515514
};
516-
var taskListener = new PersistentTasksService.WaitForPersistentTaskListener<>() {
515+
var taskListener = new PersistentTasksService.WaitForPersistentTaskListener<DownsampleShardTaskParams>() {
517516

518517
@Override
519-
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> persistentTask) {
518+
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<DownsampleShardTaskParams> persistentTask) {
520519
if (persistentTask != null) {
521520
var runningPersistentTaskState = (DownsampleShardPersistentTaskState) persistentTask.getState();
522521
if (runningPersistentTaskState != null) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -357,14 +357,14 @@ public void onFailure(Exception e) {
357357
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
358358
* of endpoints waiting for a condition tested by this predicate would never get a response.
359359
*/
360-
private static class JobPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
360+
private static class JobPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<OpenJobAction.JobParams>> {
361361

362362
private volatile Exception exception;
363363
private volatile String node = "";
364364
private volatile boolean shouldCancel;
365365

366366
@Override
367-
public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
367+
public boolean test(PersistentTasksCustomMetadata.PersistentTask<OpenJobAction.JobParams> persistentTask) {
368368
JobState jobState = JobState.CLOSED;
369369
String reason = null;
370370
if (persistentTask != null) {
@@ -381,7 +381,7 @@ public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTa
381381

382382
// This logic is only appropriate when opening a job, not when reallocating following a failure,
383383
// and this is why this class must only be used when opening a job
384-
OpenJobAction.JobParams params = (OpenJobAction.JobParams) persistentTask.getParams();
384+
OpenJobAction.JobParams params = persistentTask.getParams();
385385
Optional<ElasticsearchException> assignmentException = checkAssignmentState(assignment, params.getJobId(), logger);
386386
if (assignmentException.isPresent()) {
387387
exception = assignmentException.get();

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.elasticsearch.license.LicenseUtils;
3737
import org.elasticsearch.license.XPackLicenseState;
3838
import org.elasticsearch.persistent.AllocatedPersistentTask;
39-
import org.elasticsearch.persistent.PersistentTaskParams;
4039
import org.elasticsearch.persistent.PersistentTaskState;
4140
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
4241
import org.elasticsearch.persistent.PersistentTasksService;
@@ -458,11 +457,10 @@ private void waitForAnalyticsStarted(
458457
task.getId(),
459458
predicate,
460459
timeout,
461-
462-
new PersistentTasksService.WaitForPersistentTaskListener<PersistentTaskParams>() {
460+
new PersistentTasksService.WaitForPersistentTaskListener<>() {
463461

464462
@Override
465-
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> persistentTask) {
463+
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<TaskParams> persistentTask) {
466464
if (predicate.exception != null) {
467465
// We want to return to the caller without leaving an unassigned persistent task, to match
468466
// what would have happened if the error had been detected in the "fast fail" validation
@@ -530,14 +528,14 @@ private StartContext(DataFrameAnalyticsConfig config, List<PhaseProgress> progre
530528
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
531529
* of endpoints waiting for a condition tested by this predicate would never get a response.
532530
*/
533-
private static class AnalyticsPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
531+
private static class AnalyticsPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<TaskParams>> {
534532

535533
private volatile Exception exception;
536534
private volatile String node = "";
537535
private volatile String assignmentExplanation;
538536

539537
@Override
540-
public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
538+
public boolean test(PersistentTasksCustomMetadata.PersistentTask<TaskParams> persistentTask) {
541539
if (persistentTask == null) {
542540
return false;
543541
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ private void waitForDatafeedStarted(
369369
taskId,
370370
predicate,
371371
params.getTimeout(),
372-
new PersistentTasksService.WaitForPersistentTaskListener<StartDatafeedAction.DatafeedParams>() {
372+
new PersistentTasksService.WaitForPersistentTaskListener<>() {
373373
@Override
374374
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {
375375
if (predicate.exception != null) {
@@ -711,13 +711,15 @@ public GetDatafeedRunningStateAction.Response.RunningState getRunningState() {
711711
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
712712
* of endpoints waiting for a condition tested by this predicate would never get a response.
713713
*/
714-
private static class DatafeedPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
714+
private static class DatafeedPredicate
715+
implements
716+
Predicate<PersistentTasksCustomMetadata.PersistentTask<StartDatafeedAction.DatafeedParams>> {
715717

716718
private volatile Exception exception;
717719
private volatile String node = "";
718720

719721
@Override
720-
public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
722+
public boolean test(PersistentTasksCustomMetadata.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {
721723
if (persistentTask == null) {
722724
return false;
723725
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpgradeJobModelSnapshotAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ private void waitForJobStarted(
247247
taskId,
248248
predicate,
249249
request.getTimeout(),
250-
new PersistentTasksService.WaitForPersistentTaskListener<SnapshotUpgradeTaskParams>() {
250+
new PersistentTasksService.WaitForPersistentTaskListener<>() {
251251
@Override
252252
public void onResponse(PersistentTask<SnapshotUpgradeTaskParams> persistentTask) {
253253
if (predicate.getException() != null) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradePredicate.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import static org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutor.checkAssignmentState;
2222

23-
public class SnapshotUpgradePredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
23+
public class SnapshotUpgradePredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<SnapshotUpgradeTaskParams>> {
2424
private final boolean waitForCompletion;
2525
private final Logger logger;
2626
private volatile Exception exception;
@@ -50,7 +50,7 @@ public boolean isCompleted() {
5050
}
5151

5252
@Override
53-
public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
53+
public boolean test(PersistentTasksCustomMetadata.PersistentTask<SnapshotUpgradeTaskParams> persistentTask) {
5454
// Persistent task being null means it has been removed from state, and is now complete
5555
if (persistentTask == null) {
5656
isCompleted = true;
@@ -64,7 +64,7 @@ public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTa
6464
PersistentTasksCustomMetadata.Assignment assignment = persistentTask.getAssignment();
6565
// This logic is only appropriate when opening a job, not when reallocating following a failure,
6666
// and this is why this class must only be used when opening a job
67-
SnapshotUpgradeTaskParams params = (SnapshotUpgradeTaskParams) persistentTask.getParams();
67+
SnapshotUpgradeTaskParams params = persistentTask.getParams();
6868
Optional<ElasticsearchException> assignmentException = checkAssignmentState(assignment, params.getJobId(), logger);
6969
if (assignmentException.isPresent()) {
7070
exception = assignmentException.get();

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ private void waitForTransformTaskStarted(
339339
taskId,
340340
predicate,
341341
timeout,
342-
new PersistentTasksService.WaitForPersistentTaskListener<TransformTaskParams>() {
342+
new PersistentTasksService.WaitForPersistentTaskListener<>() {
343343
@Override
344344
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<TransformTaskParams> persistentTask) {
345345
if (predicate.exception != null) {
@@ -374,12 +374,12 @@ public void onTimeout(TimeValue timeout) {
374374
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
375375
* of endpoints waiting for a condition tested by this predicate would never get a response.
376376
*/
377-
private static class TransformPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
377+
private static class TransformPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<TransformTaskParams>> {
378378

379379
private volatile Exception exception;
380380

381381
@Override
382-
public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
382+
public boolean test(PersistentTasksCustomMetadata.PersistentTask<TransformTaskParams> persistentTask) {
383383
if (persistentTask == null) {
384384
return false;
385385
}

0 commit comments

Comments
 (0)