Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ protected void nodeOperation(
ReindexDataStreamTaskParams params,
PersistentTaskState persistentTaskState
) {
Long completionTime = getCompletionTime(persistentTaskState);
if (completionTime != null && task instanceof ReindexDataStreamTask reindexDataStreamTask) {
reindexDataStreamTask.allReindexesCompleted(threadPool, getTimeToLive(completionTime));
return;
}
ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTaskState;
String sourceDataStream = params.getSourceDataStream();
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
Expand Down Expand Up @@ -316,6 +321,18 @@ private void completeFailedPersistentTask(
persistentTask.taskFailed(threadPool, updateCompletionTimeAndGetTimeToLive(persistentTask, state), e);
}

private Long getCompletionTime(PersistentTaskState persistentTaskState) {
if (persistentTaskState == null) {
return null;
} else {
if (persistentTaskState instanceof ReindexDataStreamPersistentTaskState state) {
return state.completionTime();
} else {
return null;
}
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I don't think you need a null check prior to calling instanceof

private TimeValue updateCompletionTimeAndGetTimeToLive(
ReindexDataStreamTask reindexDataStreamTask,
@Nullable ReindexDataStreamPersistentTaskState state
Expand Down Expand Up @@ -345,6 +362,15 @@ private TimeValue updateCompletionTimeAndGetTimeToLive(
completionTime = state.completionTime();
}
}
return TimeValue.timeValueMillis(TASK_KEEP_ALIVE_TIME.millis() - (threadPool.absoluteTimeInMillis() - completionTime));
return getTimeToLive(completionTime);
}

private TimeValue getTimeToLive(long completionTimeInMillis) {
return TimeValue.timeValueMillis(
TASK_KEEP_ALIVE_TIME.millis() - Math.min(
TASK_KEEP_ALIVE_TIME.millis(),
threadPool.absoluteTimeInMillis() - completionTimeInMillis
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public ReindexDataStreamPersistentTaskState(StreamInput in) throws IOException {
this(in.readOptionalInt(), in.readOptionalInt(), in.readOptionalLong());
}

public boolean isComplete() {
return completionTime != null;
}

@Override
public String getWriteableName() {
return NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
private final long persistentTaskStartTime;
private final int initialTotalIndices;
private final int initialTotalIndicesToBeUpgraded;
private volatile boolean complete = false;
private boolean isCompleteLocally = false;
private volatile Exception exception;
private final Set<String> inProgress = Collections.synchronizedSet(new HashSet<>());
private final AtomicInteger pending = new AtomicInteger();
Expand Down Expand Up @@ -73,18 +73,26 @@ public ReindexDataStreamStatus getStatus() {
clusterService.state(),
getPersistentTaskId()
);
boolean isComplete;
if (persistentTask != null) {
ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState();
if (state != null && state.totalIndices() != null && state.totalIndicesToBeUpgraded() != null) {
totalIndices = Math.toIntExact(state.totalIndices());
totalIndicesToBeUpgraded = Math.toIntExact(state.totalIndicesToBeUpgraded());
if (state != null) {
isComplete = state.isComplete();
if (state.totalIndices() != null && state.totalIndicesToBeUpgraded() != null) {
totalIndices = Math.toIntExact(state.totalIndices());
totalIndicesToBeUpgraded = Math.toIntExact(state.totalIndicesToBeUpgraded());
}
} else {
isComplete = false;
}
} else {
isComplete = false;
}
return new ReindexDataStreamStatus(
persistentTaskStartTime,
totalIndices,
totalIndicesToBeUpgraded,
complete,
isComplete,
exception,
inProgress,
pending.get(),
Expand All @@ -93,7 +101,7 @@ public ReindexDataStreamStatus getStatus() {
}

public void allReindexesCompleted(ThreadPool threadPool, TimeValue timeToLive) {
this.complete = true;
isCompleteLocally = true;
if (isCancelled()) {
completeTask.run();
} else {
Expand All @@ -120,6 +128,24 @@ public void incrementInProgressIndicesCount(String index) {
pending.decrementAndGet();
}

private boolean isCompleteInClusterState() {
PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
.getMetadata()
.getProject()
.custom(PersistentTasksCustomMetadata.TYPE);
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(getPersistentTaskId());
if (persistentTask != null) {
ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState();
if (state != null) {
return state.isComplete();
} else {
return false;
}
} else {
return false;
}
}

public void setPendingIndicesCount(int size) {
pending.set(size);
}
Expand All @@ -130,8 +156,10 @@ public void onCancelled() {
* If the task is complete, but just waiting for its scheduled removal, we go ahead and call markAsCompleted/markAsFailed
* immediately. This results in the running task being removed from the task manager. If the task is not complete, then one of
* allReindexesCompleted or taskFailed will be called in the future, resulting in the same thing.
* We check both the cluster state and isCompleteLocally -- it is possible (especially in tests) that hte cluster state
* update has not happened in between when allReindexesCompleted was called and when this is called.
*/
if (complete) {
if (isCompleteInClusterState() || isCompleteLocally) {
completeTask.run();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -208,7 +209,9 @@ public void testUpgradeDataStream() throws Exception {
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
Map<String, Map<String, Object>> oldIndicesMetadata = getIndicesMetadata(dataStreamName);
upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0, ilmEnabled);
cancelReindexTask(dataStreamName);
upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 1, 0, ilmEnabled);
cancelReindexTask(dataStreamFromNonDataStreamIndices);
Map<String, Map<String, Object>> upgradedIndicesMetadata = getIndicesMetadata(dataStreamName);

if (ilmEnabled) {
Expand All @@ -219,6 +222,38 @@ public void testUpgradeDataStream() throws Exception {
}
}

public void testMigrateDoesNotRestartOnUpgrade() throws Exception {
/*
* This test makes sure that if reindex is run and completed, then when the cluster is upgraded the task
* does not begin running again.
*/
String dataStreamName = "reindex_test_data_stream_ugprade_test";
int numRollovers = randomIntBetween(0, 5);
boolean hasILMPolicy = randomBoolean();
boolean ilmEnabled = hasILMPolicy && randomBoolean();
if (CLUSTER_TYPE == ClusterType.OLD) {
createAndRolloverDataStream(dataStreamName, numRollovers, hasILMPolicy, ilmEnabled);
upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0, ilmEnabled);
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
makeSureNoUpgrade(dataStreamName);
cancelReindexTask(dataStreamName);
} else {
makeSureNoUpgrade(dataStreamName);
}
}

private void cancelReindexTask(String dataStreamName) throws IOException {
Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
String upgradeUser = "upgrade_user";
String upgradeUserPassword = "x-pack-test-password";
createRole("upgrade_role", dataStreamName);
createUser(upgradeUser, upgradeUserPassword, "upgrade_role");
try (RestClient upgradeUserClient = getClient(upgradeUser, upgradeUserPassword)) {
Response cancelResponse = upgradeUserClient.performRequest(cancelRequest);
assertOK(cancelResponse);
}
}

private void compareIndexMetadata(
Map<String, Map<String, Object>> oldIndicesMetadata,
Map<String, Map<String, Object>> upgradedIndicesMetadata
Expand Down Expand Up @@ -422,7 +457,10 @@ private void createAndRolloverDataStream(String dataStreamName, int numRollovers
"data_stream": {
}
}""";
var putIndexTemplateRequest = new Request("POST", "/_index_template/reindex_test_data_stream_template");
var putIndexTemplateRequest = new Request(
"POST",
"/_index_template/reindex_test_data_stream_template" + randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT)
);
putIndexTemplateRequest.setJsonEntity(indexTemplate.replace("$TEMPLATE", template).replace("$PATTERN", dataStreamName));
assertOK(client().performRequest(putIndexTemplateRequest));
bulkLoadData(dataStreamName);
Expand Down Expand Up @@ -651,7 +689,7 @@ private void upgradeDataStream(
assertOK(statusResponse);
assertThat(statusResponseString, statusResponseMap.get("complete"), equalTo(true));
final int originalWriteIndex = 1;
if (isOriginalClusterSameMajorVersionAsCurrent()) {
if (isOriginalClusterSameMajorVersionAsCurrent() || CLUSTER_TYPE == ClusterType.OLD) {
assertThat(
statusResponseString,
statusResponseMap.get("total_indices_in_data_stream"),
Expand Down Expand Up @@ -698,10 +736,35 @@ private void upgradeDataStream(
// Verify it's possible to reindex again after a successful reindex
reindexResponse = upgradeUserClient.performRequest(reindexRequest);
assertOK(reindexResponse);
}
}

Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
Response cancelResponse = upgradeUserClient.performRequest(cancelRequest);
assertOK(cancelResponse);
private void makeSureNoUpgrade(String dataStreamName) throws Exception {
String upgradeUser = "upgrade_user";
String upgradeUserPassword = "x-pack-test-password";
createRole("upgrade_role", dataStreamName);
createUser(upgradeUser, upgradeUserPassword, "upgrade_role");
try (RestClient upgradeUserClient = getClient(upgradeUser, upgradeUserPassword)) {
assertBusy(() -> {
try {
Request statusRequest = new Request("GET", "_migration/reindex/" + dataStreamName + "/_status");
Response statusResponse = upgradeUserClient.performRequest(statusRequest);
Map<String, Object> statusResponseMap = XContentHelper.convertToMap(
JsonXContent.jsonXContent,
statusResponse.getEntity().getContent(),
false
);
String statusResponseString = statusResponseMap.keySet()
.stream()
.map(key -> key + "=" + statusResponseMap.get(key))
.collect(Collectors.joining(", ", "{", "}"));
assertOK(statusResponse);
assertThat(statusResponseString, statusResponseMap.get("complete"), equalTo(true));
assertThat(statusResponseString, statusResponseMap.get("successes"), equalTo(0));
} catch (Exception e) {
fail(e);
}
}, 60, TimeUnit.SECONDS);
}
}

Expand Down