Skip to content

Commit a5adc7d

Browse files
authored
[ML] refactor internal datafeed management (#74018)
This unifies the concept of object management between datafeeds and anomaly jobs.
1 parent 484280a commit a5adc7d

File tree

9 files changed

+535
-418
lines changed

9 files changed

+535
-418
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@
258258
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigAutoUpdater;
259259
import org.elasticsearch.xpack.ml.datafeed.DatafeedContextProvider;
260260
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
261+
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
261262
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
262263
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
263264
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
@@ -699,7 +700,17 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
699700
threadPool,
700701
client,
701702
notifier,
702-
xContentRegistry);
703+
xContentRegistry,
704+
indexNameExpressionResolver
705+
);
706+
DatafeedManager datafeedManager = new DatafeedManager(
707+
datafeedConfigProvider,
708+
jobConfigProvider,
709+
xContentRegistry,
710+
clusterService,
711+
settings,
712+
client
713+
);
703714

704715
// special holder for {@link MachineLearningFeatureSetUsage}, which needs access to the job manager if ML is enabled
705716
JobManagerHolder jobManagerHolder = new JobManagerHolder(jobManager);
@@ -852,7 +863,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
852863
autodetectProcessManager,
853864
new MlInitializationService(settings, threadPool, clusterService, client, mlAssignmentNotifier),
854865
jobDataCountsPersister,
855-
datafeedRunner,
866+
datafeedRunner,
867+
datafeedManager,
856868
anomalyDetectionAuditor,
857869
dataFrameAnalyticsAuditor,
858870
inferenceAuditor,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java

Lines changed: 7 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.cluster.service.ClusterService;
2020
import org.elasticsearch.common.inject.Inject;
2121
import org.elasticsearch.common.settings.Settings;
22-
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2322
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
2423
import org.elasticsearch.persistent.PersistentTasksService;
2524
import org.elasticsearch.tasks.Task;
@@ -28,21 +27,17 @@
2827
import org.elasticsearch.xpack.core.ml.MlTasks;
2928
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
3029
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
31-
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
32-
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3330
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
3431
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
35-
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
36-
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
32+
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
3733

3834
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
3935
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
4036

