|
33 | 33 | import org.elasticsearch.tasks.TaskId;
|
34 | 34 | import org.elasticsearch.threadpool.ThreadPool;
|
35 | 35 | import org.elasticsearch.transport.TransportService;
|
| 36 | +import org.elasticsearch.xpack.core.ClientHelper; |
36 | 37 | import org.elasticsearch.xpack.core.ml.MlTasks;
|
| 38 | +import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; |
37 | 39 | import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
|
38 | 40 | import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
|
39 | 41 | import org.elasticsearch.xpack.core.ml.action.PutJobAction;
|
@@ -162,11 +164,20 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
|
162 | 164 | },
|
163 | 165 | finalListener::onFailure);
|
164 | 166 |
|
165 |
| - ActionListener<Boolean> jobExistsListener = ActionListener.wrap( |
| 167 | + ActionListener<AcknowledgedResponse> datafeedDeleteListener = ActionListener.wrap( |
166 | 168 | response -> {
|
167 | 169 | auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING, taskId));
|
168 |
| - markJobAsDeletingIfNotUsed(request.getJobId(), taskId, markAsDeletingListener); |
| 170 | + cancelResetTaskIfExists(request.getJobId(), ActionListener.wrap( |
| 171 | + r -> jobConfigProvider.updateJobBlockReason(request.getJobId(), new Blocked(Blocked.Reason.DELETE, taskId), |
| 172 | + markAsDeletingListener), |
| 173 | + finalListener::onFailure |
| 174 | + )); |
169 | 175 | },
|
| 176 | + finalListener::onFailure |
| 177 | + ); |
| 178 | + |
| 179 | + ActionListener<Boolean> jobExistsListener = ActionListener.wrap( |
| 180 | + response -> deleteDatafeedIfNecessary(request, datafeedDeleteListener), |
170 | 181 | e -> {
|
171 | 182 | if (request.isForce()
|
172 | 183 | && MlTasks.getJobTask(request.getJobId(), state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE)) != null) {
|
@@ -223,23 +234,17 @@ private void forceDeleteJob(
|
223 | 234 | logger.debug(() -> new ParameterizedMessage("[{}] force deleting job", jobId));
|
224 | 235 |
|
225 | 236 | // 3. Delete the job
|
226 |
| - ActionListener<Boolean> removeTaskListener = new ActionListener<Boolean>() { |
227 |
| - @Override |
228 |
| - public void onResponse(Boolean response) { |
229 |
| - // use clusterService.state() here so that the updated state without the task is available |
230 |
| - normalDeleteJob(parentTaskClient, request, clusterService.state(), listener); |
231 |
| - } |
232 |
| - |
233 |
| - @Override |
234 |
| - public void onFailure(Exception e) { |
| 237 | + ActionListener<Boolean> removeTaskListener = ActionListener.wrap( |
| 238 | + response -> normalDeleteJob(parentTaskClient, request, clusterService.state(), listener), |
| 239 | + e -> { |
235 | 240 | if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
|
236 | 241 | // use clusterService.state() here so that the updated state without the task is available
|
237 | 242 | normalDeleteJob(parentTaskClient, request, clusterService.state(), listener);
|
238 | 243 | } else {
|
239 | 244 | listener.onFailure(e);
|
240 | 245 | }
|
241 | 246 | }
|
242 |
| - }; |
| 247 | + ); |
243 | 248 |
|
244 | 249 | // 2. Cancel the persistent task. This closes the process gracefully so
|
245 | 250 | // the process should be killed first.
|
@@ -288,21 +293,42 @@ private void checkJobIsNotOpen(String jobId, ClusterState state) {
|
288 | 293 | }
|
289 | 294 | }
|
290 | 295 |
|
291 |
| - private void markJobAsDeletingIfNotUsed(String jobId, TaskId taskId, ActionListener<PutJobAction.Response> listener) { |
| 296 | + private void deleteDatafeedIfNecessary(DeleteJobAction.Request deleteJobRequest, ActionListener<AcknowledgedResponse> listener) { |
292 | 297 |
|
293 |
| - datafeedConfigProvider.findDatafeedIdsForJobIds(Collections.singletonList(jobId), ActionListener.wrap( |
294 |
| - datafeedIds -> { |
295 |
| - if (datafeedIds.isEmpty() == false) { |
296 |
| - listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed [" |
297 |
| - + datafeedIds.iterator().next() + "] refers to it")); |
298 |
| - return; |
299 |
| - } |
300 |
| - cancelResetTaskIfExists(jobId, ActionListener.wrap( |
301 |
| - response -> jobConfigProvider.updateJobBlockReason(jobId, new Blocked(Blocked.Reason.DELETE, taskId), listener), |
302 |
| - listener::onFailure |
303 |
| - )); |
304 |
| - }, |
305 |
| - listener::onFailure |
| 298 | + datafeedConfigProvider.findDatafeedIdsForJobIds(Collections.singletonList(deleteJobRequest.getJobId()), ActionListener.wrap( |
| 299 | + datafeedIds -> { |
| 300 | + // Since it's only possible to delete a single job at a time there should not be more than one datafeed |
| 301 | + assert datafeedIds.size() <= 1 : "Expected at most 1 datafeed for a single job, got " + datafeedIds; |
| 302 | + if (datafeedIds.isEmpty()) { |
| 303 | + listener.onResponse(AcknowledgedResponse.TRUE); |
| 304 | + return; |
| 305 | + } |
| 306 | + DeleteDatafeedAction.Request deleteDatafeedRequest = new DeleteDatafeedAction.Request(datafeedIds.iterator().next()); |
| 307 | + deleteDatafeedRequest.setForce(deleteJobRequest.isForce()); |
| 308 | + deleteDatafeedRequest.timeout(deleteJobRequest.timeout()); |
| 309 | + ClientHelper.executeAsyncWithOrigin( |
| 310 | + client, |
| 311 | + ClientHelper.ML_ORIGIN, |
| 312 | + DeleteDatafeedAction.INSTANCE, |
| 313 | + deleteDatafeedRequest, |
| 314 | + ActionListener.wrap( |
| 315 | + listener::onResponse, |
| 316 | + e -> { |
| 317 | + // It's possible that a simultaneous call to delete the datafeed has deleted it in between |
| 318 | + // us finding the datafeed ID and trying to delete it in this method - this is OK |
| 319 | + if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) { |
| 320 | + listener.onResponse(AcknowledgedResponse.TRUE); |
| 321 | + } else { |
| 322 | + listener.onFailure(ExceptionsHelper.conflictStatusException( |
| 323 | + "failed to delete job [{}] as its datafeed [{}] could not be deleted", e, |
| 324 | + deleteJobRequest.getJobId(), deleteDatafeedRequest.getDatafeedId()) |
| 325 | + ); |
| 326 | + } |
| 327 | + } |
| 328 | + ) |
| 329 | + ); |
| 330 | + }, |
| 331 | + listener::onFailure |
306 | 332 | ));
|
307 | 333 | }
|
308 | 334 |
|
|
0 commit comments