Skip to content

Commit 33e8ee1

Browse files
committed
Revert "Specify generic types and check project removal"
This reverts commit 493288f.
1 parent b8be7c4 commit 33e8ee1

File tree

10 files changed

+45
-44
lines changed

10 files changed

+45
-44
lines changed

server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ public void testPersistentActionStatusUpdate() throws Exception {
290290
);
291291

292292
int finalI = i;
293-
WaitForPersistentTaskFuture<TestParams> future1 = new WaitForPersistentTaskFuture<>();
293+
WaitForPersistentTaskFuture<?> future1 = new WaitForPersistentTaskFuture<>();
294294
waitForPersistentTaskCondition(
295295
persistentTasksService,
296296
taskId,
@@ -304,7 +304,7 @@ public void testPersistentActionStatusUpdate() throws Exception {
304304
assertThat(future1.get().getId(), equalTo(taskId));
305305
}
306306

307-
WaitForPersistentTaskFuture<TestParams> future1 = new WaitForPersistentTaskFuture<>();
307+
WaitForPersistentTaskFuture<?> future1 = new WaitForPersistentTaskFuture<>();
308308
waitForPersistentTaskCondition(persistentTasksService, taskId, task -> false, TimeValue.timeValueMillis(10), future1);
309309

310310
assertFutureThrows(future1, IllegalStateException.class, "timed out after 10ms");
@@ -318,7 +318,7 @@ public void testPersistentActionStatusUpdate() throws Exception {
318318
);
319319

320320
// Wait for the task to disappear
321-
WaitForPersistentTaskFuture<TestParams> future2 = new WaitForPersistentTaskFuture<>();
321+
WaitForPersistentTaskFuture<?> future2 = new WaitForPersistentTaskFuture<>();
322322
waitForPersistentTaskCondition(persistentTasksService, taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2);
323323

324324
logger.info("Completing the running task");
@@ -522,9 +522,9 @@ public void testAbortLocally() throws Exception {
522522
private void waitForPersistentTaskCondition(
523523
PersistentTasksService persistentTasksService,
524524
String taskId,
525-
Predicate<PersistentTask<TestParams>> predicate,
525+
Predicate<PersistentTask<?>> predicate,
526526
@Nullable TimeValue timeout,
527-
WaitForPersistentTaskListener<TestParams> listener
527+
WaitForPersistentTaskListener<?> listener
528528
) throws Exception {
529529
if (scope == PersistentTasksExecutor.Scope.CLUSTER) {
530530
@FixForMultiProject(description = "can be replaced if PersistentTasksService supports waiting for cluster task conditions")

server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,10 @@ public long getAllocationId() {
105105
* @param timeout a timeout for waiting
106106
* @param listener the callback listener
107107
*/
108-
public <P extends PersistentTaskParams> void waitForPersistentTask(
109-
final Predicate<PersistentTasksCustomMetadata.PersistentTask<P>> predicate,
108+
public void waitForPersistentTask(
109+
final Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> predicate,
110110
final @Nullable TimeValue timeout,
111-
final PersistentTasksService.WaitForPersistentTaskListener<P> listener
111+
final PersistentTasksService.WaitForPersistentTaskListener<?> listener
112112
) {
113113
persistentTasksService.waitForPersistentTaskCondition(persistentTaskId, predicate, timeout, listener);
114114
}

server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,11 @@ private <Req extends ActionRequest, Resp extends PersistentTaskResponse> void ex
173173
* @param listener the callback listener
174174
*/
175175
@Deprecated(forRemoval = true)
176-
public <P extends PersistentTaskParams> void waitForPersistentTaskCondition(
176+
public void waitForPersistentTaskCondition(
177177
final String taskId,
178-
final Predicate<PersistentTask<P>> predicate,
178+
final Predicate<PersistentTask<?>> predicate,
179179
final @Nullable TimeValue timeout,
180-
final WaitForPersistentTaskListener<P> listener
180+
final WaitForPersistentTaskListener<?> listener
181181
) {
182182
final var projectId = clusterService.state().metadata().getProject().id();
183183
waitForPersistentTaskCondition(projectId, taskId, predicate, timeout, listener);
@@ -192,19 +192,17 @@ public <P extends PersistentTaskParams> void waitForPersistentTaskCondition(
192192
* @param timeout a timeout for waiting
193193
* @param listener the callback listener
194194
*/
195-
public <P extends PersistentTaskParams> void waitForPersistentTaskCondition(
195+
public void waitForPersistentTaskCondition(
196196
final ProjectId projectId,
197197
final String taskId,
198-
final Predicate<PersistentTask<P>> predicate,
198+
final Predicate<PersistentTask<?>> predicate,
199199
final @Nullable TimeValue timeout,
200-
final WaitForPersistentTaskListener<P> listener
200+
final WaitForPersistentTaskListener<?> listener
201201
) {
202202
ClusterStateObserver.waitForState(clusterService, threadPool.getThreadContext(), new ClusterStateObserver.Listener() {
203203
@Override
204204
public void onNewClusterState(ClusterState state) {
205-
final var project = state.metadata().projects().get(projectId);
206-
final PersistentTask<P> task = project == null ? null : PersistentTasksCustomMetadata.getTaskWithId(project, taskId);
207-
listener.onResponse(task);
205+
listener.onResponse(PersistentTasksCustomMetadata.getTaskWithId(state.metadata().getProject(projectId), taskId));
208206
}
209207

210208
@Override
@@ -216,11 +214,13 @@ public void onClusterServiceClose() {
216214
public void onTimeout(TimeValue timeout) {
217215
listener.onTimeout(timeout);
218216
}
219-
}, clusterState -> {
220-
final var project = clusterState.metadata().projects().get(projectId);
221-
final PersistentTask<P> task = project == null ? null : PersistentTasksCustomMetadata.getTaskWithId(project, taskId);
222-
return predicate.test(task);
223-
}, timeout, logger);
217+
},
218+
clusterState -> predicate.test(
219+
PersistentTasksCustomMetadata.getTaskWithId(clusterState.metadata().getProject(projectId), taskId)
220+
),
221+
timeout,
222+
logger
223+
);
224224
}
225225

226226
// visible for testing

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.index.shard.ShardId;
6767
import org.elasticsearch.indices.IndicesService;
6868
import org.elasticsearch.injection.guice.Inject;
69+
import org.elasticsearch.persistent.PersistentTaskParams;
6970
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
7071
import org.elasticsearch.persistent.PersistentTasksService;
7172
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
@@ -504,18 +505,18 @@ private void performShardDownsampling(
504505
dimensionFields,
505506
shardId
506507
);
507-
Predicate<PersistentTasksCustomMetadata.PersistentTask<DownsampleShardTaskParams>> predicate = runningTask -> {
508+
Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> predicate = runningTask -> {
508509
if (runningTask == null) {
509510
// NOTE: don't need to wait if the persistent task completed and was removed
510511
return true;
511512
}
512513
DownsampleShardPersistentTaskState runningPersistentTaskState = (DownsampleShardPersistentTaskState) runningTask.getState();
513514
return runningPersistentTaskState != null && runningPersistentTaskState.done();
514515
};
515-
var taskListener = new PersistentTasksService.WaitForPersistentTaskListener<DownsampleShardTaskParams>() {
516+
var taskListener = new PersistentTasksService.WaitForPersistentTaskListener<>() {
516517

517518
@Override
518-
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<DownsampleShardTaskParams> persistentTask) {
519+
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> persistentTask) {
519520
if (persistentTask != null) {
520521
var runningPersistentTaskState = (DownsampleShardPersistentTaskState) persistentTask.getState();
521522
if (runningPersistentTaskState != null) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -357,14 +357,14 @@ public void onFailure(Exception e) {
357357
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
358358
* of endpoints waiting for a condition tested by this predicate would never get a response.
359359
*/
360-
private static class JobPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<OpenJobAction.JobParams>> {
360+
private static class JobPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
361361

362362
private volatile Exception exception;
363363
private volatile String node = "";
364364
private volatile boolean shouldCancel;
365365

366366
@Override
367-
public boolean test(PersistentTasksCustomMetadata.PersistentTask<OpenJobAction.JobParams> persistentTask) {
367+
public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
368368
JobState jobState = JobState.CLOSED;
369369
String reason = null;
370370
if (persistentTask != null) {
@@ -381,7 +381,7 @@ public boolean test(PersistentTasksCustomMetadata.PersistentTask<OpenJobAction.J
381381

382382
// This logic is only appropriate when opening a job, not when reallocating following a failure,
383383
// and this is why this class must only be used when opening a job
384-
OpenJobAction.JobParams params = persistentTask.getParams();
384+
OpenJobAction.JobParams params = (OpenJobAction.JobParams) persistentTask.getParams();
385385
Optional<ElasticsearchException> assignmentException = checkAssignmentState(assignment, params.getJobId(), logger);
386386
if (assignmentException.isPresent()) {
387387
exception = assignmentException.get();

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.license.LicenseUtils;
3737
import org.elasticsearch.license.XPackLicenseState;
3838
import org.elasticsearch.persistent.AllocatedPersistentTask;
39+
import org.elasticsearch.persistent.PersistentTaskParams;
3940
import org.elasticsearch.persistent.PersistentTaskState;
4041
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
4142
import org.elasticsearch.persistent.PersistentTasksService;
@@ -457,10 +458,11 @@ private void waitForAnalyticsStarted(
457458
task.getId(),
458459
predicate,
459460
timeout,
460-
new PersistentTasksService.WaitForPersistentTaskListener<>() {
461+
462+
new PersistentTasksService.WaitForPersistentTaskListener<PersistentTaskParams>() {
461463

462464
@Override
463-
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<TaskParams> persistentTask) {
465+
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> persistentTask) {
464466
if (predicate.exception != null) {
465467
// We want to return to the caller without leaving an unassigned persistent task, to match
466468
// what would have happened if the error had been detected in the "fast fail" validation
@@ -528,14 +530,14 @@ private StartContext(DataFrameAnalyticsConfig config, List<PhaseProgress> progre
528530
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
529531
* of endpoints waiting for a condition tested by this predicate would never get a response.
530532
*/
531-
private static class AnalyticsPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<TaskParams>> {
533+
private static class AnalyticsPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
532534

533535
private volatile Exception exception;
534536
private volatile String node = "";
535537
private volatile String assignmentExplanation;
536538

537539
@Override
538-
public boolean test(PersistentTasksCustomMetadata.PersistentTask<TaskParams> persistentTask) {
540+
public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
539541
if (persistentTask == null) {
540542
return false;
541543
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ private void waitForDatafeedStarted(
369369
taskId,
370370
predicate,
371371
params.getTimeout(),
372-
new PersistentTasksService.WaitForPersistentTaskListener<>() {
372+
new PersistentTasksService.WaitForPersistentTaskListener<StartDatafeedAction.DatafeedParams>() {
373373
@Override
374374
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {
375375
if (predicate.exception != null) {
@@ -711,15 +711,13 @@ public GetDatafeedRunningStateAction.Response.RunningState getRunningState() {
711711
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
712712
* of endpoints waiting for a condition tested by this predicate would never get a response.
713713
*/
714-
private static class DatafeedPredicate
715-
implements
716-
Predicate<PersistentTasksCustomMetadata.PersistentTask<StartDatafeedAction.DatafeedParams>> {
714+
private static class DatafeedPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
717715

718716
private volatile Exception exception;
719717
private volatile String node = "";
720718

721719
@Override
722-
public boolean test(PersistentTasksCustomMetadata.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {
720+
public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
723721
if (persistentTask == null) {
724722
return false;
725723
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpgradeJobModelSnapshotAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ private void waitForJobStarted(
247247
taskId,
248248
predicate,
249249
request.getTimeout(),
250-
new PersistentTasksService.WaitForPersistentTaskListener<>() {
250+
new PersistentTasksService.WaitForPersistentTaskListener<SnapshotUpgradeTaskParams>() {
251251
@Override
252252
public void onResponse(PersistentTask<SnapshotUpgradeTaskParams> persistentTask) {
253253
if (predicate.getException() != null) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradePredicate.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import static org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutor.checkAssignmentState;
2222

23-
public class SnapshotUpgradePredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<SnapshotUpgradeTaskParams>> {
23+
public class SnapshotUpgradePredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
2424
private final boolean waitForCompletion;
2525
private final Logger logger;
2626
private volatile Exception exception;
@@ -50,7 +50,7 @@ public boolean isCompleted() {
5050
}
5151

5252
@Override
53-
public boolean test(PersistentTasksCustomMetadata.PersistentTask<SnapshotUpgradeTaskParams> persistentTask) {
53+
public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
5454
// Persistent task being null means it has been removed from state, and is now complete
5555
if (persistentTask == null) {
5656
isCompleted = true;
@@ -64,7 +64,7 @@ public boolean test(PersistentTasksCustomMetadata.PersistentTask<SnapshotUpgrade
6464
PersistentTasksCustomMetadata.Assignment assignment = persistentTask.getAssignment();
6565
// This logic is only appropriate when opening a job, not when reallocating following a failure,
6666
// and this is why this class must only be used when opening a job
67-
SnapshotUpgradeTaskParams params = persistentTask.getParams();
67+
SnapshotUpgradeTaskParams params = (SnapshotUpgradeTaskParams) persistentTask.getParams();
6868
Optional<ElasticsearchException> assignmentException = checkAssignmentState(assignment, params.getJobId(), logger);
6969
if (assignmentException.isPresent()) {
7070
exception = assignmentException.get();

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ private void waitForTransformTaskStarted(
339339
taskId,
340340
predicate,
341341
timeout,
342-
new PersistentTasksService.WaitForPersistentTaskListener<>() {
342+
new PersistentTasksService.WaitForPersistentTaskListener<TransformTaskParams>() {
343343
@Override
344344
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<TransformTaskParams> persistentTask) {
345345
if (predicate.exception != null) {
@@ -374,12 +374,12 @@ public void onTimeout(TimeValue timeout) {
374374
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
375375
* of endpoints waiting for a condition tested by this predicate would never get a response.
376376
*/
377-
private static class TransformPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<TransformTaskParams>> {
377+
private static class TransformPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
378378

379379
private volatile Exception exception;
380380

381381
@Override
382-
public boolean test(PersistentTasksCustomMetadata.PersistentTask<TransformTaskParams> persistentTask) {
382+
public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
383383
if (persistentTask == null) {
384384
return false;
385385
}

0 commit comments

Comments
 (0)