4137
public class TransportDeleteDatafeedAction extends AcknowledgedTransportMasterNodeAction<DeleteDatafeedAction.Request> {
4238

4339
private final Client client;
44-
private final DatafeedConfigProvider datafeedConfigProvider;
45-
private final ClusterService clusterService;
40+
private final DatafeedManager datafeedManager;
4641
private final PersistentTasksService persistentTasksService;
4742
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
4843

@@ -51,14 +46,13 @@ public TransportDeleteDatafeedAction(Settings settings, TransportService transpo
5146
ThreadPool threadPool, ActionFilters actionFilters,
5247
IndexNameExpressionResolver indexNameExpressionResolver,
5348
Client client, PersistentTasksService persistentTasksService,
54-
NamedXContentRegistry xContentRegistry) {
49+
DatafeedManager datafeedManager) {
5550
super(DeleteDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters,
5651
DeleteDatafeedAction.Request::new, indexNameExpressionResolver, ThreadPool.Names.SAME);
5752
this.client = client;
58-
this.datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
5953
this.persistentTasksService = persistentTasksService;
60-
this.clusterService = clusterService;
6154
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
55+
this.datafeedManager = datafeedManager;
6256
}
6357

6458
@Override
@@ -73,14 +67,15 @@ protected void masterOperation(Task task, DeleteDatafeedAction.Request request,
7367
if (request.isForce()) {
7468
forceDeleteDatafeed(request, state, listener);
7569
} else {
76-
deleteDatafeedConfig(request, listener);
70+
datafeedManager.deleteDatafeed(request, state, listener);
7771
}
7872
}
7973

8074
private void forceDeleteDatafeed(DeleteDatafeedAction.Request request, ClusterState state,
8175
ActionListener<AcknowledgedResponse> listener) {
8276
ActionListener<Boolean> finalListener = ActionListener.wrap(
83-
response -> deleteDatafeedConfig(request, listener),
77+
// use clusterService.state() here so that the updated state without the task is available
78+
response -> datafeedManager.deleteDatafeed(request, clusterService.state(), listener),
8479
listener::onFailure
8580
);
8681

@@ -119,37 +114,6 @@ public void onFailure(Exception e) {
119114
}
120115
}
121116

122-
private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) {
123-
// Check datafeed is stopped
124-
PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
125-
if (MlTasks.getDatafeedTask(request.getDatafeedId(), tasks) != null) {
126-
listener.onFailure(ExceptionsHelper.conflictStatusException(
127-
Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, request.getDatafeedId(), DatafeedState.STARTED)));
128-
return;
129-
}
130-
131-
String datafeedId = request.getDatafeedId();
132-
133-
datafeedConfigProvider.getDatafeedConfig(
134-
datafeedId,
135-
ActionListener.wrap(
136-
datafeedConfigBuilder -> {
137-
String jobId = datafeedConfigBuilder.build().getJobId();
138-
JobDataDeleter jobDataDeleter = new JobDataDeleter(client, jobId);
139-
jobDataDeleter.deleteDatafeedTimingStats(
140-
ActionListener.wrap(
141-
unused1 -> {
142-
datafeedConfigProvider.deleteDatafeedConfig(
143-
datafeedId,
144-
ActionListener.wrap(
145-
unused2 -> listener.onResponse(AcknowledgedResponse.TRUE),
146-
listener::onFailure));
147-
},
148-
listener::onFailure));
149-
},
150-
listener::onFailure));
151-
}
152-
153117
@Override
154118
protected ClusterBlockException checkBlock(DeleteDatafeedAction.Request request, ClusterState state) {
155119
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

Lines changed: 33 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
11+
import org.apache.logging.log4j.message.ParameterizedMessage;
1112
import org.elasticsearch.ElasticsearchStatusException;
1213
import org.elasticsearch.ResourceNotFoundException;
1314
import org.elasticsearch.action.ActionListener;
@@ -23,10 +24,9 @@
2324
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2425
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2526
import org.elasticsearch.cluster.service.ClusterService;
27+
import org.elasticsearch.core.Nullable;
2628
import org.elasticsearch.common.inject.Inject;
2729
import org.elasticsearch.common.settings.Settings;
28-
import org.elasticsearch.core.CheckedConsumer;
29-
import org.elasticsearch.core.Nullable;
3030
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
3131
import org.elasticsearch.persistent.PersistentTasksService;
3232
import org.elasticsearch.tasks.Task;
@@ -46,9 +46,8 @@
4646
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
4747
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
4848
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
49+
import org.elasticsearch.xpack.ml.job.JobManager;
4950
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
50-
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
51-
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
5251
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
5352
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
5453

@@ -68,8 +67,8 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
6867
private final Client client;
6968
private final PersistentTasksService persistentTasksService;
7069
private final AnomalyDetectionAuditor auditor;
71-
private final JobResultsProvider jobResultsProvider;
7270
private final JobConfigProvider jobConfigProvider;
71+
private final JobManager jobManager;
7372
private final DatafeedConfigProvider datafeedConfigProvider;
7473
private final MlMemoryTracker memoryTracker;
7574
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
@@ -86,20 +85,20 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
8685
public TransportDeleteJobAction(Settings settings, TransportService transportService, ClusterService clusterService,
8786
ThreadPool threadPool, ActionFilters actionFilters,
8887
IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
89-
Client client, AnomalyDetectionAuditor auditor, JobResultsProvider jobResultsProvider,
88+
Client client, AnomalyDetectionAuditor auditor,
9089
JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider,
91-
MlMemoryTracker memoryTracker) {
90+
MlMemoryTracker memoryTracker, JobManager jobManager) {
9291
super(DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
9392
DeleteJobAction.Request::new, indexNameExpressionResolver, ThreadPool.Names.SAME);
9493
this.client = client;
9594
this.persistentTasksService = persistentTasksService;
9695
this.auditor = auditor;
97-
this.jobResultsProvider = jobResultsProvider;
9896
this.jobConfigProvider = jobConfigProvider;
9997
this.datafeedConfigProvider = datafeedConfigProvider;
10098
this.memoryTracker = memoryTracker;
10199
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
102100
this.listenersByJobId = new HashMap<>();
101+
this.jobManager = jobManager;
103102
}
104103

105104
@Override
@@ -116,7 +115,7 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
116115
return;
117116
}
118117

119-
logger.debug("Deleting job '{}'", request.getJobId());
118+
logger.debug(() -> new ParameterizedMessage("[{}] deleting job ", request.getJobId()));
120119

121120
if (request.isForce() == false) {
122121
checkJobIsNotOpen(request.getJobId(), state);
@@ -128,8 +127,11 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
128127
// If a deletion task already exists for this job, wait for it to complete instead of starting another one
129128
synchronized (listenersByJobId) {
130129
if (listenersByJobId.containsKey(request.getJobId())) {
131-
logger.debug("[{}] Deletion task [{}] will wait for existing deletion task to complete",
132-
request.getJobId(), task.getId());
130+
logger.debug(() -> new ParameterizedMessage(
131+
"[{}] Deletion task [{}] will wait for existing deletion task to complete",
132+
request.getJobId(),
133+
task.getId()
134+
));
133135
listenersByJobId.get(request.getJobId()).add(listener);
134136
return;
135137
} else {
@@ -153,9 +155,9 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
153155
ActionListener<PutJobAction.Response> markAsDeletingListener = ActionListener.wrap(
154156
response -> {
155157
if (request.isForce()) {
156-
forceDeleteJob(parentTaskClient, request, finalListener);
158+
forceDeleteJob(parentTaskClient, request, state, finalListener);
157159
} else {
158-
normalDeleteJob(parentTaskClient, request, finalListener);
160+
normalDeleteJob(parentTaskClient, request, state, finalListener);
159161
}
160162
},
161163
finalListener::onFailure);
@@ -171,7 +173,7 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
171173
logger.info(
172174
"[{}] config is missing but task exists. Attempting to delete tasks and stop process",
173175
request.getJobId());
174-
forceDeleteJob(parentTaskClient, request, finalListener);
176+
forceDeleteJob(parentTaskClient, request, state, finalListener);
175177
} else {
176178
finalListener.onFailure(e);
177179
}
@@ -199,64 +201,41 @@ private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @
199201
}
200202
}
201203

