Skip to content

Commit a4405ef

Browse files
committed
[ML] Refactor job data removers to utilize WritableIndexExpander for managing writable indices
1 parent 1b1f97f commit a4405ef

12 files changed

+155
-77
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -255,29 +255,26 @@ private List<MlDataRemover> createDataRemovers(
255255
originClient,
256256
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
257257
parentTaskId,
258-
anomalyDetectionAuditor,
259-
threadPool,
260-
writableIndexExpander
258+
writableIndexExpander, anomalyDetectionAuditor,
259+
threadPool
261260
),
262-
new ExpiredForecastsRemover(originClient, threadPool, parentTaskId),
261+
new ExpiredForecastsRemover(originClient, threadPool, parentTaskId, writableIndexExpander),
263262
new ExpiredModelSnapshotsRemover(
264263
originClient,
265264
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
265+
parentTaskId, writableIndexExpander,
266266
threadPool,
267-
parentTaskId,
268267
jobResultsProvider,
269-
anomalyDetectionAuditor
270-
),
268+
anomalyDetectionAuditor),
271269
new UnusedStateRemover(originClient, parentTaskId, writableIndexExpander),
272270
new EmptyStateIndexRemover(originClient, parentTaskId),
273271
new UnusedStatsRemover(originClient, parentTaskId, writableIndexExpander),
274272
new ExpiredAnnotationsRemover(
275273
originClient,
276274
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
277-
parentTaskId,
275+
parentTaskId, writableIndexExpander,
278276
anomalyDetectionAuditor,
279-
threadPool
280-
)
277+
threadPool)
281278
);
282279
}
283280

@@ -287,23 +284,21 @@ private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTask
287284
client,
288285
new VolatileCursorIterator<>(jobs),
289286
parentTaskId,
290-
anomalyDetectionAuditor,
291-
threadPool,
292-
writableIndexExpander
287+
writableIndexExpander, anomalyDetectionAuditor,
288+
threadPool
293289
),
294-
new ExpiredForecastsRemover(client, threadPool, parentTaskId),
290+
new ExpiredForecastsRemover(client, threadPool, parentTaskId, writableIndexExpander),
295291
new ExpiredModelSnapshotsRemover(
296292
client,
297293
new VolatileCursorIterator<>(jobs),
294+
parentTaskId, writableIndexExpander,
298295
threadPool,
299-
parentTaskId,
300296
jobResultsProvider,
301-
anomalyDetectionAuditor
302-
),
297+
anomalyDetectionAuditor),
303298
new UnusedStateRemover(client, parentTaskId, writableIndexExpander),
304299
new EmptyStateIndexRemover(client, parentTaskId),
305300
new UnusedStatsRemover(client, parentTaskId, writableIndexExpander),
306-
new ExpiredAnnotationsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, anomalyDetectionAuditor, threadPool)
301+
new ExpiredAnnotationsRemover(client, new VolatileCursorIterator<>(jobs), parentTaskId, writableIndexExpander, anomalyDetectionAuditor, threadPool)
307302
);
308303
}
309304

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.xpack.core.ml.job.config.Job;
1313

1414
import java.util.Iterator;
15+
import java.util.Objects;
1516
import java.util.function.BooleanSupplier;
1617

