diff --git a/docs/changelog/125408.yaml b/docs/changelog/125408.yaml new file mode 100644 index 0000000000000..3333bbc3ef9f4 --- /dev/null +++ b/docs/changelog/125408.yaml @@ -0,0 +1,6 @@ +pr: 125408 +summary: Prevent ML data retention logic from failing when deleting documents in read-only + indices +area: Machine Learning +type: bug +issues: [] diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java index 6e9ce3462feeb..7f01cfc4cd14c 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.internal.OriginSettingClient; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.tasks.TaskId; @@ -63,91 +64,118 @@ public void createComponents() { } public void testRemoveUnusedStats() throws Exception { + String modelId = "model-with-stats"; + putDFA(modelId); - prepareIndex("foo").setId("some-empty-doc").setSource("{}", XContentType.JSON).get(); - - PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request( - new DataFrameAnalyticsConfig.Builder().setId("analytics-with-stats") - .setModelMemoryLimit(ByteSizeValue.ofGb(1)) - .setSource(new DataFrameAnalyticsSource(new String[] { "foo" }, null, null, null)) - .setDest(new DataFrameAnalyticsDest("bar", null)) - .setAnalysis(new Regression("prediction")) - .build() - ); - client.execute(PutDataFrameAnalyticsAction.INSTANCE, request).actionGet(); - - client.execute( - PutTrainedModelAction.INSTANCE, - new PutTrainedModelAction.Request( - TrainedModelConfig.builder() - .setModelId("model-with-stats") - .setInferenceConfig(RegressionConfig.EMPTY_PARAMS) - .setInput(new TrainedModelInput(Arrays.asList("foo", "bar"))) - .setParsedDefinition( - new TrainedModelDefinition.Builder().setPreProcessors(Collections.emptyList()) - .setTrainedModel( - Tree.builder() - .setFeatureNames(Arrays.asList("foo", "bar")) - .setRoot(TreeNode.builder(0).setLeafValue(42)) - .build() - ) - ) - .validate(true) - .build(), - false - ) - ).actionGet(); - + // Existing analytics and models indexStatDocument(new DataCounts("analytics-with-stats", 1, 1, 1), DataCounts.documentId("analytics-with-stats")); - indexStatDocument(new DataCounts("missing-analytics-with-stats", 1, 1, 1), DataCounts.documentId("missing-analytics-with-stats")); + indexStatDocument(new InferenceStats(1, 1, 1, 1, modelId, "test", Instant.now()), InferenceStats.docId(modelId, "test")); indexStatDocument( new InferenceStats(1, 1, 1, 1, TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test", Instant.now()), InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test") ); + + // Unused analytics/model stats + indexStatDocument(new DataCounts("missing-analytics-with-stats", 1, 1, 1), DataCounts.documentId("missing-analytics-with-stats")); indexStatDocument( new InferenceStats(1, 1, 1, 1, "missing-model", "test", Instant.now()), InferenceStats.docId("missing-model", "test") ); + + 
refreshStatsIndex(); + runUnusedStatsRemover(); + + final String index = MlStatsIndex.TEMPLATE_NAME + "-000001"; + + // Validate expected docs + assertDocExists(index, InferenceStats.docId(modelId, "test")); + assertDocExists(index, DataCounts.documentId("analytics-with-stats")); + assertDocExists(index, InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test")); + + // Validate removed docs + assertDocDoesNotExist(index, InferenceStats.docId("missing-model", "test")); + assertDocDoesNotExist(index, DataCounts.documentId("missing-analytics-with-stats")); + } + + public void testRemovingUnusedStatsFromReadOnlyIndexShouldFailSilently() throws Exception { + String modelId = "model-with-stats"; + putDFA(modelId); + indexStatDocument( - new InferenceStats(1, 1, 1, 1, "model-with-stats", "test", Instant.now()), - InferenceStats.docId("model-with-stats", "test") + new InferenceStats(1, 1, 1, 1, "missing-model", "test", Instant.now()), + InferenceStats.docId("missing-model", "test") ); - client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get(); + makeIndexReadOnly(); + refreshStatsIndex(); - PlainActionFuture deletionListener = new PlainActionFuture<>(); - UnusedStatsRemover statsRemover = new UnusedStatsRemover(client, new TaskId("test", 0L)); - statsRemover.remove(10000.0f, deletionListener, () -> false); - deletionListener.actionGet(); + runUnusedStatsRemover(); + refreshStatsIndex(); - client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get(); + final String index = MlStatsIndex.TEMPLATE_NAME + "-000001"; + assertDocExists(index, InferenceStats.docId("missing-model", "test")); // should still exist + } - final String initialStateIndex = MlStatsIndex.TEMPLATE_NAME + "-000001"; + private void putDFA(String modelId) { + prepareIndex("foo").setId("some-empty-doc").setSource("{}", XContentType.JSON).get(); - // Make sure that stats that should exist still exist - assertTrue(client().prepareGet(initialStateIndex, InferenceStats.docId("model-with-stats", "test")).get().isExists()); - assertTrue( - client().prepareGet( - initialStateIndex, - InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test") - ).get().isExists() + PutDataFrameAnalyticsAction.Request analyticsRequest = new PutDataFrameAnalyticsAction.Request( + new DataFrameAnalyticsConfig.Builder().setId("analytics-with-stats") + .setModelMemoryLimit(ByteSizeValue.ofGb(1)) + .setSource(new DataFrameAnalyticsSource(new String[] { "foo" }, null, null, null)) + .setDest(new DataFrameAnalyticsDest("bar", null)) + .setAnalysis(new Regression("prediction")) + .build() ); - assertTrue(client().prepareGet(initialStateIndex, DataCounts.documentId("analytics-with-stats")).get().isExists()); - - // make sure that unused stats were deleted - assertFalse(client().prepareGet(initialStateIndex, DataCounts.documentId("missing-analytics-with-stats")).get().isExists()); - assertFalse(client().prepareGet(initialStateIndex, InferenceStats.docId("missing-model", "test")).get().isExists()); + client.execute(PutDataFrameAnalyticsAction.INSTANCE, analyticsRequest).actionGet(); + + TrainedModelDefinition.Builder definition = new TrainedModelDefinition.Builder().setPreProcessors(Collections.emptyList()) + .setTrainedModel( + Tree.builder().setFeatureNames(Arrays.asList("foo", "bar")).setRoot(TreeNode.builder(0).setLeafValue(42)).build() + ); + + TrainedModelConfig modelConfig = TrainedModelConfig.builder() + .setModelId(modelId) + 
.setInferenceConfig(RegressionConfig.EMPTY_PARAMS) + .setInput(new TrainedModelInput(Arrays.asList("foo", "bar"))) + .setParsedDefinition(definition) + .validate(true) + .build(); + + client.execute(PutTrainedModelAction.INSTANCE, new PutTrainedModelAction.Request(modelConfig, false)).actionGet(); } private void indexStatDocument(ToXContentObject object, String docId) throws Exception { - ToXContent.Params params = new ToXContent.MapParams( - Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, Boolean.toString(true)) - ); - IndexRequest doc = new IndexRequest(MlStatsIndex.writeAlias()); - doc.id(docId); + IndexRequest doc = new IndexRequest(MlStatsIndex.writeAlias()).id(docId); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - object.toXContent(builder, params); + object.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"))); doc.source(builder); client.index(doc).actionGet(); } } + + private void refreshStatsIndex() { + client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get(); + } + + private void runUnusedStatsRemover() { + PlainActionFuture deletionListener = new PlainActionFuture<>(); + new UnusedStatsRemover(client, new TaskId("test", 0L)).remove(10000.0f, deletionListener, () -> false); + deletionListener.actionGet(); + } + + private void makeIndexReadOnly() { + client().admin() + .indices() + .prepareUpdateSettings(MlStatsIndex.indexPattern()) + .setSettings(Settings.builder().put("index.blocks.write", true)) + .get(); + } + + private void assertDocExists(String index, String docId) { + assertTrue(client().prepareGet(index, docId).get().isExists()); + } + + private void assertDocDoesNotExist(String index, String docId) { + assertFalse(client().prepareGet(index, docId).get().isExists()); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 46e3da489453f..52cb1b412ad05 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -364,6 +364,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory; +import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander; import org.elasticsearch.xpack.ml.job.snapshot.upgrader.SnapshotUpgradeTaskExecutor; import org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutor; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; @@ -922,6 +923,9 @@ public Collection createComponents(PluginServices services) { IndexNameExpressionResolver indexNameExpressionResolver = services.indexNameExpressionResolver(); TelemetryProvider telemetryProvider = services.telemetryProvider(); + // Initialize WritableIndexExpander + WritableIndexExpander.initialize(clusterService, indexNameExpressionResolver); + if (enabled == false) { // Holders for @link(MachineLearningFeatureSetUsage) which needs access to job manager and ML extension, // both empty if ML is disabled diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java 
index e49901ea9976b..3c178685d59a9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -240,6 +240,7 @@ private List createDataRemovers( TaskId parentTaskId, AnomalyDetectionAuditor anomalyDetectionAuditor ) { + return Arrays.asList( new ExpiredResultsRemover( originClient, @@ -252,8 +253,8 @@ private List createDataRemovers( new ExpiredModelSnapshotsRemover( originClient, new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)), - threadPool, parentTaskId, + threadPool, jobResultsProvider, anomalyDetectionAuditor ), @@ -277,8 +278,8 @@ private List createDataRemovers(List jobs, TaskId parentTask new ExpiredModelSnapshotsRemover( client, new VolatileCursorIterator<>(jobs), - threadPool, parentTaskId, + threadPool, jobResultsProvider, anomalyDetectionAuditor ), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java index b9cc1902b7ab6..af57729cdae94 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java @@ -65,6 +65,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.security.user.InternalUsers; +import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander; import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; import java.util.ArrayList; @@ -131,7 +132,10 @@ public void deleteModelSnapshots(List modelSnapshots, ActionListe indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId())); } - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indices.toArray(new String[0])).setRefresh(true) + String[] indicesToQuery = removeReadOnlyIndices(new ArrayList<>(indices), listener, "model snapshots", null); + if (indicesToQuery.length == 0) return; + + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setQuery(QueryBuilders.idsQuery().addIds(idsToDelete.toArray(new String[0]))); @@ -179,7 +183,16 @@ public void deleteAnnotations( boolQuery.filter(QueryBuilders.termsQuery(Annotation.EVENT.getPreferredName(), eventsToDelete)); } QueryBuilder query = QueryBuilders.constantScoreQuery(boolQuery); - DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME).setQuery(query) + + String[] indicesToQuery = removeReadOnlyIndices( + List.of(AnnotationIndex.READ_ALIAS_NAME), + listener, + "annotations", + () -> listener.onResponse(true) + ); + if (indicesToQuery.length == 0) return; + + DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(indicesToQuery).setQuery(query) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setAbortOnVersionConflict(false) .setRefresh(true) @@ -197,6 +210,28 @@ public void deleteAnnotations( ); } + private String[] removeReadOnlyIndices( + List indicesToQuery, + ActionListener listener, + String entityType, + Runnable onEmpty + ) { + try { + indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(indicesToQuery); + } catch (Exception e) { + logger.error("Failed to get writable 
indices for [" + jobId + "]", e); + listener.onFailure(e); + return new String[0]; + } + if (indicesToQuery.isEmpty()) { + logger.info("No writable {} indices found for [{}] job. No {} to remove.", entityType, jobId, entityType); + if (onEmpty != null) { + onEmpty.run(); + } + } + return indicesToQuery.toArray(String[]::new); + } + /** * Asynchronously delete all result types (Buckets, Records, Influencers) from {@code cutOffTime}. * Forecasts are not deleted, as they will not be automatically regenerated after @@ -218,7 +253,14 @@ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener li ) ) .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs)); - DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).setQuery(query) + String[] indicesToQuery = removeReadOnlyIndices( + List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)), + listener, + "results", + () -> listener.onResponse(true) + ); + if (indicesToQuery.length == 0) return; + DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(indicesToQuery).setQuery(query) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setAbortOnVersionConflict(false) .setRefresh(true) @@ -263,9 +305,14 @@ public void deleteInterimResults() { * @param listener Response listener */ public void deleteDatafeedTimingStats(ActionListener listener) { - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).setRefresh( - true - ) + String[] indicesToQuery = removeReadOnlyIndices( + List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)), + listener, + "datafeed timing stats", + null + ); + if (indicesToQuery.length == 0) return; + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setQuery(QueryBuilders.idsQuery().addIds(DatafeedTimingStats.documentId(jobId))); @@ -455,7 +502,9 @@ private void deleteResultsByQuery( ActionListener refreshListener = ActionListener.wrap(refreshResponse -> { logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indices)); ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); - DeleteByQueryRequest request = new DeleteByQueryRequest(indices).setQuery(query) + String[] indicesToQuery = removeReadOnlyIndices(List.of(indices), listener, "results", null); + if (indicesToQuery.length == 0) return; + DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query) .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden())) .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES) .setAbortOnVersionConflict(false) @@ -523,7 +572,16 @@ private static IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesRespons private void deleteQuantiles(@SuppressWarnings("HiddenField") String jobId, ActionListener finishedHandler) { // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId)); - DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()).setQuery(query) + + String[] indicesToQuery = removeReadOnlyIndices( + List.of(AnomalyDetectorsIndex.jobStateIndexPattern()), + finishedHandler, + "quantiles", + () -> 
finishedHandler.onResponse(true) + ); + if (indicesToQuery.length == 0) return; + + DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query) .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())) .setAbortOnVersionConflict(false) .setRefresh(true); @@ -553,7 +611,14 @@ private void deleteCategorizerState( ) { // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum)); - DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()).setQuery(query) + String[] indicesToQuery = removeReadOnlyIndices( + List.of(AnomalyDetectorsIndex.jobStateIndexPattern()), + finishedHandler, + "categorizer state", + () -> finishedHandler.onResponse(true) + ); + if (indicesToQuery.length == 0) return; + DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query) .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())) .setAbortOnVersionConflict(false) .setRefresh(true); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredAnnotationsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredAnnotationsRemover.java index 050d01198b910..f181eaec7af87 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredAnnotationsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredAnnotationsRemover.java @@ -34,6 +34,7 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.Iterator; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -83,7 +84,14 @@ protected void removeDataBefore( long cutoffEpochMs, ActionListener listener ) { - DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs); + var indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(AnnotationIndex.READ_ALIAS_NAME); + if (indicesToQuery.isEmpty()) { + LOGGER.info("No writable annotation indices found for [{}] job. 
No expired annotations to remove.", job.getId()); + listener.onResponse(true); + return; + } + + DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs, indicesToQuery); request.setParentTask(getParentTaskId()); client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() { @@ -112,12 +120,17 @@ public void onFailure(Exception e) { }); } - private static DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, long cutoffEpochMs) { + private static DeleteByQueryRequest createDBQRequest( + Job job, + float requestsPerSec, + long cutoffEpochMs, + ArrayList indicesToQuery + ) { QueryBuilder query = QueryBuilders.boolQuery() .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId())) .filter(QueryBuilders.rangeQuery(Annotation.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis")) .filter(QueryBuilders.termQuery(Annotation.CREATE_USERNAME.getPreferredName(), InternalUsers.XPACK_USER.principal())); - DeleteByQueryRequest request = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME).setSlices( + DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setSlices( AbstractBulkByScrollRequest.AUTO_SLICES ) .setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index 886c19a65a4d0..12dbcc5a14f12 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -125,7 +125,14 @@ private void deleteForecasts( return; } - DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete).setRequestsPerSecond(requestsPerSec) + var indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(RESULTS_INDEX_PATTERN); + if (indicesToQuery.isEmpty()) { + LOGGER.info("No writable indices found for expired forecasts. 
No expired forecasts to remove."); + listener.onResponse(true); + return; + } + + DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete, indicesToQuery).setRequestsPerSecond(requestsPerSec) .setAbortOnVersionConflict(false); request.setParentTask(parentTaskId); client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() { @@ -199,12 +206,12 @@ private List findForecastsToDelete(SearchResponse searchResponse) return forecastsToDelete; } - private static DeleteByQueryRequest buildDeleteByQuery(List ids) { + private static DeleteByQueryRequest buildDeleteByQuery(List ids, ArrayList indicesToQuery) { DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); request.setTimeout(DEFAULT_MAX_DURATION); - request.indices(RESULTS_INDEX_PATTERN); + request.indices(indicesToQuery.toArray(new String[0])); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1); boolQuery.must( QueryBuilders.termsQuery( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index cbd505c293c86..af5d14b529cad 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -11,11 +11,14 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -26,24 +29,29 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.common.time.TimeUtils; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; import static java.util.stream.Collectors.toList; import static org.elasticsearch.core.Strings.format; +import static 
org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; /** * Deletes all model snapshots that have expired the configured retention time @@ -62,9 +70,9 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover private static final long MS_IN_ONE_DAY = TimeValue.timeValueDays(1).getMillis(); /** - * The max number of snapshots to fetch per job. It is set to 10K, the default for an index as - * we don't change that in our ML indices. It should be more than enough for most cases. If not, - * it will take a few iterations to delete all snapshots, which is OK. + * The max number of snapshots to fetch per job. It is set to 10K, the default for an index as + * we don't change that in our ML indices. It should be more than enough for most cases. If not, + * it will take a few iterations to delete all snapshots, which is OK. */ private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000; @@ -75,8 +83,8 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover public ExpiredModelSnapshotsRemover( OriginSettingClient client, Iterator jobIterator, - ThreadPool threadPool, TaskId parentTaskId, + ThreadPool threadPool, JobResultsProvider jobResultsProvider, AnomalyDetectionAuditor auditor ) { @@ -247,19 +255,59 @@ private void deleteModelSnapshots(List modelSnapshots, String job listener.onResponse(true); return; } - JobDataDeleter deleter = new JobDataDeleter(client, jobId); - deleter.deleteModelSnapshots(modelSnapshots, listener.delegateFailureAndWrap((l, bulkResponse) -> { - auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOTS_DELETED, modelSnapshots.size())); - LOGGER.debug( - () -> format( - "[%s] deleted model snapshots %s with descriptions %s", - jobId, - modelSnapshots.stream().map(ModelSnapshot::getSnapshotId).collect(toList()), - modelSnapshots.stream().map(ModelSnapshot::getDescription).collect(toList()) - ) - ); - l.onResponse(true); - })); - } + String stateIndexName = AnomalyDetectorsIndex.jobStateIndexPattern(); + + List idsToDelete = new ArrayList<>(); + Set indices = new HashSet<>(); + indices.add(stateIndexName); + indices.add(AnnotationIndex.READ_ALIAS_NAME); + for (ModelSnapshot modelSnapshot : modelSnapshots) { + idsToDelete.addAll(modelSnapshot.stateDocumentIds()); + idsToDelete.add(ModelSnapshot.documentId(modelSnapshot)); + idsToDelete.add(ModelSnapshot.annotationDocumentId(modelSnapshot)); + indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId())); + } + + // Remove read-only indices + List indicesToQuery; + try { + indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(indices); + } catch (Exception e) { + LOGGER.error("Failed to get writable indices for [" + jobId + "]", e); + listener.onFailure(e); + return; + } + if (indicesToQuery.isEmpty()) { + LOGGER.info("No writable model snapshot indices found for [{}] job. 
No expired model snapshots to remove.", jobId); + listener.onResponse(true); + return; + } + + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setRefresh(true) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setQuery(QueryBuilders.idsQuery().addIds(idsToDelete.toArray(new String[0]))); + + // _doc is the most efficient sort order and will also disable scoring + deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC); + + executeAsyncWithOrigin( + client, + ML_ORIGIN, + DeleteByQueryAction.INSTANCE, + deleteByQueryRequest, + listener.delegateFailureAndWrap((l, bulkResponse) -> { + auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOTS_DELETED, modelSnapshots.size())); + LOGGER.debug( + () -> format( + "[%s] deleted model snapshots %s with descriptions %s", + jobId, + modelSnapshots.stream().map(ModelSnapshot::getSnapshotId).collect(toList()), + modelSnapshots.stream().map(ModelSnapshot::getDescription).collect(toList()) + ) + ); + l.onResponse(true); + }) + ); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index be0bb53d454fe..64daa89a893dc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -52,6 +52,7 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.Iterator; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -97,7 +98,16 @@ protected void removeDataBefore( ActionListener listener ) { LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); - DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs); + + var indicesToQuery = WritableIndexExpander.getInstance() + .getWritableIndices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); + if (indicesToQuery.isEmpty()) { + LOGGER.info("No writable indices found for job [{}]. 
No expired results removed.", job.getId()); + listener.onResponse(true); + return; + } + + DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs, indicesToQuery); request.setParentTask(getParentTaskId()); client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() { @@ -136,7 +146,7 @@ public void onFailure(Exception e) { }); } - private static DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, long cutoffEpochMs) { + private DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, long cutoffEpochMs, ArrayList indicesToQuery) { QueryBuilder excludeFilter = QueryBuilders.termsQuery( Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE, @@ -148,7 +158,8 @@ private static DeleteByQueryRequest createDBQRequest(Job job, float requestsPerS .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis")) .filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName())) .mustNot(excludeFilter); - DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())).setSlices( + + DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setSlices( AbstractBulkByScrollRequest.AUTO_SLICES ) .setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java index c28c2f1d45bc3..e01b3ac9cd870 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -137,8 +137,18 @@ private Set getDataFrameAnalyticsJobIds() { private void executeDeleteUnusedStateDocs(List unusedDocIds, float requestsPerSec, ActionListener listener) { LOGGER.info("Found [{}] unused state documents; attempting to delete", unusedDocIds.size()); - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()) - .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + + var indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(AnomalyDetectorsIndex.jobStateIndexPattern()); + + if (indicesToQuery.isEmpty()) { + LOGGER.info("No writable indices found for unused state documents"); + listener.onResponse(true); + return; + } + + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setIndicesOptions( + IndicesOptions.lenientExpandOpen() + ) .setAbortOnVersionConflict(false) .setRequestsPerSecond(requestsPerSec) .setTimeout(DEFAULT_MAX_DURATION) @@ -149,7 +159,7 @@ private void executeDeleteUnusedStateDocs(List unusedDocIds, float reque deleteByQueryRequest.setParentTask(parentTaskId); client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(response -> { - if (response.getBulkFailures().size() > 0 || response.getSearchFailures().size() > 0) { + if (response.getBulkFailures().isEmpty() == false || response.getSearchFailures().isEmpty() == false) { LOGGER.error( "Some unused state documents could not be deleted due to failures: {}", Strings.collectionToCommaDelimitedString(response.getBulkFailures()) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java index 1b7f84e1e11aa..65c62113bfddf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java @@ -102,13 +102,21 @@ private Set getTrainedModelIds() { } private void executeDeleteUnusedStatsDocs(QueryBuilder dbq, float requestsPerSec, ActionListener listener) { - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(MlStatsIndex.indexPattern()).setIndicesOptions( + var indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(MlStatsIndex.indexPattern()); + + if (indicesToQuery.isEmpty()) { + LOGGER.info("No writable indices found for unused stats documents"); + listener.onResponse(true); + return; + } + + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setIndicesOptions( IndicesOptions.lenientExpandOpen() ).setAbortOnVersionConflict(false).setRequestsPerSecond(requestsPerSec).setTimeout(DEFAULT_MAX_DURATION).setQuery(dbq); deleteByQueryRequest.setParentTask(parentTaskId); client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(response -> { - if (response.getBulkFailures().size() > 0 || response.getSearchFailures().size() > 0) { + if (response.getBulkFailures().isEmpty() == false || response.getSearchFailures().isEmpty() == false) { LOGGER.error( "Some unused stats documents could not be deleted due to failures: {}", Strings.collectionToCommaDelimitedString(response.getBulkFailures()) @@ -124,4 +132,5 @@ private void executeDeleteUnusedStatsDocs(QueryBuilder dbq, float requestsPerSec listener.onFailure(e); })); } + } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/WritableIndexExpander.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/WritableIndexExpander.java new file mode 100644 index 0000000000000..1203da86ef70b --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/WritableIndexExpander.java @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.job.retention; + +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * This class is used to expand index patterns and filter out read-only indices. + * It is used in the context of machine learning jobs retention to ensure that only writable indices are considered. 
+ */
+public class WritableIndexExpander {
+
+    private final ClusterService clusterService;
+    private final IndexNameExpressionResolver indexNameExpressionResolver;
+    private static WritableIndexExpander INSTANCE;
+
+    public static void initialize(ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver) {
+        INSTANCE = new WritableIndexExpander(clusterService, indexNameExpressionResolver);
+    }
+
+    public static void initialize(WritableIndexExpander newInstance) {
+        INSTANCE = newInstance;
+    }
+
+    public static WritableIndexExpander getInstance() {
+        if (INSTANCE == null) {
+            throw new IllegalStateException("WritableIndexExpander is not initialized");
+        }
+        return INSTANCE;
+    }
+
+    protected WritableIndexExpander(ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver) {
+        this.clusterService = Objects.requireNonNull(clusterService);
+        this.indexNameExpressionResolver = Objects.requireNonNull(indexNameExpressionResolver);
+    }
+
+    public ArrayList<String> getWritableIndices(String indexPattern) {
+        return getWritableIndices(List.of(indexPattern));
+    }
+
+    public ArrayList<String> getWritableIndices(Collection<String> indices) {
+        if (indices == null || indices.isEmpty()) {
+            return new ArrayList<>();
+        }
+        var clusterState = clusterService.state();
+        return indices.stream()
+            .map(index -> indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN, index))
+            .flatMap(Arrays::stream)
+            .filter(index -> (isIndexReadOnly(index) == false))
+            .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+    private Boolean isIndexReadOnly(String indexName) {
+        IndexMetadata indexMetadata = clusterService.state().metadata().index(indexName);
+        if (indexMetadata == null) {
+            throw new IllegalArgumentException("Failed to identify if index is read-only: index [" + indexName + "] not found");
+        }
+        if (indexMetadata.getSettings() == null) {
+            throw new IllegalStateException("Settings for index [" + indexName + "] are unavailable");
+        }
+        return IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(indexMetadata.getSettings());
+    }
+}
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java
index 124c6c6878ac9..d6561cde36068 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java
@@ -12,6 +12,7 @@
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -20,6 +21,7 @@
 import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
+import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
 import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
 import org.junit.After;
 import org.junit.Before;
@@ -61,6 +63,7 @@ public void setup() {
        threadPool = new TestThreadPool("TransportDeleteExpiredDataActionTests thread
pool"); Client client = mock(Client.class); ClusterService clusterService = mock(ClusterService.class); + WritableIndexExpander.initialize(clusterService, TestIndexNameExpressionResolver.newInstance()); auditor = mock(AnomalyDetectionAuditor.class); transportDeleteExpiredDataAction = new TransportDeleteExpiredDataAction( threadPool, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java index edff59f01b026..285574689ee9c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.ml.job.retention.MockWritableIndexExpander; import org.junit.After; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -33,6 +34,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -61,6 +63,7 @@ public void verifyNoMoreInteractionsWithClient() { } public void testDeleteAllAnnotations() { + MockWritableIndexExpander.create(true); Arrays.asList(false, true).forEach(deleteUserAnnotations -> { JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID, deleteUserAnnotations); jobDataDeleter.deleteAllAnnotations(ActionTestUtils.assertNoFailureListener(deleteResponse -> {})); @@ -85,6 +88,7 @@ public void testDeleteAllAnnotations() { } public void testDeleteAnnotations_TimestampFiltering() { + MockWritableIndexExpander.create(true); Arrays.asList(false, true).forEach(deleteUserAnnotations -> { JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID, deleteUserAnnotations); Tuple range = randomFrom( @@ -114,6 +118,7 @@ public void testDeleteAnnotations_TimestampFiltering() { } public void testDeleteAnnotations_EventFiltering() { + MockWritableIndexExpander.create(true); Arrays.asList(false, true).forEach(deleteUserAnnotations -> { JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID, deleteUserAnnotations); jobDataDeleter.deleteAnnotations( @@ -143,6 +148,7 @@ public void testDeleteAnnotations_EventFiltering() { } public void testDeleteDatafeedTimingStats() { + MockWritableIndexExpander.create(true); Arrays.asList(false, true).forEach(deleteUserAnnotations -> { JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID, deleteUserAnnotations); jobDataDeleter.deleteDatafeedTimingStats(ActionTestUtils.assertNoFailureListener(deleteResponse -> {})); @@ -157,4 +163,20 @@ public void testDeleteDatafeedTimingStats() { assertThat(deleteRequest.indices(), is(arrayContaining(AnomalyDetectorsIndex.jobResultsAliasedName(JOB_ID)))); }); } + + public void testDeleteDatafeedTimingStats_WhenIndexReadOnly_ShouldNotDeleteAnything() { + MockWritableIndexExpander.create(false); + Arrays.asList(false, true).forEach(deleteUserAnnotations -> { + JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID, deleteUserAnnotations); + 
jobDataDeleter.deleteDatafeedTimingStats(ActionTestUtils.assertNoFailureListener(deleteResponse -> {})); + + if (deleteUserAnnotations) { + verify(client, never()).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any()); + client.threadPool(); + } else { + verify(client, never()).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any()); + client.threadPool(); + } + }); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 6ec43ca2a3201..5b2735afd9776 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -11,8 +11,10 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.tasks.TaskId; @@ -84,6 +86,7 @@ protected void removeDataBefore( public void setUpTests() { Client client = mock(Client.class); originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN); + WritableIndexExpander.initialize(mock(ClusterService.class), TestIndexNameExpressionResolver.newInstance()); } static SearchResponse createSearchResponse(List toXContents) throws IOException { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredAnnotationsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredAnnotationsRemoverTests.java index 59e0093abfba9..3ca472dcbd14e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredAnnotationsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredAnnotationsRemoverTests.java @@ -152,6 +152,21 @@ public void testCalcCutoffEpochMs() { verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(latest.getTime(), expectedCutoffTime))); } + public void testRemove_GivenIndexNotWritable_ShouldHandleGracefully() { + givenBucket(new Bucket("id_not_important", new Date(), 60)); + List jobs = Arrays.asList( + JobTests.buildJobBuilder("annotations-1").setResultsRetentionDays(10L).build(), + JobTests.buildJobBuilder("annotations-2").setResultsRetentionDays(20L).build() + ); + + // annotationIndexWritable = false + createExpiredAnnotationsRemover(jobs.iterator(), false).remove(1.0f, listener, () -> false); + + // No DBQ requests should be made, but listener should still be called with true + assertThat(capturedDeleteByQueryRequests.size(), equalTo(0)); + verify(listener).onResponse(true); + } + private void givenDBQRequestsSucceed() { givenDBQRequest(true); } @@ -185,7 +200,7 @@ private void givenBucket(Bucket bucket) { }).when(client).execute(eq(TransportSearchAction.TYPE), any(), any()); } - private ExpiredAnnotationsRemover createExpiredAnnotationsRemover(Iterator jobIterator) { + private 
ExpiredAnnotationsRemover createExpiredAnnotationsRemover(Iterator jobIterator, boolean annotationIndexWritable) { ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executor = mock(ExecutorService.class); @@ -197,6 +212,7 @@ private ExpiredAnnotationsRemover createExpiredAnnotationsRemover(Iterator return null; }).when(executor).execute(any()); + MockWritableIndexExpander.create(annotationIndexWritable); return new ExpiredAnnotationsRemover( originSettingClient, jobIterator, @@ -205,4 +221,8 @@ private ExpiredAnnotationsRemover createExpiredAnnotationsRemover(Iterator threadPool ); } + + private ExpiredAnnotationsRemover createExpiredAnnotationsRemover(Iterator jobIterator) { + return createExpiredAnnotationsRemover(jobIterator, true); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 98dc3bf3ea84b..8795deda405cf 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -83,7 +83,6 @@ public void setUpTests() { client = mock(Client.class); originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN); resultsProvider = mock(JobResultsProvider.class); - listener = new TestListener(); } @@ -279,7 +278,44 @@ public void testCalcCutoffEpochMs() { verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(oneDayAgo.getTime(), expectedCutoffTime))); } + public void testRemove_GivenIndexNotWritable_ShouldHandleGracefully() { + List searchResponses = new ArrayList<>(); + List jobs = Arrays.asList( + JobTests.buildJobBuilder("job-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build() + ); + + Date now = new Date(); + Date oneDayAgo = new Date(now.getTime() - TimeValue.timeValueDays(1).getMillis()); + SearchHit snapshot1 = createModelSnapshotQueryHit("job-1", "fresh-snapshot", oneDayAgo); + searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot1))); + + Date eightDaysAndOneMsAgo = new Date(now.getTime() - TimeValue.timeValueDays(8).getMillis() - 1); + Map> snapshotResponses = new HashMap<>(); + snapshotResponses.put( + "job-1", + Arrays.asList( + // Keeping active as its expiration is not known. 
We can assume "worst case" and verify it is not removed + createModelSnapshot("job-1", "active", eightDaysAndOneMsAgo), + createModelSnapshot("job-1", "old-snapshot", eightDaysAndOneMsAgo) + ) + ); + + givenClientRequestsSucceed(searchResponses, snapshotResponses); + + // Create remover with state index not writable + createExpiredModelSnapshotsRemover(jobs.iterator(), false).remove(1.0f, listener, () -> false); + + listener.waitToCompletion(); + // Should succeed, but not attempt to delete anything + assertThat(listener.success, is(true)); + assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0)); + } + private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover(Iterator jobIterator) { + return createExpiredModelSnapshotsRemover(jobIterator, true); + } + + private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover(Iterator jobIterator, boolean isStateIndexWritable) { ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executor = mock(ExecutorService.class); @@ -290,11 +326,14 @@ private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover(Iterator run.run(); return null; }).when(executor).execute(any()); + + MockWritableIndexExpander.create(isStateIndexWritable); + return new ExpiredModelSnapshotsRemover( originSettingClient, jobIterator, - threadPool, new TaskId("test", 0L), + threadPool, resultsProvider, mock(AnomalyDetectionAuditor.class) ); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index 9a768b7f635bd..90586fe0a3ea5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -135,6 +135,22 @@ public void testRemove_GivenClientRequestsFailed() { verify(listener).onFailure(any()); } + public void testRemove_GivenIndexNotWritable_ShouldHandleGracefully() { + givenBucket(new Bucket("id_not_important", new Date(), 60)); + + // Prepare one job with a retention policy + List jobs = Arrays.asList( + JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(), + JobTests.buildJobBuilder("results-1").setResultsRetentionDays(20L).build() + ); + + createExpiredResultsRemover(jobs.iterator(), false).remove(1.0f, listener, () -> false); + + // Assert: success callback invoked, no DBQ requests + verify(listener).onResponse(true); + assertThat(capturedDeleteByQueryRequests.size(), equalTo(0)); + } + @SuppressWarnings("unchecked") public void testCalcCutoffEpochMs() { String jobId = "calc-cutoff"; @@ -186,6 +202,10 @@ private void givenBucket(Bucket bucket) { } private ExpiredResultsRemover createExpiredResultsRemover(Iterator jobIterator) { + return createExpiredResultsRemover(jobIterator, true); + } + + private ExpiredResultsRemover createExpiredResultsRemover(Iterator jobIterator, boolean isResultsIndexWritable) { ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executor = mock(ExecutorService.class); @@ -197,6 +217,8 @@ private ExpiredResultsRemover createExpiredResultsRemover(Iterator jobItera return null; }).when(executor).execute(any()); + MockWritableIndexExpander.create(isResultsIndexWritable); + return new ExpiredResultsRemover( originSettingClient, jobIterator, diff --git 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MockWritableIndexExpander.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MockWritableIndexExpander.java
new file mode 100644
index 0000000000000..a2cf4479db0da
--- /dev/null
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MockWritableIndexExpander.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.job.retention;
+
+import org.mockito.ArgumentMatchers;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MockWritableIndexExpander {
+    public static WritableIndexExpander create(boolean stateIndexWritable) {
+        WritableIndexExpander.initialize(mock(WritableIndexExpander.class));
+        WritableIndexExpander writableIndexExpander = WritableIndexExpander.getInstance();
+        if (stateIndexWritable) {
+            mockWhenIndicesAreWritable(writableIndexExpander);
+        } else {
+            mockWhenIndicesAreNotWritable(writableIndexExpander);
+        }
+        return writableIndexExpander;
+    }
+
+    private static void mockWhenIndicesAreNotWritable(WritableIndexExpander writableIndexExpander) {
+        when(writableIndexExpander.getWritableIndices(anyString()))
+            .thenReturn(new ArrayList<>());
+        when(writableIndexExpander.getWritableIndices(ArgumentMatchers.<Collection<String>>any()))
+            .thenReturn(new ArrayList<>());
+    }
+
+    private static void mockWhenIndicesAreWritable(WritableIndexExpander writableIndexExpander) {
+        when(writableIndexExpander.getWritableIndices(anyString())).thenAnswer(invocation -> {
+            String input = invocation.getArgument(0);
+            return new ArrayList<>(List.of(input));
+        });
+        when(writableIndexExpander.getWritableIndices(ArgumentMatchers.<Collection<String>>any())).thenAnswer(
+            invocation -> new ArrayList<>(invocation.getArgument(0))
+        );
+    }
+}
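A note on the pattern this patch applies across the retention code: each deletion path first resolves its target index expression through WritableIndexExpander, returns early with a successful response when no writable index remains, and only then issues the delete-by-query. The sketch below illustrates that guard outside the patch; the helper class name ReadOnlySafeDeleter and its deleteDocs method are hypothetical, while WritableIndexExpander, getWritableIndices, and the DeleteByQueryRequest wiring mirror the code added above.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;

import java.util.List;

// Illustrative sketch only; not part of the patch.
final class ReadOnlySafeDeleter {

    private ReadOnlySafeDeleter() {}

    // Delete the given document ids, but only from indices that are currently writable.
    static void deleteDocs(OriginSettingClient client, String indexPattern, String[] docIds, ActionListener<Boolean> listener) {
        final List<String> writableIndices;
        try {
            // Expands the pattern to concrete index names and drops any index with index.blocks.write=true.
            writableIndices = WritableIndexExpander.getInstance().getWritableIndices(indexPattern);
        } catch (Exception e) {
            listener.onFailure(e);
            return;
        }
        if (writableIndices.isEmpty()) {
            // Nothing writable to delete from: report success instead of letting the
            // delete-by-query fail against a write-blocked index.
            listener.onResponse(true);
            return;
        }
        DeleteByQueryRequest request = new DeleteByQueryRequest(writableIndices.toArray(new String[0])).setRefresh(true)
            .setIndicesOptions(IndicesOptions.lenientExpandOpen())
            .setAbortOnVersionConflict(false)
            .setQuery(QueryBuilders.idsQuery().addIds(docIds));
        client.execute(DeleteByQueryAction.INSTANCE, request, listener.delegateFailureAndWrap((l, response) -> l.onResponse(true)));
    }
}

Initializing the expander once as a singleton in MachineLearning.createComponents (and through mocks or TestIndexNameExpressionResolver in tests) lets deeply nested callers such as JobDataDeleter consult cluster state without threading a ClusterService reference through every constructor.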