202-
private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request,
204+
private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient,
205+
DeleteJobAction.Request request,
206+
ClusterState state,
203207
ActionListener<AcknowledgedResponse> listener) {
204208
String jobId = request.getJobId();
205209

206210
// We clean up the memory tracker on delete rather than close as close is not a master node action
207211
memoryTracker.removeAnomalyDetectorJob(jobId);
208212

209-
// Step 4. When the job has been removed from the cluster state, return a response
210-
// -------
211-
CheckedConsumer<Boolean, Exception> apiResponseHandler = jobDeleted -> {
212-
if (jobDeleted) {
213-
logger.info("Job [" + jobId + "] deleted");
214-
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DELETED));
215-
listener.onResponse(AcknowledgedResponse.TRUE);
216-
} else {
217-
listener.onResponse(AcknowledgedResponse.FALSE);
218-
}
219-
};
220-
221-
// Step 3. When the physical storage has been deleted, delete the job config document
222-
// -------
223-
// Don't report an error if the document has already been deleted
224-
CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> jobConfigProvider.deleteJob(jobId, false,
225-
ActionListener.wrap(
226-
deleteResponse -> apiResponseHandler.accept(Boolean.TRUE),
227-
listener::onFailure
228-
)
229-
);
230-
231-
// Step 2. Remove the job from any calendars
232-
CheckedConsumer<Boolean, Exception> removeFromCalendarsHandler = response -> jobResultsProvider.removeJobFromCalendars(jobId,
233-
ActionListener.wrap(deleteJobStateHandler::accept, listener::onFailure));
234-
235-
236-
// Step 1. Delete the physical storage
237-
new JobDataDeleter(parentTaskClient, jobId).deleteJobDocuments(
238-
jobConfigProvider, indexNameExpressionResolver, clusterService.state(), removeFromCalendarsHandler, listener::onFailure);
213+
jobManager.deleteJob(request, parentTaskClient, state, listener);
239214
}
240215

241-
private void forceDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request,
242-
ActionListener<AcknowledgedResponse> listener) {
243-
244-
logger.debug("Force deleting job [{}]", request.getJobId());
216+
private void forceDeleteJob(
217+
ParentTaskAssigningClient parentTaskClient,
218+
DeleteJobAction.Request request,
219+
ClusterState state,
220+
ActionListener<AcknowledgedResponse> listener
221+
) {
245222

246-
final ClusterState state = clusterService.state();
247223
final String jobId = request.getJobId();
224+
logger.debug(() -> new ParameterizedMessage("[{}] force deleting job", jobId));
248225

249226
// 3. Delete the job
250227
ActionListener<Boolean> removeTaskListener = new ActionListener<Boolean>() {
251228
@Override
252229
public void onResponse(Boolean response) {
253-
normalDeleteJob(parentTaskClient, request, listener);
230+
// use clusterService.state() here so that the updated state without the task is available
231+
normalDeleteJob(parentTaskClient, request, clusterService.state(), listener);
254232
}
255233

256234
@Override
257235
public void onFailure(Exception e) {
258236
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
259-
normalDeleteJob(parentTaskClient, request, listener);
237+
// use clusterService.state() here so that the updated state without the task is available
238+
normalDeleteJob(parentTaskClient, request, clusterService.state(), listener);
260239
} else {
261240
listener.onFailure(e);
262241
}
@@ -266,12 +245,12 @@ public void onFailure(Exception e) {
266245
// 2. Cancel the persistent task. This closes the process gracefully so
267246
// the process should be killed first.
268247
ActionListener<KillProcessAction.Response> killJobListener = ActionListener.wrap(
269-
response -> removePersistentTask(request.getJobId(), state, removeTaskListener),
248+
response -> removePersistentTask(jobId, state, removeTaskListener),
270249
e -> {
271250
if (ExceptionsHelper.unwrapCause(e) instanceof ElasticsearchStatusException) {
272251
// Killing the process marks the task as completed so it
273252
// may have disappeared when we get here
274-
removePersistentTask(request.getJobId(), state, removeTaskListener);
253+
removePersistentTask(jobId, state, removeTaskListener);
275254
} else {
276255
listener.onFailure(e);
277256
}

0 commit comments

Comments
 (0)