diff --git a/docs/changelog/138634.yaml b/docs/changelog/138634.yaml new file mode 100644 index 0000000000000..a6bf054365739 --- /dev/null +++ b/docs/changelog/138634.yaml @@ -0,0 +1,6 @@ +pr: 138634 +summary: Add "close_job" parameter to the stop datafeed API +area: Machine Learning +type: enhancement +issues: + - 138010 diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/ml.stop_datafeed.json b/rest-api-spec/src/main/resources/rest-api-spec/api/ml.stop_datafeed.json index 3e56b5c72878d..752c8634e03c7 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/ml.stop_datafeed.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/ml.stop_datafeed.json @@ -41,6 +41,11 @@ "default": false, "description": "True if the datafeed should be forcefully stopped." }, + "close_job": { + "type": "boolean", + "default": false, + "description": "True if the job associated with the datafeed should be closed." + }, "timeout": { "type": "time", "default": "20s", diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java index 1278d1a57ec55..d41b17978cbbb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java @@ -42,6 +42,7 @@ public static class Request extends BaseTasksRequest implements ToXCont public static final ParseField TIMEOUT = new ParseField("timeout"); public static final ParseField FORCE = new ParseField("force"); public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match"); + public static final ParseField CLOSE_JOB = new ParseField("close_job"); public static final ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); static { @@ -52,6 +53,7 @@ public static class Request extends BaseTasksRequest implements ToXCont ); PARSER.declareBoolean(Request::setForce, FORCE); PARSER.declareBoolean(Request::setAllowNoMatch, ALLOW_NO_MATCH); + PARSER.declareBoolean(Request::setCloseJob, CLOSE_JOB); } public static Request parseRequest(String datafeedId, XContentParser parser) { @@ -67,6 +69,7 @@ public static Request parseRequest(String datafeedId, XContentParser parser) { private TimeValue stopTimeout = DEFAULT_TIMEOUT; private boolean force = false; private boolean allowNoMatch = true; + private boolean closeJob = false; public Request(String datafeedId) { this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName()); @@ -81,6 +84,7 @@ public Request(StreamInput in) throws IOException { stopTimeout = in.readTimeValue(); force = in.readBoolean(); allowNoMatch = in.readBoolean(); + closeJob = in.readBoolean(); } public String getDatafeedId() { @@ -124,6 +128,15 @@ public Request setAllowNoMatch(boolean allowNoMatch) { return this; } + public boolean closeJob() { + return closeJob; + } + + public Request setCloseJob(boolean closeJob) { + this.closeJob = closeJob; + return this; + } + @Override public boolean match(Task task) { for (String id : resolvedStartedDatafeedIds) { @@ -148,11 +161,12 @@ public void writeTo(StreamOutput out) throws IOException { out.writeTimeValue(stopTimeout); out.writeBoolean(force); out.writeBoolean(allowNoMatch); + out.writeBoolean(closeJob); } @Override public int hashCode() { - return Objects.hash(datafeedId, stopTimeout, force, allowNoMatch); + return Objects.hash(datafeedId, stopTimeout, force, allowNoMatch, closeJob); } @Override @@ -162,6 +176,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(TIMEOUT.getPreferredName(), stopTimeout.getStringRep()); builder.field(FORCE.getPreferredName(), force); builder.field(ALLOW_NO_MATCH.getPreferredName(), allowNoMatch); + builder.field(CLOSE_JOB.getPreferredName(), closeJob); builder.endObject(); return builder; } @@ -178,7 +193,8 @@ public boolean equals(Object obj) { return Objects.equals(datafeedId, other.datafeedId) && Objects.equals(stopTimeout, other.stopTimeout) && Objects.equals(force, other.force) - && Objects.equals(allowNoMatch, other.allowNoMatch); + && Objects.equals(allowNoMatch, other.allowNoMatch) + && Objects.equals(closeJob, other.closeJob); } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java index ec0d160a1ff4a..ddeedb4a396f7 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java @@ -487,8 +487,8 @@ public void testRealtime() throws Exception { startRealtime(jobId); try { - StopDatafeedAction.Response stopJobResponse = stopDatafeed(datafeedId); - assertTrue(stopJobResponse.isStopped()); + StopDatafeedAction.Response stopDatafeedResponse = stopDatafeed(datafeedId); + assertTrue(stopDatafeedResponse.isStopped()); } catch (Exception e) { HotThreads.logLocalHotThreads(logger, Level.INFO, "hot threads at failure", ReferenceDocs.LOGGING); throw e; @@ -498,6 +498,88 @@ public void testRealtime() throws Exception { GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet(); assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED)); }); + + // The job should _not_ have closed automatically + assertBusy(() -> { + GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId); + GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet(); + assertThat(response.getResponse().results().get(0).getState(), equalTo(JobState.OPENED)); + }); + } + + private void doTestRealtime_GivenCloseJobParameter(String jobId, boolean closeJobParameter, JobState jobState) throws Exception { + String datafeedId = jobId + "-datafeed"; + startRealtime(jobId); + + try { + StopDatafeedAction.Response stopDatafeedResponse = stopDatafeed(datafeedId, true); + assertTrue(stopDatafeedResponse.isStopped()); + } catch (Exception e) { + HotThreads.logLocalHotThreads(logger, Level.INFO, "hot threads at failure", ReferenceDocs.LOGGING); + throw e; + } + + // The job should have closed automatically + assertBusy(() -> { + GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId); + GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet(); + assertThat(response.getResponse().results().get(0).getState(), equalTo(JobState.CLOSED)); + }); + + assertBusy(() -> { + GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet(); + assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED)); + }); + } + + public void testRealtime_GivenCloseJobParameterIsTrue() throws Exception { + doTestRealtime_GivenCloseJobParameter("realtime-job-close-job-true", true, JobState.CLOSED); + } + + public void testRealtime_GivenCloseJobParameterIsFalse() throws Exception { + doTestRealtime_GivenCloseJobParameter("realtime-job-close-job-false", false, JobState.OPENED); + } + + private void doTestStopLookback_GivenCloseJobParameter(String jobId, boolean closeJobParameter, JobState jobState) throws Exception { + String datafeedId = jobId + "-datafeed"; + + client().admin().indices().prepareCreate("data").setMapping("time", "type=date").get(); + long numDocs = 20480; + long now = System.currentTimeMillis(); + long oneWeekAgo = now - 604800000; + long twoWeeksAgo = oneWeekAgo - 604800000; + indexDocs(logger, "data", numDocs, twoWeeksAgo, oneWeekAgo); + + Job.Builder job = createScheduledJob(jobId); + putJob(job); + openJob(job.getId()); + assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED)); + + DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(datafeedId, jobId, Collections.singletonList("data")); + // Use lots of chunks to maximise the chance that we can stop the lookback before it completes + datafeedConfigBuilder.setChunkingConfig(ChunkingConfig.newManual(new TimeValue(1, TimeUnit.SECONDS))); + DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); + putDatafeed(datafeedConfig); + startDatafeed(datafeedConfig.getId(), 0L, now); + assertBusy(() -> assertThat(getDataCounts(job.getId()).getProcessedRecordCount(), greaterThan(0L)), 60, TimeUnit.SECONDS); + + // Stop the datafeed with the given close_job parameter + StopDatafeedAction.Response stopDatafeedResponse = stopDatafeed(datafeedId, closeJobParameter); + assertTrue(stopDatafeedResponse.isStopped()); + + // Check the job state is as expected + assertBusy(() -> assertEquals(jobState, getJobStats(jobId).get(0).getState()), 2, TimeUnit.SECONDS); + } + + public void testStopLookback_GivenCloseJobParameterIsTrue() throws Exception { + // Stop the datafeed with close_job=true + doTestStopLookback_GivenCloseJobParameter("lookback-stop-close-job-true", true, JobState.CLOSED); + } + + public void testStopLookback_GivenCloseJobParameterIsFalse() throws Exception { + // Stop the datafeed with close_job=false + doTestStopLookback_GivenCloseJobParameter("lookback-stop-close-job-false", false, JobState.OPENED); } public void testCloseJobStopsRealtimeDatafeed() throws Exception { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index e4fc3343537ac..1c1a67a64787c 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -141,6 +141,12 @@ protected StopDatafeedAction.Response stopDatafeed(String datafeedId) { return client().execute(StopDatafeedAction.INSTANCE, request).actionGet(); } + protected StopDatafeedAction.Response stopDatafeed(String datafeedId, boolean closeJob) { + StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedId); + request.setCloseJob(closeJob); + return client().execute(StopDatafeedAction.INSTANCE, request).actionGet(); + } + protected PutDatafeedAction.Response updateDatafeed(DatafeedUpdate update) { UpdateDatafeedAction.Request request = new UpdateDatafeedAction.Request(update); return client().execute(UpdateDatafeedAction.INSTANCE, request).actionGet(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 34bdfc82ac971..8fe1672f51be7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; @@ -27,7 +28,9 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.FixForMultiProject; +import org.elasticsearch.core.Predicates; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.persistent.PersistentTasksClusterService; @@ -39,6 +42,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; @@ -48,6 +52,7 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; +import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor; import java.util.ArrayList; import java.util.Collection; @@ -58,8 +63,10 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import static java.util.stream.Collectors.toList; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.ml.utils.ExceptionCollectionHandling.exceptionArrayToStatusException; public class TransportStopDatafeedAction extends TransportTasksAction< @@ -106,7 +113,7 @@ public TransportStopDatafeedAction( } /** - * Sort the datafeed IDs the their task state and add to one + * Sort the datafeed IDs by their task state and add to one * of the list arguments depending on the state. * * @param expandedDatafeedIds The expanded set of IDs @@ -211,7 +218,64 @@ private void doExecute( if (request.isForce()) { forceStopDatafeed(request, listener, tasks, nodes, notStoppedDatafeeds); } else { - normalStopDatafeed(task, request, listener, tasks, nodes, startedDatafeeds, stoppingDatafeeds, attempt); + final List startedDatafeedsJobs = new ArrayList<>(); + for (String datafeedId : startedDatafeeds) { + PersistentTasksCustomMetadata.PersistentTask datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks); + if (datafeedTask != null + && PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes) == false) { + startedDatafeedsJobs.add(((StartDatafeedAction.DatafeedParams) datafeedTask.getParams()).getJobId()); + } + } + if (request.closeJob() && startedDatafeedsJobs.isEmpty() == false) { + // If the "close_job" parameter was set to "true" on the stop datafeed request we attempt to first close the + // jobs associated with the datafeeds. This will in turn attempt to stop the jobs' datafeeds (this time with the + // "close_job" flag set to false, to avoid recursion) + ActionListener>> closeJobActionListener = listener + .delegateFailureAndWrap((delegate, jobsResponses) -> { + List jobIds = jobsResponses.stream() + .filter(t -> t.v2().isAcknowledged() == false) + .map(Tuple::v1) + .collect(toList()); + if (jobIds.isEmpty()) { + logger.debug("Successfully closed jobs (and associated datafeeds)"); + } else { + logger.warn("Failed to close jobs (and associated datafeeds): {}", jobIds); + } + delegate.onResponse(new StopDatafeedAction.Response(true)); + }); + + TypedChainTaskExecutor> chainTaskExecutor = new TypedChainTaskExecutor<>( + EsExecutors.DIRECT_EXECUTOR_SERVICE, + Predicates.always(), + Predicates.always() + ); + for (String jobId : startedDatafeedsJobs) { + chainTaskExecutor.add( + al -> executeAsyncWithOrigin( + client, + ML_ORIGIN, + CloseJobAction.INSTANCE, + new CloseJobAction.Request(jobId), + listener.delegateFailureAndWrap( + (l, response) -> l.onResponse(new StopDatafeedAction.Response(response.isClosed())) + ) + ) + ); + } + chainTaskExecutor.execute(closeJobActionListener); + } else { + normalStopDatafeed( + task, + request, + listener, + tasks, + nodes, + startedDatafeeds, + stoppingDatafeeds, + startedDatafeedsJobs, + attempt + ); + } } }, listener::onFailure) ); @@ -226,10 +290,10 @@ private void normalStopDatafeed( DiscoveryNodes nodes, List startedDatafeeds, List stoppingDatafeeds, + List startedDatafeedsJobs, int attempt ) { final Set executorNodes = new HashSet<>(); - final List startedDatafeedsJobs = new ArrayList<>(); final List resolvedStartedDatafeeds = new ArrayList<>(); final List> allDataFeedsToWaitFor = new ArrayList<>(); for (String datafeedId : startedDatafeeds) { @@ -240,7 +304,6 @@ private void normalStopDatafeed( assert datafeedTask != null : msg; logger.error(msg); } else if (PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes) == false) { - startedDatafeedsJobs.add(((StartDatafeedAction.DatafeedParams) datafeedTask.getParams()).getJobId()); resolvedStartedDatafeeds.add(datafeedId); executorNodes.add(datafeedTask.getExecutorNode()); allDataFeedsToWaitFor.add(datafeedTask); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index 7829adb395675..210fa11d47f4b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -81,7 +81,7 @@ * This class implements CRUD operation for the * datafeed configuration document * - * The number of datafeeds returned in a search it limited to + * The number of datafeeds returned in a search is limited to * {@link MlConfigIndex#CONFIG_INDEX_MAX_RESULTS_WINDOW}. * In most cases we expect 10s or 100s of datafeeds to be defined and * a search for all datafeeds should return all. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java index dcc213a571469..20c6a2dd5c0e9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestStopDatafeedAction.java @@ -59,6 +59,9 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient request.setForce(restRequest.paramAsBoolean(Request.FORCE.getPreferredName(), request.isForce())); } request.setAllowNoMatch(restRequest.paramAsBoolean(Request.ALLOW_NO_MATCH.getPreferredName(), request.allowNoMatch())); + if (restRequest.hasParam(Request.CLOSE_JOB.getPreferredName())) { + request.setCloseJob(restRequest.paramAsBoolean(Request.CLOSE_JOB.getPreferredName(), request.closeJob())); + } } return channel -> client.execute(StopDatafeedAction.INSTANCE, request, new RestBuilderListener(channel) {