Skip to content

Commit e8c4e44

Browse files
committed
always call listener.onResponse for empty results
1 parent c822a57 commit e8c4e44

File tree

1 file changed

+25
-12
lines changed

1 file changed

+25
-12
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -109,15 +109,7 @@ public JobDataDeleter(Client client, String jobId, boolean deleteUserAnnotations
109109
*/
110110
public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListener<BulkByScrollResponse> listener) {
111111
if (modelSnapshots.isEmpty()) {
112-
listener.onResponse(
113-
new BulkByScrollResponse(
114-
TimeValue.ZERO,
115-
new BulkByScrollTask.Status(Collections.emptyList(), null),
116-
Collections.emptyList(),
117-
Collections.emptyList(),
118-
false
119-
)
120-
);
112+
listener.onResponse(emptyBulkByScrollResponse());
121113
return;
122114
}
123115

@@ -134,7 +126,12 @@ public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListe
134126
indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
135127
}
136128

137-
String[] indicesToQuery = removeReadOnlyIndices(new ArrayList<>(indices), listener, "model snapshots", null);
129+
String[] indicesToQuery = removeReadOnlyIndices(
130+
new ArrayList<>(indices),
131+
listener,
132+
"model snapshots",
133+
() -> listener.onResponse(emptyBulkByScrollResponse())
134+
);
138135
if (indicesToQuery.length == 0) return;
139136

140137
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true)
@@ -147,6 +144,16 @@ public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListe
147144
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
148145
}
149146

147+
private static BulkByScrollResponse emptyBulkByScrollResponse() {
148+
return new BulkByScrollResponse(
149+
TimeValue.ZERO,
150+
new BulkByScrollTask.Status(Collections.emptyList(), null),
151+
Collections.emptyList(),
152+
Collections.emptyList(),
153+
false
154+
);
155+
}
156+
150157
/**
151158
* Asynchronously delete the annotations
152159
* If the deleteUserAnnotations field is set to true then all
@@ -228,6 +235,7 @@ private <T> String[] removeReadOnlyIndices(
228235
if (indicesToQuery.isEmpty()) {
229236
logger.info("No writable {} indices found for [{}] job. No {} to remove.", entityType, jobId, entityType);
230237
if (onEmpty != null) {
238+
logger.info("Running onEmpty callback for [{}] job", jobId);
231239
onEmpty.run();
232240
}
233241
}
@@ -311,7 +319,7 @@ public void deleteDatafeedTimingStats(ActionListener<BulkByScrollResponse> liste
311319
List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)),
312320
listener,
313321
"datafeed timing stats",
314-
null
322+
() -> listener.onResponse(emptyBulkByScrollResponse())
315323
);
316324
if (indicesToQuery.length == 0) return;
317325
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true)
@@ -504,7 +512,12 @@ private void deleteResultsByQuery(
504512
ActionListener<BroadcastResponse> refreshListener = ActionListener.wrap(refreshResponse -> {
505513
logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indices));
506514
ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
507-
String[] indicesToQuery = removeReadOnlyIndices(List.of(indices), listener, "results", null);
515+
String[] indicesToQuery = removeReadOnlyIndices(
516+
List.of(indices),
517+
listener,
518+
"results",
519+
() -> listener.onResponse(emptyBulkByScrollResponse())
520+
);
508521
if (indicesToQuery.length == 0) return;
509522
DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
510523
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))

0 commit comments

Comments
 (0)