7
7
8
8
import org .apache .logging .log4j .LogManager ;
9
9
import org .apache .logging .log4j .Logger ;
10
+ import org .apache .logging .log4j .message .ParameterizedMessage ;
10
11
import org .elasticsearch .action .ActionListener ;
11
12
import org .elasticsearch .action .DocWriteResponse ;
12
13
import org .elasticsearch .action .bulk .BulkItemResponse ;
26
27
import org .elasticsearch .cluster .service .ClusterService ;
27
28
import org .elasticsearch .common .inject .Inject ;
28
29
import org .elasticsearch .common .io .stream .StreamInput ;
30
+ import org .elasticsearch .common .unit .TimeValue ;
29
31
import org .elasticsearch .index .query .QueryBuilder ;
30
32
import org .elasticsearch .index .query .QueryBuilders ;
31
33
import org .elasticsearch .index .reindex .AbstractBulkByScrollRequest ;
@@ -109,33 +111,59 @@ protected void masterOperation(DeleteDataFrameAnalyticsAction.Request request, C
109
111
@ Override
110
112
protected void masterOperation (Task task , DeleteDataFrameAnalyticsAction .Request request , ClusterState state ,
111
113
ActionListener <AcknowledgedResponse > listener ) {
112
- String id = request .getId ();
113
114
TaskId taskId = new TaskId (clusterService .localNode ().getId (), task .getId ());
114
115
ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient (client , taskId );
115
116
116
117
if (request .isForce ()) {
117
- forceDelete (parentTaskClient , id , listener );
118
+ forceDelete (parentTaskClient , request , listener );
118
119
} else {
119
- normalDelete (parentTaskClient , state , id , listener );
120
+ normalDelete (parentTaskClient , state , request , listener );
120
121
}
121
122
}
122
123
123
- private void forceDelete (ParentTaskAssigningClient parentTaskClient , String id ,
124
+ private void forceDelete (ParentTaskAssigningClient parentTaskClient , DeleteDataFrameAnalyticsAction . Request request ,
124
125
ActionListener <AcknowledgedResponse > listener ) {
125
- logger .debug ("[{}] Force deleting data frame analytics job" , id );
126
+ logger .debug ("[{}] Force deleting data frame analytics job" , request . getId () );
126
127
127
128
ActionListener <StopDataFrameAnalyticsAction .Response > stopListener = ActionListener .wrap (
128
- stopResponse -> normalDelete (parentTaskClient , clusterService .state (), id , listener ),
129
+ stopResponse -> normalDelete (parentTaskClient , clusterService .state (), request , listener ),
129
130
listener ::onFailure
130
131
);
131
132
132
- StopDataFrameAnalyticsAction .Request request = new StopDataFrameAnalyticsAction .Request (id );
133
- request .setForce (true );
134
- executeAsyncWithOrigin (parentTaskClient , ML_ORIGIN , StopDataFrameAnalyticsAction .INSTANCE , request , stopListener );
133
+ stopJob (parentTaskClient , request , stopListener );
135
134
}
136
135
137
- private void normalDelete (ParentTaskAssigningClient parentTaskClient , ClusterState state , String id ,
138
- ActionListener <AcknowledgedResponse > listener ) {
136
+ private void stopJob (ParentTaskAssigningClient parentTaskClient , DeleteDataFrameAnalyticsAction .Request request ,
137
+ ActionListener <StopDataFrameAnalyticsAction .Response > listener ) {
138
+ // We first try to stop the job normally. Normal stop returns after the job was stopped.
139
+ // If that fails then we proceed to force stopping which returns as soon as the persistent task is removed.
140
+ // If we just did force stopping, then there is a chance we proceed to delete the config while it's
141
+ // still used from the running task which results in logging errors.
142
+
143
+ StopDataFrameAnalyticsAction .Request stopRequest = new StopDataFrameAnalyticsAction .Request (request .getId ());
144
+ stopRequest .setTimeout (request .timeout ());
145
+
146
+ ActionListener <StopDataFrameAnalyticsAction .Response > normalStopListener = ActionListener .wrap (
147
+ listener ::onResponse ,
148
+ normalStopFailure -> {
149
+ stopRequest .setForce (true );
150
+ executeAsyncWithOrigin (parentTaskClient , ML_ORIGIN , StopDataFrameAnalyticsAction .INSTANCE , stopRequest , ActionListener .wrap (
151
+ listener ::onResponse ,
152
+ forceStopFailure -> {
153
+ logger .error (new ParameterizedMessage ("[{}] Failed to stop normally" , request .getId ()), normalStopFailure );
154
+ logger .error (new ParameterizedMessage ("[{}] Failed to stop forcefully" , request .getId ()), forceStopFailure );
155
+ listener .onFailure (forceStopFailure );
156
+ }
157
+ ));
158
+ }
159
+ );
160
+
161
+ executeAsyncWithOrigin (parentTaskClient , ML_ORIGIN , StopDataFrameAnalyticsAction .INSTANCE , stopRequest , normalStopListener );
162
+ }
163
+
164
+ private void normalDelete (ParentTaskAssigningClient parentTaskClient , ClusterState state ,
165
+ DeleteDataFrameAnalyticsAction .Request request , ActionListener <AcknowledgedResponse > listener ) {
166
+ String id = request .getId ();
139
167
PersistentTasksCustomMetadata tasks = state .getMetadata ().custom (PersistentTasksCustomMetadata .TYPE );
140
168
DataFrameAnalyticsState taskState = MlTasks .getDataFrameAnalyticsState (id , tasks );
141
169
if (taskState != DataFrameAnalyticsState .STOPPED ) {
@@ -178,14 +206,14 @@ private void normalDelete(ParentTaskAssigningClient parentTaskClient, ClusterSta
178
206
logger .warn ("[{}] DBQ failure: {}" , id , failure );
179
207
}
180
208
}
181
- deleteStats (parentTaskClient , id , deleteStatsHandler );
209
+ deleteStats (parentTaskClient , id , request . timeout (), deleteStatsHandler );
182
210
},
183
211
listener ::onFailure
184
212
);
185
213
186
214
// Step 2. Delete state
187
215
ActionListener <DataFrameAnalyticsConfig > configListener = ActionListener .wrap (
188
- config -> deleteState (parentTaskClient , config , deleteStateHandler ),
216
+ config -> deleteState (parentTaskClient , config , request . timeout (), deleteStateHandler ),
189
217
listener ::onFailure
190
218
);
191
219
@@ -214,6 +242,7 @@ private void deleteConfig(ParentTaskAssigningClient parentTaskClient, String id,
214
242
215
243
private void deleteState (ParentTaskAssigningClient parentTaskClient ,
216
244
DataFrameAnalyticsConfig config ,
245
+ TimeValue timeout ,
217
246
ActionListener <BulkByScrollResponse > listener ) {
218
247
List <String > ids = new ArrayList <>();
219
248
ids .add (StoredProgress .documentId (config .getId ()));
@@ -224,29 +253,33 @@ private void deleteState(ParentTaskAssigningClient parentTaskClient,
224
253
parentTaskClient ,
225
254
AnomalyDetectorsIndex .jobStateIndexPattern (),
226
255
QueryBuilders .idsQuery ().addIds (ids .toArray (new String [0 ])),
256
+ timeout ,
227
257
listener
228
258
);
229
259
}
230
260
231
261
private void deleteStats (ParentTaskAssigningClient parentTaskClient ,
232
262
String jobId ,
263
+ TimeValue timeout ,
233
264
ActionListener <BulkByScrollResponse > listener ) {
234
265
executeDeleteByQuery (
235
266
parentTaskClient ,
236
267
MlStatsIndex .indexPattern (),
237
268
QueryBuilders .termQuery (Fields .JOB_ID .getPreferredName (), jobId ),
269
+ timeout ,
238
270
listener
239
271
);
240
272
}
241
273
242
- private void executeDeleteByQuery (ParentTaskAssigningClient parentTaskClient , String index , QueryBuilder query ,
274
+ private void executeDeleteByQuery (ParentTaskAssigningClient parentTaskClient , String index , QueryBuilder query , TimeValue timeout ,
243
275
ActionListener <BulkByScrollResponse > listener ) {
244
276
DeleteByQueryRequest request = new DeleteByQueryRequest (index );
245
277
request .setQuery (query );
246
278
request .setIndicesOptions (MlIndicesUtils .addIgnoreUnavailable (IndicesOptions .lenientExpandOpen ()));
247
279
request .setSlices (AbstractBulkByScrollRequest .AUTO_SLICES );
248
280
request .setAbortOnVersionConflict (false );
249
281
request .setRefresh (true );
282
+ request .setTimeout (timeout );
250
283
executeAsyncWithOrigin (parentTaskClient , ML_ORIGIN , DeleteByQueryAction .INSTANCE , request , listener );
251
284
}
252
285
0 commit comments