Merged
@@ -108,6 +108,9 @@ protected void nodeOperation(
ReindexDataStreamTaskParams params,
PersistentTaskState persistentTaskState
) {
if (isComplete(persistentTaskState)) {
Contributor:

Don't you have to schedule the completeTask for removal here, so the task is cleaned up after 24h?

Member Author:

Ah, good point -- the scheduling of the removal was on the threadpool on the old node, which won't carry over to this node. But I think I can just do task.markAsCompleted(); here. The user has already upgraded, meaning they know that it completed. And the information is really out of date at this point anyway (because the data stream is "old" for this new cluster).

Contributor:

If a node is terminated and the task moves to a new node (after completion), we wouldn't remove the task metadata otherwise.

Also note, the ScheduledCancellable returned by the line below keeps running on the old node in this case, making the behavior a bit nondeterministic. As far as I remember, completeAndNotifyIfNeeded might throw if completed multiple times in some cases.

threadPool.schedule(completeTask, timeToLive, threadPool.generic())
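The lost-cleanup race being discussed can be sketched outside Elasticsearch. This is a minimal analogy using plain `java.util.concurrent` rather than the Elasticsearch `ThreadPool` API (class and map names here are invented for illustration): the TTL removal is a closure scheduled on the old node's executor, so if that JVM dies before the TTL fires, the shared task metadata survives unless the new node notices the completed state and re-schedules the removal itself.

```java
import java.util.concurrent.*;

public class TtlCleanupSketch {
    // Stands in for the persistent-task metadata, which outlives any single node.
    static final ConcurrentMap<String, String> taskMetadata = new ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {
        taskMetadata.put("reindex-task", "completed");

        // The old node schedules removal after a TTL (24h in the real code; 100ms here).
        ScheduledExecutorService oldNode = Executors.newSingleThreadScheduledExecutor();
        oldNode.schedule(() -> taskMetadata.remove("reindex-task"), 100, TimeUnit.MILLISECONDS);

        // The old node dies before the TTL fires: its pending scheduled task is dropped.
        oldNode.shutdownNow();

        Thread.sleep(200);
        // Without re-scheduling on the new node, the metadata lingers forever.
        System.out.println(taskMetadata.containsKey("reindex-task")); // true

        // The fix discussed in this thread: the node that picks the task up sees the
        // completed state and schedules the removal again on its own executor.
        ScheduledExecutorService newNode = Executors.newSingleThreadScheduledExecutor();
        if ("completed".equals(taskMetadata.get("reindex-task"))) {
            newNode.schedule(() -> taskMetadata.remove("reindex-task"), 100, TimeUnit.MILLISECONDS);
        }
        Thread.sleep(200);
        System.out.println(taskMetadata.containsKey("reindex-task")); // false
        newNode.shutdownNow();
    }
}
```

This is only a sketch of the scheduling hand-off, under the assumption that the task state (here, the map entry) is the durable thing and the scheduled runnable is not.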

Member Author:

I would guess that the task would only be moved if the node it was on came down altogether, threadpool included, right? Regardless though, the exception will be harmless won't it?
I'll change it to schedule the removal in the future so that it continues to work once users intentionally run this on 9.x (I don't think they will want to until we're approaching 10.0).

Contributor:

> The user has already upgraded, meaning they know that it completed. And the information is really out of date at this point anyway (because the data stream is "old" for this new cluster).

Actually, rethinking this, the task might also be moved for other reasons (a node might crash any time), we can't assume the upgrade has happened just because the task was moved. Not sure how important it is to keep the old state around for 24h, but it might lead to surprising results this way.

Member Author:

Yeah, this change definitely doesn't assume the upgrade happened just because the task is on a new node -- it explicitly checks that the completion time is non-null, meaning the task was completed on some other node.

Member Author:

It only keeps the task for 24h beyond whenever it completes. If a node crashes before it completes, it will not have been scheduled for removal.

Contributor:

Yes, this is only ever a problem if a node crashes after completion, within those 24h.

Contributor:

👍 looks like that's already addressed

return;
}
ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTaskState;
String sourceDataStream = params.getSourceDataStream();
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
@@ -316,6 +319,18 @@ private void completeFailedPersistentTask(
persistentTask.taskFailed(threadPool, updateCompletionTimeAndGetTimeToLive(persistentTask, state), e);
}

private boolean isComplete(PersistentTaskState persistentTaskState) {
if (persistentTaskState instanceof ReindexDataStreamPersistentTaskState state) {
return state.isComplete();
}
return false;
}

private TimeValue updateCompletionTimeAndGetTimeToLive(
ReindexDataStreamTask reindexDataStreamTask,
@Nullable ReindexDataStreamPersistentTaskState state
@@ -46,6 +46,10 @@ public ReindexDataStreamPersistentTaskState(StreamInput in) throws IOException {
this(in.readOptionalInt(), in.readOptionalInt(), in.readOptionalLong());
}

public boolean isComplete() {
return completionTime != null;
}

@Override
public String getWriteableName() {
return NAME;
@@ -30,7 +30,6 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
private final long persistentTaskStartTime;
private final int initialTotalIndices;
private final int initialTotalIndicesToBeUpgraded;
private volatile boolean complete = false;
private volatile Exception exception;
private final Set<String> inProgress = Collections.synchronizedSet(new HashSet<>());
private final AtomicInteger pending = new AtomicInteger();
@@ -84,7 +83,7 @@ public ReindexDataStreamStatus getStatus() {
persistentTaskStartTime,
totalIndices,
totalIndicesToBeUpgraded,
complete,
isComplete(),
exception,
inProgress,
pending.get(),
@@ -93,7 +92,6 @@ }
}

public void allReindexesCompleted(ThreadPool threadPool, TimeValue timeToLive) {
this.complete = true;
if (isCancelled()) {
completeTask.run();
} else {
@@ -120,6 +118,24 @@ public void incrementInProgressIndicesCount(String index) {
pending.decrementAndGet();
}

private boolean isComplete() {
PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
.getMetadata()
.custom(PersistentTasksCustomMetadata.TYPE);
if (persistentTasksCustomMetadata == null) {
return false;
}
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(getPersistentTaskId());
if (persistentTask == null) {
return false;
}
ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState();
return state != null && state.isComplete();
}

public void setPendingIndicesCount(int size) {
pending.set(size);
}
@@ -131,7 +147,7 @@ public void onCancelled() {
* immediately. This results in the running task being removed from the task manager. If the task is not complete, then one of
* allReindexesCompleted or taskFailed will be called in the future, resulting in the same thing.
*/
if (complete) {
if (isComplete()) {
completeTask.run();
}
}
@@ -208,7 +208,9 @@ public void testUpgradeDataStream() throws Exception {
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
Map<String, Map<String, Object>> oldIndicesMetadata = getIndicesMetadata(dataStreamName);
upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0, ilmEnabled);
cancelReindexTask(dataStreamName);
upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 1, 0, ilmEnabled);
cancelReindexTask(dataStreamFromNonDataStreamIndices);
Map<String, Map<String, Object>> upgradedIndicesMetadata = getIndicesMetadata(dataStreamName);

if (ilmEnabled) {
@@ -219,6 +221,38 @@
}
}

public void testMigrateDoesNotRestartOnUpgrade() throws Exception {
/*
* This test makes sure that if reindex is run and completed, then when the cluster is upgraded the task
* does not begin running again.
*/
String dataStreamName = "reindex_test_data_stream_upgrade_test";
int numRollovers = randomIntBetween(0, 5);
boolean hasILMPolicy = randomBoolean();
boolean ilmEnabled = hasILMPolicy && randomBoolean();
if (CLUSTER_TYPE == ClusterType.OLD) {
createAndRolloverDataStream(dataStreamName, numRollovers, hasILMPolicy, ilmEnabled);
upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0, ilmEnabled);
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
makeSureNoUpgrade(dataStreamName);
cancelReindexTask(dataStreamName);
} else {
makeSureNoUpgrade(dataStreamName);
}
}

private void cancelReindexTask(String dataStreamName) throws IOException {
Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
String upgradeUser = "upgrade_user";
String upgradeUserPassword = "x-pack-test-password";
createRole("upgrade_role", dataStreamName);
createUser(upgradeUser, upgradeUserPassword, "upgrade_role");
try (RestClient upgradeUserClient = getClient(upgradeUser, upgradeUserPassword)) {
Response cancelResponse = upgradeUserClient.performRequest(cancelRequest);
assertOK(cancelResponse);
}
}

private void compareIndexMetadata(
Map<String, Map<String, Object>> oldIndicesMetadata,
Map<String, Map<String, Object>> upgradedIndicesMetadata
@@ -651,7 +685,7 @@ private void upgradeDataStream(
assertOK(statusResponse);
assertThat(statusResponseString, statusResponseMap.get("complete"), equalTo(true));
final int originalWriteIndex = 1;
if (isOriginalClusterSameMajorVersionAsCurrent()) {
if (isOriginalClusterSameMajorVersionAsCurrent() || CLUSTER_TYPE == ClusterType.OLD) {
assertThat(
statusResponseString,
statusResponseMap.get("total_indices_in_data_stream"),
@@ -698,10 +732,35 @@ private void upgradeDataStream(
// Verify it's possible to reindex again after a successful reindex
reindexResponse = upgradeUserClient.performRequest(reindexRequest);
assertOK(reindexResponse);
}
}

Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
Response cancelResponse = upgradeUserClient.performRequest(cancelRequest);
assertOK(cancelResponse);
private void makeSureNoUpgrade(String dataStreamName) throws Exception {
String upgradeUser = "upgrade_user";
String upgradeUserPassword = "x-pack-test-password";
createRole("upgrade_role", dataStreamName);
createUser(upgradeUser, upgradeUserPassword, "upgrade_role");
try (RestClient upgradeUserClient = getClient(upgradeUser, upgradeUserPassword)) {
assertBusy(() -> {
try {
Request statusRequest = new Request("GET", "_migration/reindex/" + dataStreamName + "/_status");
Response statusResponse = upgradeUserClient.performRequest(statusRequest);
Map<String, Object> statusResponseMap = XContentHelper.convertToMap(
JsonXContent.jsonXContent,
statusResponse.getEntity().getContent(),
false
);
String statusResponseString = statusResponseMap.keySet()
.stream()
.map(key -> key + "=" + statusResponseMap.get(key))
.collect(Collectors.joining(", ", "{", "}"));
assertOK(statusResponse);
assertThat(statusResponseString, statusResponseMap.get("complete"), equalTo(true));
assertThat(statusResponseString, statusResponseMap.get("successes"), equalTo(0));
} catch (Exception e) {
fail(e);
}
}, 60, TimeUnit.SECONDS);
}
}
