Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/138634.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 138634
summary: Add "close_job" parameter to the stop datafeed API
area: Machine Learning
type: enhancement
issues:
- 138010
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
"default": false,
"description": "True if the datafeed should be forcefully stopped."
},
"close_job": {
"type": "boolean",
"default": false,
"description": "True if the job associated with the datafeed should be closed."
},
"timeout": {
"type": "time",
"default": "20s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public static class Request extends BaseTasksRequest<Request> implements ToXCont
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField FORCE = new ParseField("force");
public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match");
public static final ParseField CLOSE_JOB = new ParseField("close_job");

public static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
static {
Expand All @@ -52,6 +53,7 @@ public static class Request extends BaseTasksRequest<Request> implements ToXCont
);
PARSER.declareBoolean(Request::setForce, FORCE);
PARSER.declareBoolean(Request::setAllowNoMatch, ALLOW_NO_MATCH);
PARSER.declareBoolean(Request::setCloseJob, CLOSE_JOB);
}

public static Request parseRequest(String datafeedId, XContentParser parser) {
Expand All @@ -67,6 +69,7 @@ public static Request parseRequest(String datafeedId, XContentParser parser) {
private TimeValue stopTimeout = DEFAULT_TIMEOUT;
private boolean force = false;
private boolean allowNoMatch = true;
private boolean closeJob = false;

public Request(String datafeedId) {
this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
Expand All @@ -81,6 +84,7 @@ public Request(StreamInput in) throws IOException {
stopTimeout = in.readTimeValue();
force = in.readBoolean();
allowNoMatch = in.readBoolean();
closeJob = in.readBoolean();
}

public String getDatafeedId() {
Expand Down Expand Up @@ -124,6 +128,15 @@ public Request setAllowNoMatch(boolean allowNoMatch) {
return this;
}

/**
 * Returns the value of the {@code close_job} option of this request.
 *
 * @return {@code true} if the job associated with the datafeed should be closed
 */
public boolean closeJob() {
return closeJob;
}

/**
 * Sets the {@code close_job} option of this request.
 *
 * @param closeJob {@code true} if the job associated with the datafeed should be closed
 * @return this request, so that setter calls can be chained
 */
public Request setCloseJob(boolean closeJob) {
this.closeJob = closeJob;
return this;
}

@Override
public boolean match(Task task) {
for (String id : resolvedStartedDatafeedIds) {
Expand All @@ -148,11 +161,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeTimeValue(stopTimeout);
out.writeBoolean(force);
out.writeBoolean(allowNoMatch);
out.writeBoolean(closeJob);
}

@Override
public int hashCode() {
return Objects.hash(datafeedId, stopTimeout, force, allowNoMatch);
return Objects.hash(datafeedId, stopTimeout, force, allowNoMatch, closeJob);
}

@Override
Expand All @@ -162,6 +176,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(TIMEOUT.getPreferredName(), stopTimeout.getStringRep());
builder.field(FORCE.getPreferredName(), force);
builder.field(ALLOW_NO_MATCH.getPreferredName(), allowNoMatch);
builder.field(CLOSE_JOB.getPreferredName(), closeJob);
builder.endObject();
return builder;
}
Expand All @@ -178,7 +193,8 @@ public boolean equals(Object obj) {
return Objects.equals(datafeedId, other.datafeedId)
&& Objects.equals(stopTimeout, other.stopTimeout)
&& Objects.equals(force, other.force)
&& Objects.equals(allowNoMatch, other.allowNoMatch);
&& Objects.equals(allowNoMatch, other.allowNoMatch)
&& Objects.equals(closeJob, other.closeJob);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,8 @@ public void testRealtime() throws Exception {
startRealtime(jobId);

try {
StopDatafeedAction.Response stopJobResponse = stopDatafeed(datafeedId);
assertTrue(stopJobResponse.isStopped());
StopDatafeedAction.Response stopDatafeedResponse = stopDatafeed(datafeedId);
assertTrue(stopDatafeedResponse.isStopped());
} catch (Exception e) {
HotThreads.logLocalHotThreads(logger, Level.INFO, "hot threads at failure", ReferenceDocs.LOGGING);
throw e;
Expand All @@ -498,6 +498,88 @@ public void testRealtime() throws Exception {
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
});

// The job should _not_ have closed automatically
assertBusy(() -> {
GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId);
GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getState(), equalTo(JobState.OPENED));
});
}

/**
 * Starts a realtime datafeed, stops it with the given {@code close_job} value and then
 * waits for the job and the datafeed to reach the expected states.
 *
 * @param jobId             ID passed to {@code startRealtime}; the datafeed ID is {@code jobId + "-datafeed"}
 * @param closeJobParameter value of the {@code close_job} flag sent with the stop datafeed request
 * @param jobState          state the job is expected to reach after the datafeed has been stopped
 * @throws Exception if stopping the datafeed fails or an expected state is not reached
 */
private void doTestRealtime_GivenCloseJobParameter(String jobId, boolean closeJobParameter, JobState jobState) throws Exception {
    String datafeedId = jobId + "-datafeed";
    startRealtime(jobId);

    try {
        // Forward the caller's flag: hard-coding it would make the "true" and "false" variants identical
        StopDatafeedAction.Response stopDatafeedResponse = stopDatafeed(datafeedId, closeJobParameter);
        assertTrue(stopDatafeedResponse.isStopped());
    } catch (Exception e) {
        HotThreads.logLocalHotThreads(logger, Level.INFO, "hot threads at failure", ReferenceDocs.LOGGING);
        throw e;
    }

    // The job should end up in the state the caller expects for the given close_job value
    assertBusy(() -> {
        GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId);
        GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet();
        assertThat(response.getResponse().results().get(0).getState(), equalTo(jobState));
    });

    // Regardless of close_job, the datafeed itself must be stopped
    assertBusy(() -> {
        GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId);
        GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
        assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
    });
}

/**
 * Runs the realtime stop scenario with {@code close_job=true}, passing
 * {@link JobState#CLOSED} as the expected job state.
 */
public void testRealtime_GivenCloseJobParameterIsTrue() throws Exception {
    final String jobId = "realtime-job-close-job-true";
    final boolean closeJob = true;
    doTestRealtime_GivenCloseJobParameter(jobId, closeJob, JobState.CLOSED);
}

/**
 * Runs the realtime stop scenario with {@code close_job=false}, passing
 * {@link JobState#OPENED} as the expected job state.
 */
public void testRealtime_GivenCloseJobParameterIsFalse() throws Exception {
    final String jobId = "realtime-job-close-job-false";
    final boolean closeJob = false;
    doTestRealtime_GivenCloseJobParameter(jobId, closeJob, JobState.OPENED);
}

/**
 * Indexes historical documents, starts a lookback datafeed over them, stops the datafeed
 * once some records have been processed and checks the resulting job state.
 *
 * @param jobId             ID of the job to create; the datafeed ID is {@code jobId + "-datafeed"}
 * @param closeJobParameter value of the {@code close_job} flag sent with the stop datafeed request
 * @param jobState          state the job is expected to be in after the datafeed has been stopped
 * @throws Exception if any step fails or an expected state is not reached in time
 */
private void doTestStopLookback_GivenCloseJobParameter(String jobId, boolean closeJobParameter, JobState jobState) throws Exception {
    String datafeedId = jobId + "-datafeed";

    client().admin().indices().prepareCreate("data").setMapping("time", "type=date").get();
    long numDocs = 20480;
    long now = System.currentTimeMillis();
    long oneWeekMillis = TimeUnit.DAYS.toMillis(7);
    long oneWeekAgo = now - oneWeekMillis;
    long twoWeeksAgo = oneWeekAgo - oneWeekMillis;
    indexDocs(logger, "data", numDocs, twoWeeksAgo, oneWeekAgo);

    Job.Builder job = createScheduledJob(jobId);
    putJob(job);
    openJob(job.getId());
    // Expected value goes first so that a failure message reports expected/actual the right way round
    assertBusy(() -> assertEquals(JobState.OPENED, getJobStats(job.getId()).get(0).getState()));

    DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(datafeedId, jobId, Collections.singletonList("data"));
    // Use lots of chunks to maximise the chance that we can stop the lookback before it completes
    datafeedConfigBuilder.setChunkingConfig(ChunkingConfig.newManual(new TimeValue(1, TimeUnit.SECONDS)));
    DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
    putDatafeed(datafeedConfig);
    startDatafeed(datafeedConfig.getId(), 0L, now);
    // Wait until the lookback has made some progress before stopping it
    assertBusy(() -> assertThat(getDataCounts(job.getId()).getProcessedRecordCount(), greaterThan(0L)), 60, TimeUnit.SECONDS);

    // Stop the datafeed with the given close_job parameter
    StopDatafeedAction.Response stopDatafeedResponse = stopDatafeed(datafeedId, closeJobParameter);
    assertTrue(stopDatafeedResponse.isStopped());

    // Check the job state is as expected
    assertBusy(() -> assertEquals(jobState, getJobStats(jobId).get(0).getState()), 2, TimeUnit.SECONDS);
}

/**
 * Runs the lookback stop scenario with {@code close_job=true}, passing
 * {@link JobState#CLOSED} as the expected job state.
 */
public void testStopLookback_GivenCloseJobParameterIsTrue() throws Exception {
    final String jobId = "lookback-stop-close-job-true";
    final boolean closeJob = true;
    doTestStopLookback_GivenCloseJobParameter(jobId, closeJob, JobState.CLOSED);
}

/**
 * Runs the lookback stop scenario with {@code close_job=false}, passing
 * {@link JobState#OPENED} as the expected job state.
 */
public void testStopLookback_GivenCloseJobParameterIsFalse() throws Exception {
    final String jobId = "lookback-stop-close-job-false";
    final boolean closeJob = false;
    doTestStopLookback_GivenCloseJobParameter(jobId, closeJob, JobState.OPENED);
}

public void testCloseJobStopsRealtimeDatafeed() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ protected StopDatafeedAction.Response stopDatafeed(String datafeedId) {
return client().execute(StopDatafeedAction.INSTANCE, request).actionGet();
}

/**
 * Executes a stop datafeed request for the given datafeed with the {@code close_job}
 * option set as requested, and waits for the response.
 *
 * @param datafeedId ID of the datafeed to stop
 * @param closeJob   value to set for the request's {@code close_job} option
 * @return the response of the stop datafeed action
 */
protected StopDatafeedAction.Response stopDatafeed(String datafeedId, boolean closeJob) {
    final StopDatafeedAction.Request stopRequest = new StopDatafeedAction.Request(datafeedId).setCloseJob(closeJob);
    return client().execute(StopDatafeedAction.INSTANCE, stopRequest).actionGet();
}

protected PutDatafeedAction.Response updateDatafeed(DatafeedUpdate update) {
UpdateDatafeedAction.Request request = new UpdateDatafeedAction.Request(update);
return client().execute(UpdateDatafeedAction.INSTANCE, request).actionGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
Expand All @@ -27,7 +28,9 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Predicates;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.persistent.PersistentTasksClusterService;
Expand All @@ -39,6 +42,7 @@
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
Expand All @@ -48,6 +52,7 @@
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -58,8 +63,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toList;
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.ml.utils.ExceptionCollectionHandling.exceptionArrayToStatusException;

public class TransportStopDatafeedAction extends TransportTasksAction<
Expand Down Expand Up @@ -106,7 +113,7 @@ public TransportStopDatafeedAction(
}

/**
* Sort the datafeed IDs the their task state and add to one
* Sort the datafeed IDs by their task state and add to one
* of the list arguments depending on the state.
*
* @param expandedDatafeedIds The expanded set of IDs
Expand Down Expand Up @@ -211,7 +218,64 @@ private void doExecute(
if (request.isForce()) {
forceStopDatafeed(request, listener, tasks, nodes, notStoppedDatafeeds);
} else {
normalStopDatafeed(task, request, listener, tasks, nodes, startedDatafeeds, stoppingDatafeeds, attempt);
final List<String> startedDatafeedsJobs = new ArrayList<>();
for (String datafeedId : startedDatafeeds) {
PersistentTasksCustomMetadata.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
if (datafeedTask != null
&& PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes) == false) {
startedDatafeedsJobs.add(((StartDatafeedAction.DatafeedParams) datafeedTask.getParams()).getJobId());
}
}
if (request.closeJob() && startedDatafeedsJobs.isEmpty() == false) {
// If the "close_job" parameter was set to "true" on the stop datafeed request we attempt to first close the
// jobs associated with the datafeeds. This will in turn attempt to stop the jobs' datafeeds (this time with the
// "close_job" flag set to false, to avoid recursion)
ActionListener<List<Tuple<String, AcknowledgedResponse>>> closeJobActionListener = listener
.delegateFailureAndWrap((delegate, jobsResponses) -> {
List<String> jobIds = jobsResponses.stream()
.filter(t -> t.v2().isAcknowledged() == false)
.map(Tuple::v1)
.collect(toList());
if (jobIds.isEmpty()) {
logger.debug("Successfully closed jobs (and associated datafeeds)");
} else {
logger.warn("Failed to close jobs (and associated datafeeds): {}", jobIds);
}
delegate.onResponse(new StopDatafeedAction.Response(true));
});

TypedChainTaskExecutor<Tuple<String, AcknowledgedResponse>> chainTaskExecutor = new TypedChainTaskExecutor<>(
EsExecutors.DIRECT_EXECUTOR_SERVICE,
Predicates.always(),
Predicates.always()
);
for (String jobId : startedDatafeedsJobs) {
chainTaskExecutor.add(
al -> executeAsyncWithOrigin(
client,
ML_ORIGIN,
CloseJobAction.INSTANCE,
new CloseJobAction.Request(jobId),
listener.delegateFailureAndWrap(
(l, response) -> l.onResponse(new StopDatafeedAction.Response(response.isClosed()))
)
)
);
}
chainTaskExecutor.execute(closeJobActionListener);
} else {
normalStopDatafeed(
task,
request,
listener,
tasks,
nodes,
startedDatafeeds,
stoppingDatafeeds,
startedDatafeedsJobs,
attempt
);
}
}
}, listener::onFailure)
);
Expand All @@ -226,10 +290,10 @@ private void normalStopDatafeed(
DiscoveryNodes nodes,
List<String> startedDatafeeds,
List<String> stoppingDatafeeds,
List<String> startedDatafeedsJobs,
int attempt
) {
final Set<String> executorNodes = new HashSet<>();
final List<String> startedDatafeedsJobs = new ArrayList<>();
final List<String> resolvedStartedDatafeeds = new ArrayList<>();
final List<PersistentTasksCustomMetadata.PersistentTask<?>> allDataFeedsToWaitFor = new ArrayList<>();
for (String datafeedId : startedDatafeeds) {
Expand All @@ -240,7 +304,6 @@ private void normalStopDatafeed(
assert datafeedTask != null : msg;
logger.error(msg);
} else if (PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes) == false) {
startedDatafeedsJobs.add(((StartDatafeedAction.DatafeedParams) datafeedTask.getParams()).getJobId());
resolvedStartedDatafeeds.add(datafeedId);
executorNodes.add(datafeedTask.getExecutorNode());
allDataFeedsToWaitFor.add(datafeedTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
* This class implements CRUD operation for the
* datafeed configuration document
*
* The number of datafeeds returned in a search it limited to
* The number of datafeeds returned in a search is limited to
* {@link MlConfigIndex#CONFIG_INDEX_MAX_RESULTS_WINDOW}.
* In most cases we expect 10s or 100s of datafeeds to be defined and
* a search for all datafeeds should return all.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
request.setForce(restRequest.paramAsBoolean(Request.FORCE.getPreferredName(), request.isForce()));
}
request.setAllowNoMatch(restRequest.paramAsBoolean(Request.ALLOW_NO_MATCH.getPreferredName(), request.allowNoMatch()));
if (restRequest.hasParam(Request.CLOSE_JOB.getPreferredName())) {
request.setCloseJob(restRequest.paramAsBoolean(Request.CLOSE_JOB.getPreferredName(), request.closeJob()));
}
}
return channel -> client.execute(StopDatafeedAction.INSTANCE, request, new RestBuilderListener<Response>(channel) {

Expand Down