1718
/**
@@ -26,12 +27,18 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
2627
protected final OriginSettingClient client;
2728
private final Iterator<Job> jobIterator;
2829
private final TaskId parentTaskId;
30+
protected final WritableIndexExpander writableIndexExpander;
2931

30-
AbstractExpiredJobDataRemover(OriginSettingClient client, Iterator<Job> jobIterator, TaskId parentTaskId) {
32+
AbstractExpiredJobDataRemover(
33+
OriginSettingClient client,
34+
Iterator<Job> jobIterator,
35+
TaskId parentTaskId,
36+
WritableIndexExpander writableIndexExpander
37+
) {
3138
this.client = client;
3239
this.jobIterator = jobIterator;
3340
this.parentTaskId = parentTaskId;
34-
41+
this.writableIndexExpander = Objects.requireNonNull(writableIndexExpander);
3542
}
3643

3744
protected TaskId getParentTaskId() {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredAnnotationsRemover.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.time.ZoneOffset;
3535
import java.time.ZonedDateTime;
3636
import java.time.format.DateTimeFormatter;
37+
import java.util.ArrayList;
3738
import java.util.Iterator;
3839
import java.util.Objects;
3940
import java.util.concurrent.TimeUnit;
@@ -59,10 +60,10 @@ public ExpiredAnnotationsRemover(
5960
OriginSettingClient client,
6061
Iterator<Job> jobIterator,
6162
TaskId parentTaskId,
62-
AnomalyDetectionAuditor auditor,
63+
WritableIndexExpander writableIndexExpander, AnomalyDetectionAuditor auditor,
6364
ThreadPool threadPool
64-
) {
65-
super(client, jobIterator, parentTaskId);
65+
) {
66+
super(client, jobIterator, parentTaskId, writableIndexExpander);
6667
this.auditor = Objects.requireNonNull(auditor);
6768
this.threadPool = Objects.requireNonNull(threadPool);
6869
}
@@ -83,7 +84,14 @@ protected void removeDataBefore(
8384
long cutoffEpochMs,
8485
ActionListener<Boolean> listener
8586
) {
86-
DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs);
87+
var indicesToQuery = writableIndexExpander.getWritableIndices(AnnotationIndex.READ_ALIAS_NAME);
88+
if (indicesToQuery.isEmpty()) {
89+
LOGGER.info("No writable annotation indices found for [{}] job. No expired annotations to remove.", job.getId());
90+
listener.onResponse(true);
91+
return;
92+
}
93+
94+
DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs, indicesToQuery);
8795
request.setParentTask(getParentTaskId());
8896

8997
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() {
@@ -112,12 +120,12 @@ public void onFailure(Exception e) {
112120
});
113121
}
114122

115-
private static DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, long cutoffEpochMs) {
123+
private static DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, long cutoffEpochMs, ArrayList<String> indicesToQuery) {
116124
QueryBuilder query = QueryBuilders.boolQuery()
117125
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId()))
118126
.filter(QueryBuilders.rangeQuery(Annotation.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis"))
119127
.filter(QueryBuilders.termQuery(Annotation.CREATE_USERNAME.getPreferredName(), InternalUsers.XPACK_USER.principal()));
120-
DeleteByQueryRequest request = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME).setSlices(
128+
DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setSlices(
121129
AbstractBulkByScrollRequest.AUTO_SLICES
122130
)
123131
.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,14 @@ public class ExpiredForecastsRemover implements MlDataRemover {
6464
private final ThreadPool threadPool;
6565
private final long cutoffEpochMs;
6666
private final TaskId parentTaskId;
67+
private final WritableIndexExpander writableIndexExpander;
6768

68-
public ExpiredForecastsRemover(OriginSettingClient client, ThreadPool threadPool, TaskId parentTaskId) {
69+
public ExpiredForecastsRemover(OriginSettingClient client, ThreadPool threadPool, TaskId parentTaskId, WritableIndexExpander writableIndexExpander) {
6970
this.client = Objects.requireNonNull(client);
7071
this.threadPool = Objects.requireNonNull(threadPool);
7172
this.cutoffEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
7273
this.parentTaskId = parentTaskId;
74+
this.writableIndexExpander = Objects.requireNonNull(writableIndexExpander);
7375
}
7476

7577
@Override
@@ -125,7 +127,14 @@ private void deleteForecasts(
125127
return;
126128
}
127129

128-
DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete).setRequestsPerSecond(requestsPerSec)
130+
var indicesToQuery = writableIndexExpander.getWritableIndices(RESULTS_INDEX_PATTERN);
131+
if (indicesToQuery.isEmpty()) {
132+
LOGGER.info("No writable indices found for expired forecasts. No expired forecasts to remove.");
133+
listener.onResponse(true);
134+
return;
135+
}
136+
137+
DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete, indicesToQuery).setRequestsPerSecond(requestsPerSec)
129138
.setAbortOnVersionConflict(false);
130139
request.setParentTask(parentTaskId);
131140
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() {
@@ -199,12 +208,12 @@ private List<JobForecastId> findForecastsToDelete(SearchResponse searchResponse)
199208
return forecastsToDelete;
200209
}
201210

202-
private static DeleteByQueryRequest buildDeleteByQuery(List<JobForecastId> ids) {
211+
private static DeleteByQueryRequest buildDeleteByQuery(List<JobForecastId> ids, ArrayList<String> indicesToQuery) {
203212
DeleteByQueryRequest request = new DeleteByQueryRequest();
204213
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
205214
request.setTimeout(DEFAULT_MAX_DURATION);
206215

207-
request.indices(RESULTS_INDEX_PATTERN);
216+
request.indices(indicesToQuery.toArray(new String[0]));
208217
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
209218
boolQuery.must(
210219
QueryBuilders.termsQuery(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,16 @@
1111
import org.elasticsearch.ElasticsearchStatusException;
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.search.SearchRequest;
14+
import org.elasticsearch.action.support.IndicesOptions;
1415
import org.elasticsearch.action.support.ThreadedActionListener;
1516
import org.elasticsearch.client.internal.OriginSettingClient;
1617
import org.elasticsearch.core.TimeValue;
1718
import org.elasticsearch.index.query.QueryBuilder;
1819
import org.elasticsearch.index.query.QueryBuilders;
20+
import org.elasticsearch.index.reindex.BulkByScrollResponse;
21+
import org.elasticsearch.index.reindex.BulkByScrollTask;
22+
import org.elasticsearch.index.reindex.DeleteByQueryAction;
23+
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
1924
import org.elasticsearch.rest.RestStatus;
2025
import org.elasticsearch.search.SearchHit;
2126
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -24,11 +29,14 @@
2429
import org.elasticsearch.search.sort.SortOrder;
2530
import org.elasticsearch.tasks.TaskId;
2631
import org.elasticsearch.threadpool.ThreadPool;
32+
import org.elasticsearch.xpack.core.ClientHelper;
2733
import org.elasticsearch.xpack.core.action.util.QueryPage;
2834
import org.elasticsearch.xpack.core.common.time.TimeUtils;
35+
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
2936
import org.elasticsearch.xpack.core.ml.job.config.Job;
3037
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3138
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
39+
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
3240
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
3341
import org.elasticsearch.xpack.ml.MachineLearning;
3442
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
@@ -37,13 +45,19 @@
3745
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
3846

3947
import java.util.ArrayList;
48+
import java.util.Collections;
49+
import java.util.HashSet;
4050
import java.util.Iterator;
4151
import java.util.List;
4252
import java.util.Objects;
53+
import java.util.Set;
4354
import java.util.concurrent.TimeUnit;
4455

4556
import static java.util.stream.Collectors.toList;
4657
import static org.elasticsearch.core.Strings.format;
58+
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
59+
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
60+
4761

4862
/**
4963
* Deletes all model snapshots that have expired the configured retention time
@@ -62,9 +76,9 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
6276
private static final long MS_IN_ONE_DAY = TimeValue.timeValueDays(1).getMillis();
6377

6478
/**
65-
* The max number of snapshots to fetch per job. It is set to 10K, the default for an index as
66-
* we don't change that in our ML indices. It should be more than enough for most cases. If not,
67-
* it will take a few iterations to delete all snapshots, which is OK.
79+
* The max number of snapshots to fetch per job. It is set to 10K, the default for an index as
80+
* we don't change that in our ML indices. It should be more than enough for most cases. If not,
81+
* it will take a few iterations to delete all snapshots, which is OK.
6882
*/
6983
private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000;
7084

