
Commit c4982eb

[ML] Delete obsolete snapshot stats after upgrade (#121661)
Ensure that the old snapshot model_size_stats document is removed after the snapshot upgrade. Marking it as a non-issue since the bug was not released yet.
1 parent 3d2c68c commit c4982eb
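
For context, the duplicate arises when the results index has been rolled over: the upgraded snapshot document is written through the write alias into the new backing index while the original document remains in the old one, so two copies exist under the same ID. Below is a minimal sketch of the lookup the cleanup performs, reusing the internal client calls visible in the diff; the class, method, job ID, and snapshot ID are hypothetical placeholders, not part of the change.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;

class DuplicateSnapshotLookupSketch {

    // Sketch only: count how many copies of a single model snapshot document exist
    // across the ML results indices. The real logic lives in JobModelSnapshotUpgrader.
    static void countSnapshotDocs(Client client, String jobId, String snapshotId) {
        // Snapshot documents are identified as "<job_id>_model_snapshot_<snapshot_id>".
        String snapshotDocId = jobId + "_model_snapshot_" + snapshotId;

        client.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPattern())
            .setQuery(QueryBuilders.idsQuery().addIds(snapshotDocId))
            .setSize(2) // two hits are enough to prove an obsolete copy exists
            .execute(ActionListener.wrap(searchResponse -> {
                long hits = searchResponse.getHits().getTotalHits().value;
                // hits > 1 means an old copy survived the rollover; the commit deletes
                // the document with the lower min_version, keeping the upgraded one.
            }, e -> {
                // in the commit a failed lookup is only logged, never treated as fatal
            }));
    }
}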

2 files changed (+105, -30 lines)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/JobModelSnapshotUpgrader.java

Lines changed: 97 additions & 28 deletions
@@ -12,19 +12,27 @@
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.IOUtils;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeState;
 import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskState;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -44,9 +52,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -153,6 +159,55 @@ synchronized void start() {
         executor.execute();
     }
 
+    private void removeDuplicateModelSnapshotDoc(Consumer<Exception> runAfter) {
+        String snapshotDocId = jobId + "_model_snapshot_" + snapshotId;
+        client.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPattern())
+            .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.idsQuery().addIds(snapshotDocId)))
+            .setSize(2)
+            .addSort(ModelSnapshot.MIN_VERSION.getPreferredName(), org.elasticsearch.search.sort.SortOrder.ASC)
+            .execute(ActionListener.wrap(searchResponse -> {
+                if (searchResponse.getHits().getTotalHits().value > 1) {
+                    deleteOlderSnapshotDoc(searchResponse, runAfter);
+                } else {
+                    onFinish.accept(null);
+                }
+            }, e -> {
+                logger.warn(() -> format("[%s] [%s] error during search for model snapshot documents", jobId, snapshotId), e);
+                onFinish.accept(null);
+            }));
+    }
+
+    private void deleteOlderSnapshotDoc(SearchResponse searchResponse, Consumer<Exception> runAfter) {
+        SearchHit firstHit = searchResponse.getHits().getAt(0);
+        logger.debug(() -> format("[%s] deleting duplicate model snapshot doc [%s]", jobId, firstHit.getId()));
+        client.prepareDelete()
+            .setIndex(firstHit.getIndex())
+            .setId(firstHit.getId())
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .execute(ActionListener.runAfter(ActionListener.wrap(deleteResponse -> {
+                if ((deleteResponse.getResult() == DocWriteResponse.Result.DELETED) == false) {
+                    logger.warn(
+                        () -> format(
+                            "[%s] [%s] failed to delete old snapshot [%s] result document, document not found",
+                            jobId,
+                            snapshotId,
+                            ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()
+                        )
+                    );
+                }
+            }, e -> {
+                logger.warn(
+                    () -> format(
+                        "[%s] [%s] failed to delete old snapshot [%s] result document",
+                        jobId,
+                        snapshotId,
+                        ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()
+                    ),
+                    e
+                );
+            }), () -> runAfter.accept(null)));
+    }
+
     void setTaskToFailed(String reason, ActionListener<PersistentTask<?>> listener) {
         SnapshotUpgradeTaskState taskState = new SnapshotUpgradeTaskState(SnapshotUpgradeState.FAILED, task.getAllocationId(), reason);
         task.updatePersistentTaskState(taskState, ActionListener.wrap(listener::onResponse, f -> {
@@ -259,7 +314,7 @@ void restoreState() {
             logger.error(() -> format("[%s] [%s] failed to write old state", jobId, snapshotId), e);
             setTaskToFailed(
                 "Failed to write old state due to: " + e.getMessage(),
-                ActionListener.wrap(t -> shutdown(e), f -> shutdown(e))
+                ActionListener.running(() -> shutdownWithFailure(e))
             );
             return;
         }
@@ -273,7 +328,7 @@ void restoreState() {
             logger.error(() -> format("[%s] [%s] failed to flush after writing old state", jobId, snapshotId), e);
             nextStep = () -> setTaskToFailed(
                 "Failed to flush after writing old state due to: " + e.getMessage(),
-                ActionListener.wrap(t -> shutdown(e), f -> shutdown(e))
+                ActionListener.running(() -> shutdownWithFailure(e))
             );
         } else {
             logger.debug(
@@ -295,7 +350,7 @@ private void requestStateWrite() {
             new SnapshotUpgradeTaskState(SnapshotUpgradeState.SAVING_NEW_STATE, task.getAllocationId(), ""),
             ActionListener.wrap(readingNewState -> {
                 if (continueRunning.get() == false) {
-                    shutdown(null);
+                    shutdownWithFailure(null);
                     return;
                 }
                 submitOperation(() -> {
@@ -310,12 +365,12 @@ private void requestStateWrite() {
                     // Execute callback in the UTILITY thread pool, as the current thread in the callback will be one in the
                     // autodetectWorkerExecutor. Trying to run the callback in that executor will cause a dead lock as that
                     // executor has a single processing queue.
-                    (aVoid, e) -> threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> shutdown(e))
+                    (aVoid, e) -> threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> handlePersistingState(e))
                 );
                 logger.debug("[{}] [{}] asked for state to be persisted", jobId, snapshotId);
             }, f -> {
                 logger.error(() -> format("[%s] [%s] failed to update snapshot upgrader task to started", jobId, snapshotId), f);
-                shutdown(
+                shutdownWithFailure(
                     new ElasticsearchStatusException(
                         "Failed to start snapshot upgrade [{}] for job [{}]",
                         RestStatus.INTERNAL_SERVER_ERROR,
@@ -378,17 +433,45 @@ private void checkResultsProcessorIsAlive() {
            }
        }
 
-        void shutdown(Exception e) {
+        private void handlePersistingState(@Nullable Exception exception) {
+            assert Thread.currentThread().getName().contains(UTILITY_THREAD_POOL_NAME);
+
+            if (exception != null) {
+                shutdownWithFailure(exception);
+            } else {
+                stopProcess((aVoid, e) -> {
+                    threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> {
+                        autodetectWorkerExecutor.shutdownNow();
+                        // If there are two snapshot documents in the results indices with the same snapshot id,
+                        // remove the old one. This can happen when the result index has been rolled over and
+                        // the write alias is pointing to the new index.
+                        removeDuplicateModelSnapshotDoc(onFinish);
+                    });
+
+                });
+            }
+        }
+
+        void shutdownWithFailure(Exception e) {
+            stopProcess((aVoid, ignored) -> {
+                threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> {
+                    onFinish.accept(e);
+                    autodetectWorkerExecutor.shutdownNow();
+                });
+            });
+        }
+
+        private void stopProcess(BiConsumer<Class<Void>, Exception> runNext) {
            logger.debug("[{}] [{}] shutdown initiated", jobId, snapshotId);
            // No point in sending an action to the executor if the process has died
            if (process.isProcessAlive() == false) {
                logger.debug("[{}] [{}] process is dead, no need to shutdown", jobId, snapshotId);
-                onFinish.accept(e);
-                autodetectWorkerExecutor.shutdownNow();
                stateStreamer.cancel();
+                runNext.accept(null, null);
                return;
            }
-            Future<?> future = autodetectWorkerExecutor.submit(() -> {
+
+            submitOperation(() -> {
                try {
                    logger.debug("[{}] [{}] shutdown is now occurring", jobId, snapshotId);
                    if (process.isReady()) {
@@ -401,24 +484,10 @@ void shutdown(Exception e) {
                        processor.awaitCompletion();
                    } catch (IOException | TimeoutException exc) {
                        logger.warn(() -> format("[%s] [%s] failed to shutdown process", jobId, snapshotId), exc);
-                    } finally {
-                        onFinish.accept(e);
                    }
                    logger.debug("[{}] [{}] connection for upgrade has been closed, process is shutdown", jobId, snapshotId);
-            });
-            try {
-                future.get();
-                autodetectWorkerExecutor.shutdownNow();
-            } catch (InterruptedException interrupt) {
-                Thread.currentThread().interrupt();
-            } catch (ExecutionException executionException) {
-                if (processor.isProcessKilled()) {
-                    // In this case the original exception is spurious and highly misleading
-                    throw ExceptionsHelper.conflictStatusException("close snapshot upgrade interrupted by kill request");
-                } else {
-                    throw FutureUtils.rethrowExecutionException(executionException);
-                }
-            }
+                    return Void.TYPE;
+            }, runNext);
        }
    }
 }
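
A design note on the cleanup path: deleteOlderSnapshotDoc wraps its delete listener in ActionListener.runAfter, so the continuation runs whether the delete succeeds, reports a missing document, or fails outright, and the upgrade task cannot hang on this final step. A minimal, self-contained sketch of that pattern follows; the listener bodies and the String response type are placeholders, not code from the change.

import org.elasticsearch.action.ActionListener;

class RunAfterPatternSketch {

    // Sketch only: ActionListener.runAfter guarantees the Runnable executes after the
    // wrapped listener has handled either the response or the failure.
    static ActionListener<String> alwaysContinue(Runnable continuation) {
        return ActionListener.runAfter(ActionListener.wrap(response -> {
            // success path; in the commit this only logs an unexpected delete result
        }, e -> {
            // failure path; in the commit this only logs the exception
        }), continuation); // runs last in every case, like () -> runAfter.accept(null)
    }
}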

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlJobSnapshotUpgradeIT.java

Lines changed: 8 additions & 2 deletions
@@ -65,7 +65,6 @@ protected static void waitForPendingUpgraderTasks() throws Exception {
      * The purpose of this test is to ensure that when a job is open through a rolling upgrade we upgrade the results
      * index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade
      */
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98560")
     public void testSnapshotUpgrader() throws Exception {
         Request adjustLoggingLevels = new Request("PUT", "/_cluster/settings");
         adjustLoggingLevels.setJsonEntity("""
@@ -98,6 +97,13 @@ public void testSnapshotUpgrader() throws Exception {
 
     @SuppressWarnings("unchecked")
     private void testSnapshotUpgradeFailsOnMixedCluster() throws Exception {
+        // TODO the mixed cluster assertions sometimes fail because the code that
+        // detects the mixed cluster relies on the transport versions being different.
+        // This assumption does not hold immediately after a version bump and new
+        // branch being cut as the new branch will have the same transport version
+        // See https://github.com/elastic/elasticsearch/issues/98560
+
+        assumeTrue("The mixed cluster is not always detected correctly, see https://github.com/elastic/elasticsearch/issues/98560", false);
         Map<String, Object> jobs = entityAsMap(getJob(JOB_ID));
 
         String currentSnapshot = ((List<String>) XContentMapValues.extractValue("jobs.model_snapshot_id", jobs)).get(0);
@@ -154,7 +160,7 @@ private void testSnapshotUpgrade() throws Exception {
 
         List<Map<String, Object>> upgradedSnapshot = (List<Map<String, Object>>) entityAsMap(getModelSnapshots(JOB_ID, snapshotToUpgradeId))
             .get("model_snapshots");
-        assertThat(upgradedSnapshot, hasSize(1));
+        assertThat(upgradedSnapshot.toString(), upgradedSnapshot, hasSize(1));
         assertThat(upgradedSnapshot.get(0).get("latest_record_time_stamp"), equalTo(snapshotToUpgrade.get("latest_record_time_stamp")));
 
         // Does the snapshot still work?
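
The tightened assertion records the full snapshot list in the failure message and still expects exactly one entry, which is what the production change now guarantees. For reference, a hedged sketch of how such a check can be expressed against the ML REST API with the same low-level client used in this test; the class, method, and ID values are hypothetical, and the test's getModelSnapshots helper is assumed to wrap a request of this shape.

import java.io.IOException;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

class ModelSnapshotCheckSketch {

    // Sketch only: fetch the model snapshots for one snapshot ID; the test then asserts
    // that the returned "model_snapshots" array contains exactly one element.
    static Response getModelSnapshot(RestClient client, String jobId, String snapshotId) throws IOException {
        Request request = new Request("GET", "/_ml/anomaly_detectors/" + jobId + "/model_snapshots/" + snapshotId);
        return client.performRequest(request);
    }
}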
