Skip to content

Commit 18808b4

Browse files
committed
prevent JobDataDeleter from attempting to delete from read-only indices
1 parent 4447a52 commit 18808b4

File tree

2 files changed

+92
-36
lines changed

2 files changed

+92
-36
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

Lines changed: 70 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -134,21 +134,10 @@ public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListe
134134
indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
135135
}
136136

137-
// Remove read-only indices
138-
List<String> indicesToQuery;
139-
try {
140-
indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(indices);
141-
} catch (Exception e) {
142-
logger.error("Failed to get writable indices for [" + jobId + "].", e);
143-
listener.onFailure(e);
144-
return;
145-
}
146-
if (indicesToQuery.isEmpty()) {
147-
logger.info("No writable model snapshot indices found for [{}] job. No expired model snapshots to remove.", jobId);
148-
return;
149-
}
137+
String[] indicesToQuery = removeReadOnlyIndices(new ArrayList<>(indices), listener, "model snapshots", null);
138+
if (indicesToQuery.length == 0) return;
150139

151-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setRefresh(true)
140+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true)
152141
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
153142
.setQuery(QueryBuilders.idsQuery().addIds(idsToDelete.toArray(new String[0])));
154143

@@ -197,22 +186,15 @@ public void deleteAnnotations(
197186
}
198187
QueryBuilder query = QueryBuilders.constantScoreQuery(boolQuery);
199188

200-
List<String> indicesToQuery = List.of(AnnotationIndex.READ_ALIAS_NAME);
201-
// Remove read-only indices
202-
try {
203-
indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(indicesToQuery);
204-
} catch (Exception e) {
205-
logger.error("Failed to get writable indices for [" + jobId + "]", e);
206-
listener.onFailure(e);
207-
return;
208-
}
209-
if (indicesToQuery.isEmpty()) {
210-
logger.info("No writable annotation indices found for [{}] job. No annotations to remove.", jobId);
211-
listener.onResponse(true);
212-
return;
213-
}
189+
String[] indicesToQuery = removeReadOnlyIndices(
190+
List.of(AnnotationIndex.READ_ALIAS_NAME),
191+
listener,
192+
"annotations",
193+
() -> listener.onResponse(true)
194+
);
195+
if (indicesToQuery.length == 0) return;
214196

215-
DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(indicesToQuery.toArray(new String[0])).setQuery(query)
197+
DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
216198
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
217199
.setAbortOnVersionConflict(false)
218200
.setRefresh(true)
@@ -230,6 +212,28 @@ public void deleteAnnotations(
230212
);
231213
}
232214

215+
private <T> String[] removeReadOnlyIndices(
216+
List<String> indicesToQuery,
217+
ActionListener<T> listener,
218+
String entityType,
219+
Runnable onEmpty
220+
) {
221+
try {
222+
indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(indicesToQuery);
223+
} catch (Exception e) {
224+
logger.error("Failed to get writable indices for [{}]", jobId, e);
225+
listener.onFailure(e);
226+
return new String[0];
227+
}
228+
if (indicesToQuery.isEmpty()) {
229+
logger.info("No writable {} indices found for [{}] job. No {} to remove.", entityType, jobId, entityType);
230+
if (onEmpty != null) {
231+
onEmpty.run();
232+
}
233+
}
234+
return indicesToQuery.toArray(String[]::new);
235+
}
236+
233237
/**
234238
* Asynchronously delete all result types (Buckets, Records, Influencers) from {@code cutOffTime}.
235239
* Forecasts are <em>not</em> deleted, as they will not be automatically regenerated after
@@ -251,7 +255,14 @@ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> li
251255
)
252256
)
253257
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
254-
DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).setQuery(query)
258+
String[] indicesToQuery = removeReadOnlyIndices(
259+
List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)),
260+
listener,
261+
"results",
262+
() -> listener.onResponse(true)
263+
);
264+
if (indicesToQuery.length == 0) return;
265+
DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
255266
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
256267
.setAbortOnVersionConflict(false)
257268
.setRefresh(true)
@@ -296,9 +307,14 @@ public void deleteInterimResults() {
296307
* @param listener Response listener
297308
*/
298309
public void deleteDatafeedTimingStats(ActionListener<BulkByScrollResponse> listener) {
299-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).setRefresh(
300-
true
301-
)
310+
String[] indicesToQuery = removeReadOnlyIndices(
311+
List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)),
312+
listener,
313+
"datafeed timing stats",
314+
null
315+
);
316+
if (indicesToQuery.length == 0) return;
317+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true)
302318
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
303319
.setQuery(QueryBuilders.idsQuery().addIds(DatafeedTimingStats.documentId(jobId)));
304320

