@@ -12,19 +12,27 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeState;
import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -44,9 +52,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -153,6 +159,55 @@ synchronized void start() {
executor.execute();
}

private void removeDuplicateModelSnapshotDoc(Consumer<Exception> runAfter) {
String snapshotDocId = jobId + "_model_snapshot_" + snapshotId;
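// At most two copies of the document are expected: the original, and a second one
// written after an index rollover. Sorting ascending on min_version puts the
// document written by the older node version first.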
client.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPattern())
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.idsQuery().addIds(snapshotDocId)))
.setSize(2)
.addSort(ModelSnapshot.MIN_VERSION.getPreferredName(), org.elasticsearch.search.sort.SortOrder.ASC)
.execute(ActionListener.wrap(searchResponse -> {
if (searchResponse.getHits().getTotalHits().value() > 1) {
deleteOlderSnapshotDoc(searchResponse, runAfter);
} else {
onFinish.accept(null);
}
}, e -> {
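// A failed search here is not fatal to the snapshot upgrade: log it and
// report success anyway.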
logger.warn(() -> format("[%s] [%s] error during search for model snapshot documents", jobId, snapshotId), e);
onFinish.accept(null);
}));
}

private void deleteOlderSnapshotDoc(SearchResponse searchResponse, Consumer<Exception> runAfter) {
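// The hits are sorted ascending on min_version, so the first hit is the older duplicate.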
SearchHit firstHit = searchResponse.getHits().getAt(0);
logger.debug(() -> format("[%s] deleting duplicate model snapshot doc [%s]", jobId, firstHit.getId()));
client.prepareDelete()
.setIndex(firstHit.getIndex())
.setId(firstHit.getId())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.execute(ActionListener.runAfter(ActionListener.wrap(deleteResponse -> {
if ((deleteResponse.getResult() == DocWriteResponse.Result.DELETED) == false) {
logger.warn(
() -> format(
"[%s] [%s] failed to delete old snapshot [%s] result document, document not found",
jobId,
snapshotId,
ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()
)
);
}
}, e -> {
logger.warn(
() -> format(
"[%s] [%s] failed to delete old snapshot [%s] result document",
jobId,
snapshotId,
ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()
),
e
);
}), () -> runAfter.accept(null)));
}

void setTaskToFailed(String reason, ActionListener<PersistentTask<?>> listener) {
SnapshotUpgradeTaskState taskState = new SnapshotUpgradeTaskState(SnapshotUpgradeState.FAILED, task.getAllocationId(), reason);
task.updatePersistentTaskState(taskState, ActionListener.wrap(listener::onResponse, f -> {
@@ -259,7 +314,7 @@ void restoreState() {
logger.error(() -> format("[%s] [%s] failed to write old state", jobId, snapshotId), e);
setTaskToFailed(
"Failed to write old state due to: " + e.getMessage(),
ActionListener.wrap(t -> shutdown(e), f -> shutdown(e))
ActionListener.running(() -> shutdownWithFailure(e))
);
return;
}
@@ -273,7 +328,7 @@ void restoreState() {
logger.error(() -> format("[%s] [%s] failed to flush after writing old state", jobId, snapshotId), e);
nextStep = () -> setTaskToFailed(
"Failed to flush after writing old state due to: " + e.getMessage(),
ActionListener.wrap(t -> shutdown(e), f -> shutdown(e))
ActionListener.running(() -> shutdownWithFailure(e))
);
} else {
logger.debug(
@@ -295,7 +350,7 @@ private void requestStateWrite() {
new SnapshotUpgradeTaskState(SnapshotUpgradeState.SAVING_NEW_STATE, task.getAllocationId(), ""),
ActionListener.wrap(readingNewState -> {
if (continueRunning.get() == false) {
shutdown(null);
shutdownWithFailure(null);
return;
}
submitOperation(() -> {
@@ -310,12 +365,12 @@ private void requestStateWrite() {
// Execute callback in the UTILITY thread pool, as the current thread in the callback will be one in the
// autodetectWorkerExecutor. Trying to run the callback in that executor will cause a dead lock as that
// executor has a single processing queue.
(aVoid, e) -> threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> shutdown(e))
(aVoid, e) -> threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> handlePersistingState(e))
);
logger.debug("[{}] [{}] asked for state to be persisted", jobId, snapshotId);
}, f -> {
logger.error(() -> format("[%s] [%s] failed to update snapshot upgrader task to started", jobId, snapshotId), f);
shutdown(
shutdownWithFailure(
new ElasticsearchStatusException(
"Failed to start snapshot upgrade [{}] for job [{}]",
RestStatus.INTERNAL_SERVER_ERROR,
@@ -378,17 +433,45 @@ private void checkResultsProcessorIsAlive() {
}
}

void shutdown(Exception e) {
private void handlePersistingState(@Nullable Exception exception) {
assert Thread.currentThread().getName().contains(UTILITY_THREAD_POOL_NAME);

if (exception != null) {
shutdownWithFailure(exception);
} else {
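// The new state was persisted successfully: stop the native process, then remove
// any duplicate snapshot document before signalling completion via onFinish.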
stopProcess((aVoid, e) -> {
threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> {
autodetectWorkerExecutor.shutdownNow();
// If there are two snapshot documents in the results indices with the same snapshot id,
// remove the old one. This can happen when the result index has been rolled over and
// the write alias is pointing to the new index.
removeDuplicateModelSnapshotDoc(onFinish);
});

});
}
}

void shutdownWithFailure(Exception e) {
stopProcess((aVoid, ignored) -> {
threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> {
onFinish.accept(e);
autodetectWorkerExecutor.shutdownNow();
});
});
}

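// Stops the native process (if it is still alive) and then hands control to runNext.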
private void stopProcess(BiConsumer<Class<Void>, Exception> runNext) {
logger.debug("[{}] [{}] shutdown initiated", jobId, snapshotId);
// No point in sending an action to the executor if the process has died
if (process.isProcessAlive() == false) {
logger.debug("[{}] [{}] process is dead, no need to shutdown", jobId, snapshotId);
onFinish.accept(e);
autodetectWorkerExecutor.shutdownNow();
stateStreamer.cancel();
runNext.accept(null, null);
return;
}
Future<?> future = autodetectWorkerExecutor.submit(() -> {

submitOperation(() -> {
try {
logger.debug("[{}] [{}] shutdown is now occurring", jobId, snapshotId);
if (process.isReady()) {
@@ -401,24 +484,10 @@ void shutdown(Exception e) {
processor.awaitCompletion();
} catch (IOException | TimeoutException exc) {
logger.warn(() -> format("[%s] [%s] failed to shutdown process", jobId, snapshotId), exc);
} finally {
onFinish.accept(e);
}
logger.debug("[{}] [{}] connection for upgrade has been closed, process is shutdown", jobId, snapshotId);
});
try {
future.get();
autodetectWorkerExecutor.shutdownNow();
} catch (InterruptedException interrupt) {
Thread.currentThread().interrupt();
} catch (ExecutionException executionException) {
if (processor.isProcessKilled()) {
// In this case the original exception is spurious and highly misleading
throw ExceptionsHelper.conflictStatusException("close snapshot upgrade interrupted by kill request");
} else {
throw FutureUtils.rethrowExecutionException(executionException);
}
}
return Void.TYPE;
}, runNext);
}
}
}
@@ -65,7 +65,6 @@ protected static void waitForPendingUpgraderTasks() throws Exception {
* The purpose of this test is to ensure that when a job is open through a rolling upgrade, we upgrade the results
* index mappings when it is assigned to an upgraded node, even if no other ML endpoint is called after the upgrade
*/
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98560")
public void testSnapshotUpgrader() throws Exception {
Request adjustLoggingLevels = new Request("PUT", "/_cluster/settings");
adjustLoggingLevels.setJsonEntity("""
@@ -98,6 +97,13 @@ public void testSnapshotUpgrader() throws Exception {

@SuppressWarnings("unchecked")
private void testSnapshotUpgradeFailsOnMixedCluster() throws Exception {
// TODO the mixed cluster assertions sometimes fail because the code that
// detects the mixed cluster relies on the transport versions being different.
// This assumption does not hold immediately after a version bump, when a new
// branch has just been cut, as the new branch will have the same transport version.
// See https://github.com/elastic/elasticsearch/issues/98560

assumeTrue("The mixed cluster is not always detected correctly, see https://github.com/elastic/elasticsearch/issues/98560", false);
Map<String, Object> jobs = entityAsMap(getJob(JOB_ID));

String currentSnapshot = ((List<String>) XContentMapValues.extractValue("jobs.model_snapshot_id", jobs)).get(0);
@@ -154,7 +160,7 @@ private void testSnapshotUpgrade() throws Exception {

List<Map<String, Object>> upgradedSnapshot = (List<Map<String, Object>>) entityAsMap(getModelSnapshots(JOB_ID, snapshotToUpgradeId))
.get("model_snapshots");
assertThat(upgradedSnapshot, hasSize(1));
assertThat(upgradedSnapshot.toString(), upgradedSnapshot, hasSize(1));
assertThat(upgradedSnapshot.get(0).get("latest_record_time_stamp"), equalTo(snapshotToUpgrade.get("latest_record_time_stamp")));

// Does the snapshot still work?