Skip to content

Commit 42b7abf

Browse files
committed
Avoid restarting data stream reindex when cluster is upgraded (elastic#125587)
(cherry picked from commit 6a74aba) # Conflicts: # x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java
1 parent 67f260b commit 42b7abf

File tree

4 files changed

+130
-13
lines changed

4 files changed

+130
-13
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ protected void nodeOperation(
108108
ReindexDataStreamTaskParams params,
109109
PersistentTaskState persistentTaskState
110110
) {
111+
Long completionTime = getCompletionTime(persistentTaskState);
112+
if (completionTime != null && task instanceof ReindexDataStreamTask reindexDataStreamTask) {
113+
reindexDataStreamTask.allReindexesCompleted(threadPool, getTimeToLive(completionTime));
114+
return;
115+
}
111116
ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTaskState;
112117
String sourceDataStream = params.getSourceDataStream();
113118
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
@@ -317,6 +322,14 @@ private void completeFailedPersistentTask(
317322
persistentTask.taskFailed(threadPool, updateCompletionTimeAndGetTimeToLive(persistentTask, state), e);
318323
}
319324

325+
private Long getCompletionTime(PersistentTaskState persistentTaskState) {
326+
if (persistentTaskState instanceof ReindexDataStreamPersistentTaskState state) {
327+
return state.completionTime();
328+
} else {
329+
return null;
330+
}
331+
}
332+
320333
private TimeValue updateCompletionTimeAndGetTimeToLive(
321334
ReindexDataStreamTask reindexDataStreamTask,
322335
@Nullable ReindexDataStreamPersistentTaskState state
@@ -348,6 +361,15 @@ private TimeValue updateCompletionTimeAndGetTimeToLive(
348361
completionTime = state.completionTime();
349362
}
350363
}
351-
return TimeValue.timeValueMillis(TASK_KEEP_ALIVE_TIME.millis() - (threadPool.absoluteTimeInMillis() - completionTime));
364+
return getTimeToLive(completionTime);
365+
}
366+
367+
private TimeValue getTimeToLive(long completionTimeInMillis) {
368+
return TimeValue.timeValueMillis(
369+
TASK_KEEP_ALIVE_TIME.millis() - Math.min(
370+
TASK_KEEP_ALIVE_TIME.millis(),
371+
threadPool.absoluteTimeInMillis() - completionTimeInMillis
372+
)
373+
);
352374
}
353375
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ public ReindexDataStreamPersistentTaskState(StreamInput in) throws IOException {
4646
this(in.readOptionalInt(), in.readOptionalInt(), in.readOptionalLong());
4747
}
4848

49+
public boolean isComplete() {
50+
return completionTime != null;
51+
}
52+
4953
@Override
5054
public String getWriteableName() {
5155
return NAME;

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
3030
private final long persistentTaskStartTime;
3131
private final int initialTotalIndices;
3232
private final int initialTotalIndicesToBeUpgraded;
33-
private volatile boolean complete = false;
33+
private volatile boolean isCompleteLocally = false;
3434
private volatile Exception exception;
3535
private final Set<String> inProgress = Collections.synchronizedSet(new HashSet<>());
3636
private final AtomicInteger pending = new AtomicInteger();
@@ -73,18 +73,26 @@ public ReindexDataStreamStatus getStatus() {
7373
int totalIndices = initialTotalIndices;
7474
int totalIndicesToBeUpgraded = initialTotalIndicesToBeUpgraded;
7575
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(getPersistentTaskId());
76+
boolean isComplete;
7677
if (persistentTask != null) {
7778
ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState();
78-
if (state != null && state.totalIndices() != null && state.totalIndicesToBeUpgraded() != null) {
79-
totalIndices = Math.toIntExact(state.totalIndices());
80-
totalIndicesToBeUpgraded = Math.toIntExact(state.totalIndicesToBeUpgraded());
79+
if (state != null) {
80+
isComplete = state.isComplete();
81+
if (state.totalIndices() != null && state.totalIndicesToBeUpgraded() != null) {
82+
totalIndices = Math.toIntExact(state.totalIndices());
83+
totalIndicesToBeUpgraded = Math.toIntExact(state.totalIndicesToBeUpgraded());
84+
}
85+
} else {
86+
isComplete = false;
8187
}
88+
} else {
89+
isComplete = false;
8290
}
8391
return new ReindexDataStreamStatus(
8492
persistentTaskStartTime,
8593
totalIndices,
8694
totalIndicesToBeUpgraded,
87-
complete,
95+
isComplete,
8896
exception,
8997
inProgress,
9098
pending.get(),
@@ -93,7 +101,7 @@ public ReindexDataStreamStatus getStatus() {
93101
}
94102

95103
public void allReindexesCompleted(ThreadPool threadPool, TimeValue timeToLive) {
96-
this.complete = true;
104+
isCompleteLocally = true;
97105
if (isCancelled()) {
98106
completeTask.run();
99107
} else {
@@ -120,6 +128,24 @@ public void incrementInProgressIndicesCount(String index) {
120128
pending.decrementAndGet();
121129
}
122130

131+
private boolean isCompleteInClusterState() {
132+
PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
133+
.getMetadata()
134+
.getProject()
135+
.custom(PersistentTasksCustomMetadata.TYPE);
136+
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(getPersistentTaskId());
137+
if (persistentTask != null) {
138+
ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState();
139+
if (state != null) {
140+
return state.isComplete();
141+
} else {
142+
return false;
143+
}
144+
} else {
145+
return false;
146+
}
147+
}
148+
123149
public void setPendingIndicesCount(int size) {
124150
pending.set(size);
125151
}
@@ -130,8 +156,10 @@ public void onCancelled() {
130156
* If the task is complete, but just waiting for its scheduled removal, we go ahead and call markAsCompleted/markAsFailed
131157
* immediately. This results in the running task being removed from the task manager. If the task is not complete, then one of
132158
* allReindexesCompleted or taskFailed will be called in the future, resulting in the same thing.
159+
* We check both the cluster state and isCompleteLocally -- it is possible (especially in tests) that hte cluster state
160+
* update has not happened in between when allReindexesCompleted was called and when this is called.
133161
*/
134-
if (complete) {
162+
if (isCompleteInClusterState() || isCompleteLocally) {
135163
completeTask.run();
136164
}
137165
}

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.nio.charset.StandardCharsets;
3535
import java.time.Instant;
3636
import java.util.List;
37+
import java.util.Locale;
3738
import java.util.Map;
3839
import java.util.Set;
3940
import java.util.concurrent.TimeUnit;
@@ -208,7 +209,9 @@ public void testUpgradeDataStream() throws Exception {
208209
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
209210
Map<String, Map<String, Object>> oldIndicesMetadata = getIndicesMetadata(dataStreamName);
210211
upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0, ilmEnabled);
212+
cancelReindexTask(dataStreamName);
211213
upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 1, 0, ilmEnabled);
214+
cancelReindexTask(dataStreamFromNonDataStreamIndices);
212215
Map<String, Map<String, Object>> upgradedIndicesMetadata = getIndicesMetadata(dataStreamName);
213216

214217
if (ilmEnabled) {
@@ -219,6 +222,38 @@ public void testUpgradeDataStream() throws Exception {
219222
}
220223
}
221224

225+
public void testMigrateDoesNotRestartOnUpgrade() throws Exception {
226+
/*
227+
* This test makes sure that if reindex is run and completed, then when the cluster is upgraded the task
228+
* does not begin running again.
229+
*/
230+
String dataStreamName = "reindex_test_data_stream_ugprade_test";
231+
int numRollovers = randomIntBetween(0, 5);
232+
boolean hasILMPolicy = randomBoolean();
233+
boolean ilmEnabled = hasILMPolicy && randomBoolean();
234+
if (CLUSTER_TYPE == ClusterType.OLD) {
235+
createAndRolloverDataStream(dataStreamName, numRollovers, hasILMPolicy, ilmEnabled);
236+
upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0, ilmEnabled);
237+
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
238+
makeSureNoUpgrade(dataStreamName);
239+
cancelReindexTask(dataStreamName);
240+
} else {
241+
makeSureNoUpgrade(dataStreamName);
242+
}
243+
}
244+
245+
private void cancelReindexTask(String dataStreamName) throws IOException {
246+
Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
247+
String upgradeUser = "upgrade_user";
248+
String upgradeUserPassword = "x-pack-test-password";
249+
createRole("upgrade_role", dataStreamName);
250+
createUser(upgradeUser, upgradeUserPassword, "upgrade_role");
251+
try (RestClient upgradeUserClient = getClient(upgradeUser, upgradeUserPassword)) {
252+
Response cancelResponse = upgradeUserClient.performRequest(cancelRequest);
253+
assertOK(cancelResponse);
254+
}
255+
}
256+
222257
private void compareIndexMetadata(
223258
Map<String, Map<String, Object>> oldIndicesMetadata,
224259
Map<String, Map<String, Object>> upgradedIndicesMetadata
@@ -422,7 +457,10 @@ private void createAndRolloverDataStream(String dataStreamName, int numRollovers
422457
"data_stream": {
423458
}
424459
}""";
425-
var putIndexTemplateRequest = new Request("POST", "/_index_template/reindex_test_data_stream_template");
460+
var putIndexTemplateRequest = new Request(
461+
"POST",
462+
"/_index_template/reindex_test_data_stream_template" + randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT)
463+
);
426464
putIndexTemplateRequest.setJsonEntity(indexTemplate.replace("$TEMPLATE", template).replace("$PATTERN", dataStreamName));
427465
assertOK(client().performRequest(putIndexTemplateRequest));
428466
bulkLoadData(dataStreamName);
@@ -651,7 +689,7 @@ private void upgradeDataStream(
651689
assertOK(statusResponse);
652690
assertThat(statusResponseString, statusResponseMap.get("complete"), equalTo(true));
653691
final int originalWriteIndex = 1;
654-
if (isOriginalClusterSameMajorVersionAsCurrent()) {
692+
if (isOriginalClusterSameMajorVersionAsCurrent() || CLUSTER_TYPE == ClusterType.OLD) {
655693
assertThat(
656694
statusResponseString,
657695
statusResponseMap.get("total_indices_in_data_stream"),
@@ -698,10 +736,35 @@ private void upgradeDataStream(
698736
// Verify it's possible to reindex again after a successful reindex
699737
reindexResponse = upgradeUserClient.performRequest(reindexRequest);
700738
assertOK(reindexResponse);
739+
}
740+
}
701741

702-
Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
703-
Response cancelResponse = upgradeUserClient.performRequest(cancelRequest);
704-
assertOK(cancelResponse);
742+
private void makeSureNoUpgrade(String dataStreamName) throws Exception {
743+
String upgradeUser = "upgrade_user";
744+
String upgradeUserPassword = "x-pack-test-password";
745+
createRole("upgrade_role", dataStreamName);
746+
createUser(upgradeUser, upgradeUserPassword, "upgrade_role");
747+
try (RestClient upgradeUserClient = getClient(upgradeUser, upgradeUserPassword)) {
748+
assertBusy(() -> {
749+
try {
750+
Request statusRequest = new Request("GET", "_migration/reindex/" + dataStreamName + "/_status");
751+
Response statusResponse = upgradeUserClient.performRequest(statusRequest);
752+
Map<String, Object> statusResponseMap = XContentHelper.convertToMap(
753+
JsonXContent.jsonXContent,
754+
statusResponse.getEntity().getContent(),
755+
false
756+
);
757+
String statusResponseString = statusResponseMap.keySet()
758+
.stream()
759+
.map(key -> key + "=" + statusResponseMap.get(key))
760+
.collect(Collectors.joining(", ", "{", "}"));
761+
assertOK(statusResponse);
762+
assertThat(statusResponseString, statusResponseMap.get("complete"), equalTo(true));
763+
assertThat(statusResponseString, statusResponseMap.get("successes"), equalTo(0));
764+
} catch (Exception e) {
765+
fail(e);
766+
}
767+
}, 60, TimeUnit.SECONDS);
705768
}
706769
}
707770

0 commit comments

Comments
 (0)