diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index a817a61617d36..2fc9e2afbc02d 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -108,6 +108,11 @@ protected void nodeOperation( ReindexDataStreamTaskParams params, PersistentTaskState persistentTaskState ) { + Long completionTime = getCompletionTime(persistentTaskState); + if (completionTime != null && task instanceof ReindexDataStreamTask reindexDataStreamTask) { + reindexDataStreamTask.allReindexesCompleted(threadPool, getTimeToLive(completionTime)); + return; + } ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTaskState; String sourceDataStream = params.getSourceDataStream(); TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); @@ -316,6 +321,14 @@ private void completeFailedPersistentTask( persistentTask.taskFailed(threadPool, updateCompletionTimeAndGetTimeToLive(persistentTask, state), e); } + private Long getCompletionTime(PersistentTaskState persistentTaskState) { + if (persistentTaskState instanceof ReindexDataStreamPersistentTaskState state) { + return state.completionTime(); + } else { + return null; + } + } + private TimeValue updateCompletionTimeAndGetTimeToLive( ReindexDataStreamTask reindexDataStreamTask, @Nullable ReindexDataStreamPersistentTaskState state @@ -345,6 +358,15 @@ private TimeValue updateCompletionTimeAndGetTimeToLive( completionTime = state.completionTime(); } } - return TimeValue.timeValueMillis(TASK_KEEP_ALIVE_TIME.millis() - (threadPool.absoluteTimeInMillis() - completionTime)); + return getTimeToLive(completionTime); + } + + private TimeValue getTimeToLive(long completionTimeInMillis) { + return TimeValue.timeValueMillis( + TASK_KEEP_ALIVE_TIME.millis() - Math.min( + TASK_KEEP_ALIVE_TIME.millis(), + threadPool.absoluteTimeInMillis() - completionTimeInMillis + ) + ); } } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java index 8ab22674082e6..511899fae0049 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java @@ -46,6 +46,10 @@ public ReindexDataStreamPersistentTaskState(StreamInput in) throws IOException { this(in.readOptionalInt(), in.readOptionalInt(), in.readOptionalLong()); } + public boolean isComplete() { + return completionTime != null; + } + @Override public String getWriteableName() { return NAME; diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java index 6f5bf280296e1..996ac936af8b2 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java @@ -30,7 +30,7 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask { private final long persistentTaskStartTime; private final int initialTotalIndices; private final int initialTotalIndicesToBeUpgraded; - private volatile boolean complete = false; + private boolean isCompleteLocally = false; private volatile Exception exception; private final Set inProgress = Collections.synchronizedSet(new HashSet<>()); private final AtomicInteger pending = new AtomicInteger(); @@ -73,18 +73,26 @@ public ReindexDataStreamStatus getStatus() { clusterService.state(), getPersistentTaskId() ); + boolean isComplete; if (persistentTask != null) { ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState(); - if (state != null && state.totalIndices() != null && state.totalIndicesToBeUpgraded() != null) { - totalIndices = Math.toIntExact(state.totalIndices()); - totalIndicesToBeUpgraded = Math.toIntExact(state.totalIndicesToBeUpgraded()); + if (state != null) { + isComplete = state.isComplete(); + if (state.totalIndices() != null && state.totalIndicesToBeUpgraded() != null) { + totalIndices = Math.toIntExact(state.totalIndices()); + totalIndicesToBeUpgraded = Math.toIntExact(state.totalIndicesToBeUpgraded()); + } + } else { + isComplete = false; } + } else { + isComplete = false; } return new ReindexDataStreamStatus( persistentTaskStartTime, totalIndices, totalIndicesToBeUpgraded, - complete, + isComplete, exception, inProgress, pending.get(), @@ -93,7 +101,7 @@ public ReindexDataStreamStatus getStatus() { } public void allReindexesCompleted(ThreadPool threadPool, TimeValue timeToLive) { - this.complete = true; + isCompleteLocally = true; if (isCancelled()) { completeTask.run(); } else { @@ -120,6 +128,24 @@ public void incrementInProgressIndicesCount(String index) { pending.decrementAndGet(); } + private boolean isCompleteInClusterState() { + PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state() + .getMetadata() + .getProject() + .custom(PersistentTasksCustomMetadata.TYPE); + PersistentTasksCustomMetadata.PersistentTask persistentTask = persistentTasksCustomMetadata.getTask(getPersistentTaskId()); + if (persistentTask != null) { + ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState(); + if (state != null) { + return state.isComplete(); + } else { + return false; + } + } else { + return false; + } + } + public void setPendingIndicesCount(int size) { pending.set(size); } @@ -130,8 +156,10 @@ public void onCancelled() { * If the task is complete, but just waiting for its scheduled removal, we go ahead and call markAsCompleted/markAsFailed * immediately. This results in the running task being removed from the task manager. If the task is not complete, then one of * allReindexesCompleted or taskFailed will be called in the future, resulting in the same thing. + * We check both the cluster state and isCompleteLocally -- it is possible (especially in tests) that hte cluster state + * update has not happened in between when allReindexesCompleted was called and when this is called. */ - if (complete) { + if (isCompleteInClusterState() || isCompleteLocally) { completeTask.run(); } } diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java index c3614fc2ed81b..1eff5c49dadc0 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java @@ -34,6 +34,7 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -208,7 +209,9 @@ public void testUpgradeDataStream() throws Exception { } else if (CLUSTER_TYPE == ClusterType.UPGRADED) { Map> oldIndicesMetadata = getIndicesMetadata(dataStreamName); upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0, ilmEnabled); + cancelReindexTask(dataStreamName); upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 1, 0, ilmEnabled); + cancelReindexTask(dataStreamFromNonDataStreamIndices); Map> upgradedIndicesMetadata = getIndicesMetadata(dataStreamName); if (ilmEnabled) { @@ -219,6 +222,38 @@ public void testUpgradeDataStream() throws Exception { } } + public void testMigrateDoesNotRestartOnUpgrade() throws Exception { + /* + * This test makes sure that if reindex is run and completed, then when the cluster is upgraded the task + * does not begin running again. + */ + String dataStreamName = "reindex_test_data_stream_ugprade_test"; + int numRollovers = randomIntBetween(0, 5); + boolean hasILMPolicy = randomBoolean(); + boolean ilmEnabled = hasILMPolicy && randomBoolean(); + if (CLUSTER_TYPE == ClusterType.OLD) { + createAndRolloverDataStream(dataStreamName, numRollovers, hasILMPolicy, ilmEnabled); + upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0, ilmEnabled); + } else if (CLUSTER_TYPE == ClusterType.UPGRADED) { + makeSureNoUpgrade(dataStreamName); + cancelReindexTask(dataStreamName); + } else { + makeSureNoUpgrade(dataStreamName); + } + } + + private void cancelReindexTask(String dataStreamName) throws IOException { + Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel"); + String upgradeUser = "upgrade_user"; + String upgradeUserPassword = "x-pack-test-password"; + createRole("upgrade_role", dataStreamName); + createUser(upgradeUser, upgradeUserPassword, "upgrade_role"); + try (RestClient upgradeUserClient = getClient(upgradeUser, upgradeUserPassword)) { + Response cancelResponse = upgradeUserClient.performRequest(cancelRequest); + assertOK(cancelResponse); + } + } + private void compareIndexMetadata( Map> oldIndicesMetadata, Map> upgradedIndicesMetadata @@ -422,7 +457,10 @@ private void createAndRolloverDataStream(String dataStreamName, int numRollovers "data_stream": { } }"""; - var putIndexTemplateRequest = new Request("POST", "/_index_template/reindex_test_data_stream_template"); + var putIndexTemplateRequest = new Request( + "POST", + "/_index_template/reindex_test_data_stream_template" + randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT) + ); putIndexTemplateRequest.setJsonEntity(indexTemplate.replace("$TEMPLATE", template).replace("$PATTERN", dataStreamName)); assertOK(client().performRequest(putIndexTemplateRequest)); bulkLoadData(dataStreamName); @@ -651,7 +689,7 @@ private void upgradeDataStream( assertOK(statusResponse); assertThat(statusResponseString, statusResponseMap.get("complete"), equalTo(true)); final int originalWriteIndex = 1; - if (isOriginalClusterSameMajorVersionAsCurrent()) { + if (isOriginalClusterSameMajorVersionAsCurrent() || CLUSTER_TYPE == ClusterType.OLD) { assertThat( statusResponseString, statusResponseMap.get("total_indices_in_data_stream"), @@ -698,10 +736,35 @@ private void upgradeDataStream( // Verify it's possible to reindex again after a successful reindex reindexResponse = upgradeUserClient.performRequest(reindexRequest); assertOK(reindexResponse); + } + } - Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel"); - Response cancelResponse = upgradeUserClient.performRequest(cancelRequest); - assertOK(cancelResponse); + private void makeSureNoUpgrade(String dataStreamName) throws Exception { + String upgradeUser = "upgrade_user"; + String upgradeUserPassword = "x-pack-test-password"; + createRole("upgrade_role", dataStreamName); + createUser(upgradeUser, upgradeUserPassword, "upgrade_role"); + try (RestClient upgradeUserClient = getClient(upgradeUser, upgradeUserPassword)) { + assertBusy(() -> { + try { + Request statusRequest = new Request("GET", "_migration/reindex/" + dataStreamName + "/_status"); + Response statusResponse = upgradeUserClient.performRequest(statusRequest); + Map statusResponseMap = XContentHelper.convertToMap( + JsonXContent.jsonXContent, + statusResponse.getEntity().getContent(), + false + ); + String statusResponseString = statusResponseMap.keySet() + .stream() + .map(key -> key + "=" + statusResponseMap.get(key)) + .collect(Collectors.joining(", ", "{", "}")); + assertOK(statusResponse); + assertThat(statusResponseString, statusResponseMap.get("complete"), equalTo(true)); + assertThat(statusResponseString, statusResponseMap.get("successes"), equalTo(0)); + } catch (Exception e) { + fail(e); + } + }, 60, TimeUnit.SECONDS); } }