Skip to content

Commit d85ae7d

Browse files
authored
[ML] Add missing job_id filter to Anomaly Detection data deleter (elastic#138160)
When reverting to a previous model snapshot intervening results are removed. A bug in the delete-by-query query meant that results were deleted for all jobs in this period not just the job being deleted. The fix here reinstates the job_id filter.
1 parent 3b7ac32 commit d85ae7d

File tree

6 files changed

+83
-14
lines changed

6 files changed

+83
-14
lines changed

docs/changelog/138160.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 138160
2+
summary: Add missing `job_id` filter to Anomaly Detection data deleter
3+
area: Machine Learning
4+
type: bug
5+
issues: []

muted-tests.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -267,12 +267,6 @@ tests:
267267
- class: org.elasticsearch.xpack.sql.qa.mixed_node.SqlCompatIT
268268
method: testNullsOrderWithMissingOrderSupportQueryingNewNode
269269
issue: https://github.com/elastic/elasticsearch/issues/132249
270-
- class: org.elasticsearch.xpack.ml.integration.RevertModelSnapshotIT
271-
method: testRevertModelSnapshot_DeleteInterveningResults
272-
issue: https://github.com/elastic/elasticsearch/issues/132349
273-
- class: org.elasticsearch.xpack.ml.integration.RevertModelSnapshotIT
274-
method: testRevertModelSnapshot
275-
issue: https://github.com/elastic/elasticsearch/issues/132733
276270
- class: org.elasticsearch.packaging.test.ArchiveGenerateInitialCredentialsTests
277271
method: test40VerifyAutogeneratedCredentials
278272
issue: https://github.com/elastic/elasticsearch/issues/132877

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,28 @@ protected void assertThatNumberOfAnnotationsIsEqualTo(int expectedNumberOfAnnota
303303
});
304304
}
305305

