1515import org .elasticsearch .action .FailedNodeException ;
1616import org .elasticsearch .action .TaskOperationFailure ;
1717import org .elasticsearch .action .support .ActionFilters ;
18+ import org .elasticsearch .action .support .master .AcknowledgedResponse ;
1819import org .elasticsearch .action .support .tasks .TransportTasksAction ;
1920import org .elasticsearch .client .internal .Client ;
2021import org .elasticsearch .client .internal .OriginSettingClient ;
2728import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
2829import org .elasticsearch .common .util .concurrent .EsExecutors ;
2930import org .elasticsearch .core .FixForMultiProject ;
31+ import org .elasticsearch .core .Predicates ;
3032import org .elasticsearch .core .TimeValue ;
33+ import org .elasticsearch .core .Tuple ;
3134import org .elasticsearch .discovery .MasterNotDiscoveredException ;
3235import org .elasticsearch .injection .guice .Inject ;
3336import org .elasticsearch .persistent .PersistentTasksClusterService ;
3942import org .elasticsearch .transport .TransportResponseHandler ;
4043import org .elasticsearch .transport .TransportService ;
4144import org .elasticsearch .xpack .core .ml .MlTasks ;
45+ import org .elasticsearch .xpack .core .ml .action .CloseJobAction ;
4246import org .elasticsearch .xpack .core .ml .action .StartDatafeedAction ;
4347import org .elasticsearch .xpack .core .ml .action .StopDatafeedAction ;
4448import org .elasticsearch .xpack .core .ml .datafeed .DatafeedState ;
4852import org .elasticsearch .xpack .ml .MachineLearning ;
4953import org .elasticsearch .xpack .ml .datafeed .persistence .DatafeedConfigProvider ;
5054import org .elasticsearch .xpack .ml .notifications .AnomalyDetectionAuditor ;
55+ import org .elasticsearch .xpack .ml .utils .TypedChainTaskExecutor ;
5156
5257import java .util .ArrayList ;
5358import java .util .Collection ;
5863import java .util .concurrent .atomic .AtomicInteger ;
5964import java .util .stream .Collectors ;
6065
66+ import static java .util .stream .Collectors .toList ;
6167import static org .elasticsearch .core .Strings .format ;
6268import static org .elasticsearch .xpack .core .ClientHelper .ML_ORIGIN ;
69+ import static org .elasticsearch .xpack .core .ClientHelper .executeAsyncWithOrigin ;
6370import static org .elasticsearch .xpack .ml .utils .ExceptionCollectionHandling .exceptionArrayToStatusException ;
6471
6572public class TransportStopDatafeedAction extends TransportTasksAction <
@@ -106,7 +113,7 @@ public TransportStopDatafeedAction(
106113 }
107114
108115 /**
109- * Sort the datafeed IDs the their task state and add to one
116+ * Sort the datafeed IDs by their task state and add to one
110117 * of the list arguments depending on the state.
111118 *
112119 * @param expandedDatafeedIds The expanded set of IDs
@@ -211,7 +218,64 @@ private void doExecute(
211218 if (request .isForce ()) {
212219 forceStopDatafeed (request , listener , tasks , nodes , notStoppedDatafeeds );
213220 } else {
214- normalStopDatafeed (task , request , listener , tasks , nodes , startedDatafeeds , stoppingDatafeeds , attempt );
221+ final List <String > startedDatafeedsJobs = new ArrayList <>();
222+ for (String datafeedId : startedDatafeeds ) {
223+ PersistentTasksCustomMetadata .PersistentTask <?> datafeedTask = MlTasks .getDatafeedTask (datafeedId , tasks );
224+ if (datafeedTask != null
225+ && PersistentTasksClusterService .needsReassignment (datafeedTask .getAssignment (), nodes ) == false ) {
226+ startedDatafeedsJobs .add (((StartDatafeedAction .DatafeedParams ) datafeedTask .getParams ()).getJobId ());
227+ }
228+ }
229+ if (request .closeJob () && startedDatafeedsJobs .isEmpty () == false ) {
230+ // If the "close_job" parameter was set to "true" on the stop datafeed request we attempt to first close the
231+ // jobs associated with the datafeeds. This will in turn attempt to stop the jobs' datafeeds (this time with the
232+ // "close_job" flag set to false, to avoid recursion)
233+ ActionListener <List <Tuple <String , AcknowledgedResponse >>> closeJobActionListener = listener
234+ .delegateFailureAndWrap ((delegate , jobsResponses ) -> {
235+ List <String > jobIds = jobsResponses .stream ()
236+ .filter (t -> t .v2 ().isAcknowledged () == false )
237+ .map (Tuple ::v1 )
238+ .collect (toList ());
239+ if (jobIds .isEmpty ()) {
240+ logger .debug ("Successfully closed jobs (and associated datafeeds)" );
241+ } else {
242+ logger .warn ("Failed to close jobs (and associated datafeeds): {}" , jobIds );
243+ }
244+ delegate .onResponse (new StopDatafeedAction .Response (true ));
245+ });
246+
247+ TypedChainTaskExecutor <Tuple <String , AcknowledgedResponse >> chainTaskExecutor = new TypedChainTaskExecutor <>(
248+ EsExecutors .DIRECT_EXECUTOR_SERVICE ,
249+ Predicates .always (),
250+ Predicates .always ()
251+ );
252+ for (String jobId : startedDatafeedsJobs ) {
253+ chainTaskExecutor .add (
254+ al -> executeAsyncWithOrigin (
255+ client ,
256+ ML_ORIGIN ,
257+ CloseJobAction .INSTANCE ,
258+ new CloseJobAction .Request (jobId ),
259+ listener .delegateFailureAndWrap (
260+ (l , response ) -> l .onResponse (new StopDatafeedAction .Response (response .isClosed ()))
261+ )
262+ )
263+ );
264+ }
265+ chainTaskExecutor .execute (closeJobActionListener );
266+ } else {
267+ normalStopDatafeed (
268+ task ,
269+ request ,
270+ listener ,
271+ tasks ,
272+ nodes ,
273+ startedDatafeeds ,
274+ stoppingDatafeeds ,
275+ startedDatafeedsJobs ,
276+ attempt
277+ );
278+ }
215279 }
216280 }, listener ::onFailure )
217281 );
@@ -226,10 +290,10 @@ private void normalStopDatafeed(
226290 DiscoveryNodes nodes ,
227291 List <String > startedDatafeeds ,
228292 List <String > stoppingDatafeeds ,
293+ List <String > startedDatafeedsJobs ,
229294 int attempt
230295 ) {
231296 final Set <String > executorNodes = new HashSet <>();
232- final List <String > startedDatafeedsJobs = new ArrayList <>();
233297 final List <String > resolvedStartedDatafeeds = new ArrayList <>();
234298 final List <PersistentTasksCustomMetadata .PersistentTask <?>> allDataFeedsToWaitFor = new ArrayList <>();
235299 for (String datafeedId : startedDatafeeds ) {
@@ -240,7 +304,6 @@ private void normalStopDatafeed(
240304 assert datafeedTask != null : msg ;
241305 logger .error (msg );
242306 } else if (PersistentTasksClusterService .needsReassignment (datafeedTask .getAssignment (), nodes ) == false ) {
243- startedDatafeedsJobs .add (((StartDatafeedAction .DatafeedParams ) datafeedTask .getParams ()).getJobId ());
244307 resolvedStartedDatafeeds .add (datafeedId );
245308 executorNodes .add (datafeedTask .getExecutorNode ());
246309 allDataFeedsToWaitFor .add (datafeedTask );
0 commit comments