@@ -488,7 +504,9 @@ private void deleteResultsByQuery(
488504
ActionListener<BroadcastResponse> refreshListener = ActionListener.wrap(refreshResponse -> {
489505
logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indices));
490506
ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
491-
DeleteByQueryRequest request = new DeleteByQueryRequest(indices).setQuery(query)
507+
String[] indicesToQuery = removeReadOnlyIndices(List.of(indices), listener, "results", null);
508+
if (indicesToQuery.length == 0) return;
509+
DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
492510
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))
493511
.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
494512
.setAbortOnVersionConflict(false)
@@ -561,7 +579,16 @@ private static IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesRespons
561579
private void deleteQuantiles(@SuppressWarnings("HiddenField") String jobId, ActionListener<Boolean> finishedHandler) {
562580
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
563581
IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId));
564-
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()).setQuery(query)
582+
583+
String[] indicesToQuery = removeReadOnlyIndices(
584+
List.of(AnomalyDetectorsIndex.jobStateIndexPattern()),
585+
finishedHandler,
586+
"quantiles",
587+
() -> finishedHandler.onResponse(true)
588+
);
589+
if (indicesToQuery.length == 0) return;
590+
591+
DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
565592
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
566593
.setAbortOnVersionConflict(false)
567594
.setRefresh(true);
@@ -591,7 +618,14 @@ private void deleteCategorizerState(
591618
) {
592619
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
593620
IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum));
594-
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()).setQuery(query)
621+
String[] indicesToQuery = removeReadOnlyIndices(
622+
List.of(AnomalyDetectorsIndex.jobStateIndexPattern()),
623+
finishedHandler,
624+
"categorizer state",
625+
() -> finishedHandler.onResponse(true)
626+
);
627+
if (indicesToQuery.length == 0) return;
628+
DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
595629
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
596630
.setAbortOnVersionConflict(false)
597631
.setRefresh(true);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.threadpool.ThreadPool;
1919
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
2020
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
21+
import org.elasticsearch.xpack.ml.job.retention.MockWritableIndexExpander;
2122
import org.junit.After;
2223
import org.junit.Before;
2324
import org.mockito.ArgumentCaptor;
@@ -33,6 +34,7 @@
3334
import static org.mockito.ArgumentMatchers.any;
3435
import static org.mockito.ArgumentMatchers.eq;
3536
import static org.mockito.Mockito.mock;
37+
import static org.mockito.Mockito.never;
3638
import static org.mockito.Mockito.times;
3739
import static org.mockito.Mockito.verify;
3840
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -61,6 +63,7 @@ public void verifyNoMoreInteractionsWithClient() {
6163
}
6264

6365
public void testDeleteAllAnnotations() {
66+
MockWritableIndexExpander.create(true);
6467
Arrays.asList(false, true).forEach(deleteUserAnnotations -> {
6568
JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID, deleteUserAnnotations);
6669
jobDataDeleter.deleteAllAnnotations(ActionTestUtils.assertNoFailureListener(deleteResponse -> {}));
@@ -85,6 +88,7 @@ public void testDeleteAllAnnotations() {
8588
}
8689

8790
public void testDeleteAnnotations_TimestampFiltering() {
91+
MockWritableIndexExpander.create(true);
8892
Arrays.asList(false, true).forEach(deleteUserAnnotations -> {
8993
JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID, deleteUserAnnotations);
9094
Tuple<Long, Long> range = randomFrom(
@@ -114,6 +118,7 @@ public void testDeleteAnnotations_TimestampFiltering() {
114118
}
115119

116120
public void testDeleteAnnotations_EventFiltering() {
121+
MockWritableIndexExpander.create(true);
117122
Arrays.asList(false, true).forEach(deleteUserAnnotations -> {
118123
JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID, deleteUserAnnotations);
119124
jobDataDeleter.deleteAnnotations(
@@ -143,6 +148,7 @@ public void testDeleteAnnotations_EventFiltering() {
143148
}
144149

145150
public void testDeleteDatafeedTimingStats() {
151+
MockWritableIndexExpander.create(true);
146152
Arrays.asList(false, true).forEach(deleteUserAnnotations -> {
147153
JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID, deleteUserAnnotations);
148154
jobDataDeleter.deleteDatafeedTimingStats(ActionTestUtils.assertNoFailureListener(deleteResponse -> {}));
@@ -157,4 +163,20 @@ public void testDeleteDatafeedTimingStats() {
157163
assertThat(deleteRequest.indices(), is(arrayContaining(AnomalyDetectorsIndex.jobResultsAliasedName(JOB_ID))));
158164
});
159165
}
166+
167+
public void testDeleteDatafeedTimingStats_WhenIndexReadOnly_ShouldNotDeleteAnything() {
168+
MockWritableIndexExpander.create(false);
169+
Arrays.asList(false, true).forEach(deleteUserAnnotations -> {
170+
JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID, deleteUserAnnotations);
171+
jobDataDeleter.deleteDatafeedTimingStats(ActionTestUtils.assertNoFailureListener(deleteResponse -> {}));
172+
173+
if (deleteUserAnnotations) {
174+
verify(client, never()).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any());
175+
client.threadPool();
176+
} else {
177+
verify(client, never()).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any());
178+
client.threadPool();
179+
}
180+
});
181+
}
160182
}

0 commit comments

Comments
 (0)