306+
protected void assertThatNumberOfAnnotationsIsEqualTo(String jobId, int expectedNumberOfAnnotations) throws Exception {
307+
// Refresh the annotations index so that recently indexed annotation docs are visible.
308+
indicesAdmin().prepareRefresh(AnnotationIndex.LATEST_INDEX_NAME)
309+
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
310+
.get();
311+
312+
SearchRequest searchRequest = client().prepareSearch(AnnotationIndex.READ_ALIAS_NAME)
313+
.setQuery(QueryBuilders.termQuery("job_id", jobId))
314+
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
315+
.request();
316+
317+
assertCheckedResponse(client().search(searchRequest), searchResponse -> {
318+
List<Annotation> annotations = new ArrayList<>();
319+
for (SearchHit hit : searchResponse.getHits().getHits()) {
320+
try (XContentParser parser = createParser(jsonXContent, hit.getSourceRef())) {
321+
annotations.add(Annotation.fromXContent(parser, null));
322+
}
323+
}
324+
assertThat("Annotations were: " + annotations, annotations, hasSize(expectedNumberOfAnnotations));
325+
});
326+
}
327+
306328
protected List<Annotation> getAnnotations() throws Exception {
307329
List<Annotation> annotations = new ArrayList<>();
308330
// Refresh the annotations index so that recently indexed annotation docs are visible.

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@
6565
*/
6666
public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase {
6767

68+
private static final long DATA_START_TIME = 1761955200000L;
69+
6870
@After
6971
public void tearDownData() {
7072
cleanUp();
@@ -75,7 +77,32 @@ public void testRevertModelSnapshot() throws Exception {
7577
}
7678

7779
public void testRevertModelSnapshot_DeleteInterveningResults() throws Exception {
80+
// Create and run a unrelated job to chech it is not affected by reverting a different job
81+
String jobId = "revert-snapshot-delete-intervening-unrelated-job";
82+
83+
TimeValue bucketSpan = TimeValue.timeValueHours(1);
84+
long startTime = DATA_START_TIME - (bucketSpan.getMillis() * 2);
85+
String data = String.join("", generateData(startTime, bucketSpan, 23, List.of("foo"), (bucketIndex, series) -> 10.0));
86+
87+
Job.Builder job = buildAndRegisterJob(jobId, bucketSpan);
88+
openJob(job.getId());
89+
postData(job.getId(), data);
90+
flushJob(job.getId(), true);
91+
closeJob(job.getId());
92+
93+
String snapShotId = getJob(jobId).get(0).getModelSnapshotId();
94+
assertThat(snapShotId, is(notNullValue()));
95+
List<Bucket> buckets = getBuckets(jobId);
96+
assertThat(buckets.size(), greaterThan(0));
97+
98+
// Run another job and revert to an previous snapshot
7899
testRunJobInTwoPartsAndRevertSnapshotAndRunToCompletion("revert-model-snapshot-it-job-delete-intervening-results", true);
100+
101+
// Check snapshot Id and buckets have not changed
102+
assertThat(getJob(jobId).getFirst().getModelSnapshotId(), is(snapShotId));
103+
List<Bucket> bucketsAfterRevert = getBuckets(jobId);
104+
assertThat(bucketsAfterRevert.size(), is(buckets.size()));
105+
assertThat(bucketsAfterRevert, is(buckets));
79106
}
80107

81108
public void testRevertToEmptySnapshot() throws Exception {
@@ -126,13 +153,13 @@ public void testRevertToEmptySnapshot() throws Exception {
126153

127154
private void testRunJobInTwoPartsAndRevertSnapshotAndRunToCompletion(String jobId, boolean deleteInterveningResults) throws Exception {
128155
TimeValue bucketSpan = TimeValue.timeValueHours(1);
129-
long startTime = 1491004800000L;
130156

131157
Job.Builder job = buildAndRegisterJob(jobId, bucketSpan);
132158
openJob(job.getId());
133159
postData(
134160
job.getId(),
135-
generateData(startTime, bucketSpan, 10, Arrays.asList("foo"), (bucketIndex, series) -> bucketIndex == 5 ? 100.0 : 10.0).stream()
161+
generateData(DATA_START_TIME, bucketSpan, 10, Arrays.asList("foo"), (bucketIndex, series) -> bucketIndex == 5 ? 100.0 : 10.0)
162+
.stream()
136163
.collect(Collectors.joining())
137164
);
138165
flushJob(job.getId(), true);
@@ -156,7 +183,7 @@ private void testRunJobInTwoPartsAndRevertSnapshotAndRunToCompletion(String jobI
156183
postData(
157184
job.getId(),
158185
generateData(
159-
startTime + 10 * bucketSpan.getMillis(),
186+
DATA_START_TIME + 10 * bucketSpan.getMillis(),
160187
bucketSpan,
161188
10,
162189
Arrays.asList("foo", "bar"),
@@ -187,15 +214,15 @@ private void testRunJobInTwoPartsAndRevertSnapshotAndRunToCompletion(String jobI
187214
ModelSnapshot revertSnapshot = modelSnapshots.get(1);
188215

189216
// Check there are 2 annotations (one per model snapshot)
190-
assertThatNumberOfAnnotationsIsEqualTo(2);
217+
assertThatNumberOfAnnotationsIsEqualTo(jobId, 2);
191218

192219
// Add 3 new annotations...
193220
Instant lastResultTimestamp = revertSnapshot.getLatestResultTimeStamp().toInstant();
194221
client().index(randomAnnotationIndexRequest(job.getId(), lastResultTimestamp.plusSeconds(10), Event.DELAYED_DATA)).actionGet();
195222
client().index(randomAnnotationIndexRequest(job.getId(), lastResultTimestamp.plusSeconds(20), Event.MODEL_CHANGE)).actionGet();
196223
client().index(randomAnnotationIndexRequest(job.getId(), lastResultTimestamp.minusSeconds(10), Event.MODEL_CHANGE)).actionGet();
197224
// ... and check there are 5 annotations in total now
198-
assertThatNumberOfAnnotationsIsEqualTo(5);
225+
assertThatNumberOfAnnotationsIsEqualTo(jobId, 5);
199226

200227
GetJobsStatsAction.Response.JobStats statsBeforeRevert = getJobStats(jobId).get(0);
201228
Instant timeBeforeRevert = Instant.now();
@@ -219,7 +246,7 @@ private void testRunJobInTwoPartsAndRevertSnapshotAndRunToCompletion(String jobI
219246
assertThat(getQuantiles(job.getId()).getTimestamp(), equalTo(revertSnapshot.getLatestResultTimeStamp()));
220247

221248
// Check annotations with event type from {delayed_data, model_change} have been removed if deleteInterveningResults flag is set
222-
assertThatNumberOfAnnotationsIsEqualTo(deleteInterveningResults ? 3 : 5);
249+
assertThatNumberOfAnnotationsIsEqualTo(jobId, deleteInterveningResults ? 3 : 5);
223250

224251
// Reverting should not have deleted any forecast docs
225252
assertThat(countForecastDocs(job.getId(), forecastId), is(numForecastDocs));
@@ -229,7 +256,7 @@ private void testRunJobInTwoPartsAndRevertSnapshotAndRunToCompletion(String jobI
229256
postData(
230257
job.getId(),
231258
generateData(
232-
startTime + 10 * bucketSpan.getMillis(),
259+
DATA_START_TIME + 10 * bucketSpan.getMillis(),
233260
bucketSpan,
234261
10,
235262
Arrays.asList("foo", "bar"),

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ private <T> String[] removeReadOnlyIndices(
251251
*/
252252
public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> listener) {
253253
QueryBuilder query = QueryBuilders.boolQuery()
254+
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
254255
.filter(
255256
QueryBuilders.termsQuery(
256257
Result.RESULT_TYPE.getPreferredName(),
@@ -262,6 +263,7 @@ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> li
262263
)
263264
)
264265
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
266+
265267
String[] indicesToQuery = removeReadOnlyIndices(
266268
List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)),
267269
listener,

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ public void setUpTests() {
5858

5959
@After
6060
public void verifyNoMoreInteractionsWithClient() {
61-
verify(client, times(2)).threadPool();
6261
verifyNoMoreInteractions(client);
6362
}
6463

@@ -85,6 +84,7 @@ public void testDeleteAllAnnotations() {
8584
assertThat(dbqQueryString, containsString("_xpack"));
8685
}
8786
});
87+
verify(client, times(2)).threadPool();
8888
}
8989

9090
public void testDeleteAnnotations_TimestampFiltering() {
@@ -115,6 +115,7 @@ public void testDeleteAnnotations_TimestampFiltering() {
115115
assertThat(dbqQueryString, containsString("_xpack"));
116116
}
117117
});
118+
verify(client, times(2)).threadPool();
118119
}
119120

120121
public void testDeleteAnnotations_EventFiltering() {
@@ -145,6 +146,22 @@ public void testDeleteAnnotations_EventFiltering() {
145146
assertThat(dbqQueryString, containsString("_xpack"));
146147
}
147148
});
149+
verify(client, times(2)).threadPool();
150+
}
151+
152+
public void testDeleteResultsFromTime() {
153+
MockWritableIndexExpander.create(true);
154+
long fromEpochMs = randomNonNegativeLong();
155+
JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID, randomBoolean());
156+
jobDataDeleter.deleteResultsFromTime(fromEpochMs, ActionTestUtils.assertNoFailureListener(deleteResponse -> {}));
157+
158+
verify(client).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any());
159+
160+
DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue();
161+
assertThat(deleteRequest.indices(), is(arrayContaining(".ml-anomalies-my-job-id")));
162+
String dbqQueryString = Strings.toString(deleteRequest.getSearchRequest().source().query());
163+
assertThat(dbqQueryString, containsString("{\"term\":{\"job_id\":{\"value\":\"my-job-id\"}}"));
164+
verify(client, times(1)).threadPool();
148165
}
149166

150167
public void testDeleteDatafeedTimingStats() {
@@ -162,6 +179,7 @@ public void testDeleteDatafeedTimingStats() {
162179
DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue();
163180
assertThat(deleteRequest.indices(), is(arrayContaining(AnomalyDetectorsIndex.jobResultsAliasedName(JOB_ID))));
164181
});
182+
verify(client, times(2)).threadPool();
165183
}
166184

167185
public void testDeleteDatafeedTimingStats_WhenIndexReadOnly_ShouldNotDeleteAnything() {
@@ -178,5 +196,6 @@ public void testDeleteDatafeedTimingStats_WhenIndexReadOnly_ShouldNotDeleteAnyth
178196
client.threadPool();
179197
}
180198
});
199+
verify(client, times(2)).threadPool();
181200
}
182201
}

0 commit comments

Comments
 (0)