@@ -75,12 +89,11 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
7589
public ExpiredModelSnapshotsRemover(
7690
OriginSettingClient client,
7791
Iterator<Job> jobIterator,
78-
ThreadPool threadPool,
79-
TaskId parentTaskId,
92+
TaskId parentTaskId, WritableIndexExpander writableIndexExpander, ThreadPool threadPool,
8093
JobResultsProvider jobResultsProvider,
8194
AnomalyDetectionAuditor auditor
8295
) {
83-
super(client, jobIterator, parentTaskId);
96+
super(client, jobIterator, parentTaskId, writableIndexExpander);
8497
this.threadPool = Objects.requireNonNull(threadPool);
8598
this.jobResultsProvider = jobResultsProvider;
8699
this.auditor = auditor;
@@ -247,8 +260,31 @@ private void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, String job
247260
listener.onResponse(true);
248261
return;
249262
}
250-
JobDataDeleter deleter = new JobDataDeleter(client, jobId);
251-
deleter.deleteModelSnapshots(modelSnapshots, listener.delegateFailureAndWrap((l, bulkResponse) -> {
263+
264+
String stateIndexName = AnomalyDetectorsIndex.jobStateIndexPattern();
265+
266+
List<String> idsToDelete = new ArrayList<>();
267+
Set<String> indices = new HashSet<>();
268+
indices.add(stateIndexName);
269+
indices.add(AnnotationIndex.READ_ALIAS_NAME);
270+
for (ModelSnapshot modelSnapshot : modelSnapshots) {
271+
idsToDelete.addAll(modelSnapshot.stateDocumentIds());
272+
idsToDelete.add(ModelSnapshot.documentId(modelSnapshot));
273+
idsToDelete.add(ModelSnapshot.annotationDocumentId(modelSnapshot));
274+
indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
275+
}
276+
277+
// Remove read-only indices
278+
var indicesToQuery = writableIndexExpander.getWritableIndices(indices);
279+
280+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setRefresh(true)
281+
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
282+
.setQuery(QueryBuilders.idsQuery().addIds(idsToDelete.toArray(new String[0])));
283+
284+
// _doc is the most efficient sort order and will also disable scoring
285+
deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
286+
287+
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener.delegateFailureAndWrap((l, bulkResponse) -> {
252288
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOTS_DELETED, modelSnapshots.size()));
253289
LOGGER.debug(
254290
() -> format(
@@ -259,7 +295,7 @@ private void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, String job
259295
)
260296
);
261297
l.onResponse(true);
262-
}));
298+
}
299+
));
263300
}
264-
265301
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.time.ZoneOffset;
5353
import java.time.ZonedDateTime;
5454
import java.time.format.DateTimeFormatter;
55+
import java.util.ArrayList;
5556
import java.util.Iterator;
5657
import java.util.Objects;
5758
import java.util.concurrent.TimeUnit;
@@ -70,20 +71,18 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
7071

