Skip to content

Commit fe97af9

Browse files
authored
[ML] fix race condition with job snapshot upgrade and feature reset (#85121) (#85396)
This addresses a race condition in feature reset && model snapshot upgrade. It is possible that feature reset is called soon after the task is assigned, meaning the persistent task is cancelled before the model snapshot upgrader is set on the task. This means that the task could continue executing even after being cancelled as nothing else checks if the feature is being reset or if the task has been cancelled. closes #84709
1 parent a2ae13f commit fe97af9

File tree

3 files changed

+35
-2
lines changed

3 files changed

+35
-2
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,17 @@ public void upgradeSnapshot(SnapshotUpgradeTask task, Consumer<Exception> closeH
480480
);
481481
return;
482482
}
483+
if (resetInProgress) {
484+
logger.trace(
485+
() -> new ParameterizedMessage(
486+
"Aborted upgrading snapshot [{}] for job [{}] as ML feature is being reset",
487+
snapshotId,
488+
jobId
489+
)
490+
);
491+
closeHandler.accept(null);
492+
return;
493+
}
483494
// We need to fork, otherwise we restore model state from a network thread (several GET api calls):
484495
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
485496
@Override
@@ -500,6 +511,17 @@ protected void doRun() {
500511
closeHandler.accept(null);
501512
return;
502513
}
514+
if (resetInProgress) {
515+
logger.trace(
516+
() -> new ParameterizedMessage(
517+
"Aborted upgrading snapshot [{}] for job [{}] as ML feature is being reset",
518+
snapshotId,
519+
jobId
520+
)
521+
);
522+
closeHandler.accept(null);
523+
return;
524+
}
503525
runSnapshotUpgrade(task, job, params, closeHandler);
504526
}
505527
});

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/JobModelSnapshotUpgrader.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,10 @@ public final class JobModelSnapshotUpgrader {
102102
}
103103

104104
synchronized void start() {
105-
task.setJobModelSnapshotUpgrader(this);
105+
if (task.setJobModelSnapshotUpgrader(this) == false) {
106+
this.killProcess(task.getReasonCancelled());
107+
return;
108+
}
106109

107110
// A TP with no queue, so that we fail immediately if there are no threads available
108111
ExecutorService autodetectExecutorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTask.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,15 @@ protected synchronized void onCancelled() {
7070
}
7171
}
7272

73-
public synchronized void setJobModelSnapshotUpgrader(JobModelSnapshotUpgrader jobModelSnapshotUpgrader) {
73+
/**
74+
* @param jobModelSnapshotUpgrader the snapshot upgrader
75+
* @return false if the task has been canceled and upgrader not set
76+
*/
77+
public synchronized boolean setJobModelSnapshotUpgrader(JobModelSnapshotUpgrader jobModelSnapshotUpgrader) {
78+
if (this.isCancelled()) {
79+
return false;
80+
}
7481
this.jobModelSnapshotUpgrader = jobModelSnapshotUpgrader;
82+
return true;
7583
}
7684
}

0 commit comments

Comments
 (0)