7172
private final AnomalyDetectionAuditor auditor;
7273
private final ThreadPool threadPool;
73-
private final WritableIndexExpander writableIndexExpander;
74+
7475

7576
public ExpiredResultsRemover(
7677
OriginSettingClient client,
7778
Iterator<Job> jobIterator,
7879
TaskId parentTaskId,
79-
AnomalyDetectionAuditor auditor,
80-
ThreadPool threadPool,
81-
WritableIndexExpander writableIndexExpander
80+
WritableIndexExpander writableIndexExpander, AnomalyDetectionAuditor auditor,
81+
ThreadPool threadPool
8282
) {
83-
super(client, jobIterator, parentTaskId);
83+
super(client, jobIterator, parentTaskId, writableIndexExpander);
8484
this.auditor = Objects.requireNonNull(auditor);
8585
this.threadPool = Objects.requireNonNull(threadPool);
86-
this.writableIndexExpander = Objects.requireNonNull(writableIndexExpander);
8786
}
8887

8988
@Override
@@ -100,7 +99,15 @@ protected void removeDataBefore(
10099
ActionListener<Boolean> listener
101100
) {
102101
LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
103-
DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs);
102+
103+
var indicesToQuery = writableIndexExpander.getWritableIndices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
104+
if (indicesToQuery.isEmpty()) {
105+
LOGGER.info("No writable indices found for job [{}]. No expired results removed.", job.getId());
106+
listener.onResponse(true);
107+
return;
108+
}
109+
110+
DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs, indicesToQuery);
104111
request.setParentTask(getParentTaskId());
105112

106113
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() {
@@ -139,7 +146,7 @@ public void onFailure(Exception e) {
139146
});
140147
}
141148

142-
private DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, long cutoffEpochMs) {
149+
private DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, long cutoffEpochMs, ArrayList<String> indicesToQuery) {
143150
QueryBuilder excludeFilter = QueryBuilders.termsQuery(
144151
Result.RESULT_TYPE.getPreferredName(),
145152
ModelSizeStats.RESULT_TYPE_VALUE,
@@ -152,13 +159,6 @@ private DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, lon
152159
.filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
153160
.mustNot(excludeFilter);
154161

155-
var indicesToQuery = writableIndexExpander.getWritableIndices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
156-
157-
if (indicesToQuery.isEmpty()) {
158-
LOGGER.warn("No writable indices found for job [{}]", job.getId());
159-
return new DeleteByQueryRequest();
160-
}
161-
162162
DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setSlices(
163163
AbstractBulkByScrollRequest.AUTO_SLICES
164164
)

0 commit comments

Comments
 (0)