Skip to content

Commit 5c72bce

Browse files
authored
Correcting index counts in data stream reindex status (#119658) (#119790)
1 parent a9dac43 commit 5c72bce

File tree

5 files changed

+173
-39
lines changed

5 files changed

+173
-39
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 78 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.cluster.metadata.DataStream;
2020
import org.elasticsearch.cluster.metadata.DataStreamAction;
2121
import org.elasticsearch.cluster.service.ClusterService;
22+
import org.elasticsearch.core.Nullable;
2223
import org.elasticsearch.core.TimeValue;
2324
import org.elasticsearch.index.Index;
2425
import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -60,6 +61,7 @@ protected ReindexDataStreamTask createTask(
6061
) {
6162
ReindexDataStreamTaskParams params = taskInProgress.getParams();
6263
return new ReindexDataStreamTask(
64+
clusterService,
6365
params.startTime(),
6466
params.totalIndices(),
6567
params.totalIndicesToBeUpgraded(),
@@ -73,7 +75,12 @@ protected ReindexDataStreamTask createTask(
7375
}
7476

7577
@Override
76-
protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTaskParams params, PersistentTaskState state) {
78+
protected void nodeOperation(
79+
AllocatedPersistentTask task,
80+
ReindexDataStreamTaskParams params,
81+
PersistentTaskState persistentTaskState
82+
) {
83+
ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTaskState;
7784
String sourceDataStream = params.getSourceDataStream();
7885
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
7986
GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream });
@@ -92,33 +99,71 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
9299
RolloverAction.INSTANCE,
93100
rolloverRequest,
94101
ActionListener.wrap(
95-
rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId),
96-
e -> completeFailedPersistentTask(reindexDataStreamTask, e)
102+
rolloverResponse -> reindexIndices(
103+
dataStream,
104+
dataStream.getIndices().size() + 1,
105+
reindexDataStreamTask,
106+
params,
107+
state,
108+
reindexClient,
109+
sourceDataStream,
110+
taskId
111+
),
112+
e -> completeFailedPersistentTask(reindexDataStreamTask, state, e)
97113
)
98114
);
99115
} else {
100-
reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId);
116+
reindexIndices(
117+
dataStream,
118+
dataStream.getIndices().size(),
119+
reindexDataStreamTask,
120+
params,
121+
state,
122+
reindexClient,
123+
sourceDataStream,
124+
taskId
125+
);
101126
}
102127
} else {
103-
completeFailedPersistentTask(reindexDataStreamTask, new ElasticsearchException("data stream does not exist"));
128+
completeFailedPersistentTask(reindexDataStreamTask, state, new ElasticsearchException("data stream does not exist"));
104129
}
105-
}, exception -> completeFailedPersistentTask(reindexDataStreamTask, exception)));
130+
}, exception -> completeFailedPersistentTask(reindexDataStreamTask, state, exception)));
106131
}
107132

108133
private void reindexIndices(
109134
DataStream dataStream,
135+
int totalIndicesInDataStream,
110136
ReindexDataStreamTask reindexDataStreamTask,
137+
ReindexDataStreamTaskParams params,
138+
ReindexDataStreamPersistentTaskState state,
111139
ExecuteWithHeadersClient reindexClient,
112140
String sourceDataStream,
113141
TaskId parentTaskId
114142
) {
115143
List<Index> indices = dataStream.getIndices();
116144
List<Index> indicesToBeReindexed = indices.stream().filter(getReindexRequiredPredicate(clusterService.state().metadata())).toList();
145+
final ReindexDataStreamPersistentTaskState updatedState;
146+
if (params.totalIndices() != totalIndicesInDataStream
147+
|| params.totalIndicesToBeUpgraded() != indicesToBeReindexed.size()
148+
|| (state != null
149+
&& (state.totalIndices() != null
150+
&& state.totalIndicesToBeUpgraded() != null
151+
&& (state.totalIndices() != totalIndicesInDataStream
152+
|| state.totalIndicesToBeUpgraded() != indicesToBeReindexed.size())))) {
153+
updatedState = new ReindexDataStreamPersistentTaskState(
154+
totalIndicesInDataStream,
155+
indicesToBeReindexed.size(),
156+
state == null ? null : state.completionTime()
157+
);
158+
reindexDataStreamTask.updatePersistentTaskState(updatedState, ActionListener.noop());
159+
} else {
160+
updatedState = state;
161+
}
117162
reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
118163
// The CountDownActionListener is 1 more than the number of indices so that the count is not 0 if we have no indices
119164
CountDownActionListener listener = new CountDownActionListener(indicesToBeReindexed.size() + 1, ActionListener.wrap(response1 -> {
120-
completeSuccessfulPersistentTask(reindexDataStreamTask);
121-
}, exception -> { completeFailedPersistentTask(reindexDataStreamTask, exception); }));
165+
completeSuccessfulPersistentTask(reindexDataStreamTask, updatedState);
166+
}, exception -> { completeFailedPersistentTask(reindexDataStreamTask, updatedState, exception); }));
122167
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
123168
final int maxConcurrentIndices = 1;
124169
for (int i = 0; i < maxConcurrentIndices; i++) {
@@ -190,15 +235,25 @@ public void onFailure(Exception e) {
190235
});
191236
}
192237

193-
private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {
194-
persistentTask.allReindexesCompleted(threadPool, getTimeToLive(persistentTask));
238+
private void completeSuccessfulPersistentTask(
239+
ReindexDataStreamTask persistentTask,
240+
@Nullable ReindexDataStreamPersistentTaskState state
241+
) {
242+
persistentTask.allReindexesCompleted(threadPool, updateCompletionTimeAndGetTimeToLive(persistentTask, state));
195243
}
196244

197-
private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) {
198-
persistentTask.taskFailed(threadPool, getTimeToLive(persistentTask), e);
245+
private void completeFailedPersistentTask(
246+
ReindexDataStreamTask persistentTask,
247+
@Nullable ReindexDataStreamPersistentTaskState state,
248+
Exception e
249+
) {
250+
persistentTask.taskFailed(threadPool, updateCompletionTimeAndGetTimeToLive(persistentTask, state), e);
199251
}
200252

201-
private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) {
253+
private TimeValue updateCompletionTimeAndGetTimeToLive(
254+
ReindexDataStreamTask reindexDataStreamTask,
255+
@Nullable ReindexDataStreamPersistentTaskState state
256+
) {
202257
PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
203258
.getMetadata()
204259
.custom(PersistentTasksCustomMetadata.TYPE);
@@ -208,16 +263,23 @@ private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) {
208263
if (persistentTask == null) {
209264
return TimeValue.timeValueMillis(0);
210265
}
211-
PersistentTaskState state = persistentTask.getState();
212266
final long completionTime;
213267
if (state == null) {
214268
completionTime = threadPool.absoluteTimeInMillis();
215269
reindexDataStreamTask.updatePersistentTaskState(
216-
new ReindexDataStreamPersistentTaskState(completionTime),
270+
new ReindexDataStreamPersistentTaskState(null, null, completionTime),
217271
ActionListener.noop()
218272
);
219273
} else {
220-
completionTime = ((ReindexDataStreamPersistentTaskState) state).completionTime();
274+
if (state.completionTime() == null) {
275+
completionTime = threadPool.absoluteTimeInMillis();
276+
reindexDataStreamTask.updatePersistentTaskState(
277+
new ReindexDataStreamPersistentTaskState(state.totalIndices(), state.totalIndicesToBeUpgraded(), completionTime),
278+
ActionListener.noop()
279+
);
280+
} else {
281+
completionTime = state.completionTime();
282+
}
221283
}
222284
return TimeValue.timeValueMillis(TASK_KEEP_ALIVE_TIME.millis() - (threadPool.absoluteTimeInMillis() - completionTime));
223285
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.common.io.stream.StreamInput;
1111
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.core.Nullable;
1213
import org.elasticsearch.persistent.PersistentTaskState;
1314
import org.elasticsearch.tasks.Task;
1415
import org.elasticsearch.xcontent.ConstructingObjectParser;
@@ -18,22 +19,31 @@
1819

1920
import java.io.IOException;
2021

21-
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
22+
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
23+
24+
public record ReindexDataStreamPersistentTaskState(
25+
@Nullable Integer totalIndices,
26+
@Nullable Integer totalIndicesToBeUpgraded,
27+
@Nullable Long completionTime
28+
) implements Task.Status, PersistentTaskState {
2229

23-
public record ReindexDataStreamPersistentTaskState(long completionTime) implements Task.Status, PersistentTaskState {
2430
public static final String NAME = ReindexDataStreamTask.TASK_NAME;
31+
private static final String TOTAL_INDICES_FIELD = "total_indices_in_data_stream";
32+
private static final String TOTAL_INDICES_REQUIRING_UPGRADE_FIELD = "total_indices_requiring_upgrade";
2533
private static final String COMPLETION_TIME_FIELD = "completion_time";
2634
private static final ConstructingObjectParser<ReindexDataStreamPersistentTaskState, Void> PARSER = new ConstructingObjectParser<>(
2735
NAME,
2836
true,
29-
args -> new ReindexDataStreamPersistentTaskState((long) args[0])
37+
args -> new ReindexDataStreamPersistentTaskState((Integer) args[0], (Integer) args[1], (Long) args[2])
3038
);
3139
static {
32-
PARSER.declareLong(constructorArg(), new ParseField(COMPLETION_TIME_FIELD));
40+
PARSER.declareInt(optionalConstructorArg(), new ParseField(TOTAL_INDICES_FIELD));
41+
PARSER.declareInt(optionalConstructorArg(), new ParseField(TOTAL_INDICES_REQUIRING_UPGRADE_FIELD));
42+
PARSER.declareLong(optionalConstructorArg(), new ParseField(COMPLETION_TIME_FIELD));
3343
}
3444

3545
public ReindexDataStreamPersistentTaskState(StreamInput in) throws IOException {
36-
this(in.readLong());
46+
this(in.readOptionalInt(), in.readOptionalInt(), in.readOptionalLong());
3747
}
3848

3949
@Override
@@ -43,13 +53,23 @@ public String getWriteableName() {
4353

4454
@Override
4555
public void writeTo(StreamOutput out) throws IOException {
46-
out.writeLong(completionTime);
56+
out.writeOptionalInt(totalIndices);
57+
out.writeOptionalInt(totalIndicesToBeUpgraded);
58+
out.writeOptionalLong(completionTime);
4759
}
4860

4961
@Override
5062
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
5163
builder.startObject();
52-
builder.field(COMPLETION_TIME_FIELD, completionTime);
64+
if (totalIndices != null) {
65+
builder.field(TOTAL_INDICES_FIELD, totalIndices);
66+
}
67+
if (totalIndicesToBeUpgraded != null) {
68+
builder.field(TOTAL_INDICES_REQUIRING_UPGRADE_FIELD, totalIndicesToBeUpgraded);
69+
}
70+
if (completionTime != null) {
71+
builder.field(COMPLETION_TIME_FIELD, completionTime);
72+
}
5373
builder.endObject();
5474
return builder;
5575
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77

88
package org.elasticsearch.xpack.migrate.task;
99

10+
import org.elasticsearch.cluster.service.ClusterService;
1011
import org.elasticsearch.common.util.concurrent.RunOnce;
1112
import org.elasticsearch.core.TimeValue;
1213
import org.elasticsearch.core.Tuple;
1314
import org.elasticsearch.persistent.AllocatedPersistentTask;
15+
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
1416
import org.elasticsearch.tasks.TaskId;
1517
import org.elasticsearch.threadpool.ThreadPool;
1618

@@ -24,9 +26,10 @@
2426

2527
public class ReindexDataStreamTask extends AllocatedPersistentTask {
2628
public static final String TASK_NAME = "reindex-data-stream";
29+
private final ClusterService clusterService;
2730
private final long persistentTaskStartTime;
28-
private final int totalIndices;
29-
private final int totalIndicesToBeUpgraded;
31+
private final int initialTotalIndices;
32+
private final int initialTotalIndicesToBeUpgraded;
3033
private volatile boolean complete = false;
3134
private volatile Exception exception;
3235
private final Set<String> inProgress = Collections.synchronizedSet(new HashSet<>());
@@ -36,9 +39,10 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
3639

3740
@SuppressWarnings("this-escape")
3841
public ReindexDataStreamTask(
42+
ClusterService clusterService,
3943
long persistentTaskStartTime,
40-
int totalIndices,
41-
int totalIndicesToBeUpgraded,
44+
int initialTotalIndices,
45+
int initialTotalIndicesToBeUpgraded,
4246
long id,
4347
String type,
4448
String action,
@@ -47,9 +51,10 @@ public ReindexDataStreamTask(
4751
Map<String, String> headers
4852
) {
4953
super(id, type, action, description, parentTask, headers);
54+
this.clusterService = clusterService;
5055
this.persistentTaskStartTime = persistentTaskStartTime;
51-
this.totalIndices = totalIndices;
52-
this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded;
56+
this.initialTotalIndices = initialTotalIndices;
57+
this.initialTotalIndicesToBeUpgraded = initialTotalIndicesToBeUpgraded;
5358
this.completeTask = new RunOnce(() -> {
5459
if (exception == null) {
5560
markAsCompleted();
@@ -61,6 +66,19 @@ public ReindexDataStreamTask(
6166

6267
@Override
6368
public ReindexDataStreamStatus getStatus() {
69+
PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
70+
.getMetadata()
71+
.custom(PersistentTasksCustomMetadata.TYPE);
72+
int totalIndices = initialTotalIndices;
73+
int totalIndicesToBeUpgraded = initialTotalIndicesToBeUpgraded;
74+
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(getPersistentTaskId());
75+
if (persistentTask != null) {
76+
ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState();
77+
if (state != null && state.totalIndices() != null && state.totalIndicesToBeUpgraded() != null) {
78+
totalIndices = Math.toIntExact(state.totalIndices());
79+
totalIndicesToBeUpgraded = Math.toIntExact(state.totalIndicesToBeUpgraded());
80+
}
81+
}
6482
return new ReindexDataStreamStatus(
6583
persistentTaskStartTime,
6684
totalIndices,

x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskStateTests.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,32 @@ protected Writeable.Reader<ReindexDataStreamPersistentTaskState> instanceReader(
2626

2727
@Override
2828
protected ReindexDataStreamPersistentTaskState createTestInstance() {
29-
return new ReindexDataStreamPersistentTaskState(randomNonNegativeLong());
29+
return new ReindexDataStreamPersistentTaskState(
30+
randomBoolean() ? null : randomNonNegativeInt(),
31+
randomBoolean() ? null : randomNonNegativeInt(),
32+
randomBoolean() ? null : randomNonNegativeLong()
33+
);
3034
}
3135

3236
@Override
3337
protected ReindexDataStreamPersistentTaskState mutateInstance(ReindexDataStreamPersistentTaskState instance) throws IOException {
34-
return new ReindexDataStreamPersistentTaskState(instance.completionTime() + 1);
38+
return switch (randomInt(2)) {
39+
case 0 -> new ReindexDataStreamPersistentTaskState(
40+
instance.totalIndices() == null ? randomNonNegativeInt() : instance.totalIndices() + 1,
41+
instance.totalIndicesToBeUpgraded(),
42+
instance.completionTime()
43+
);
44+
case 1 -> new ReindexDataStreamPersistentTaskState(
45+
instance.totalIndices(),
46+
instance.totalIndicesToBeUpgraded() == null ? randomNonNegativeInt() : instance.totalIndicesToBeUpgraded() + 1,
47+
instance.completionTime()
48+
);
49+
case 2 -> new ReindexDataStreamPersistentTaskState(
50+
instance.totalIndices(),
51+
instance.totalIndicesToBeUpgraded(),
52+
instance.completionTime() == null ? randomNonNegativeLong() : instance.completionTime() + 1
53+
);
54+
default -> throw new IllegalArgumentException("Should never get here");
55+
};
3556
}
3657
}

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,11 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
250250
}
251251
}
252252

253-
private void upgradeDataStream(String dataStreamName, int numRollovers) throws Exception {
253+
private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception {
254+
final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2);
255+
for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) {
256+
rollover(dataStreamName);
257+
}
254258
Request reindexRequest = new Request("POST", "/_migration/reindex");
255259
reindexRequest.setJsonEntity(Strings.format("""
256260
{
@@ -271,18 +275,27 @@ private void upgradeDataStream(String dataStreamName, int numRollovers) throws E
271275
);
272276
assertOK(statusResponse);
273277
assertThat(statusResponseMap.get("complete"), equalTo(true));
274-
/*
275-
* total_indices_in_data_stream is determined at the beginning of the reindex, and does not take into account the write
276-
* index being rolled over
277-
*/
278-
assertThat(statusResponseMap.get("total_indices_in_data_stream"), equalTo(numRollovers + 1));
278+
// The number of rollovers that will have happened when we call reindex:
279+
final int rolloversPerformedByReindex = explicitRolloverOnNewClusterCount == 0 ? 1 : 0;
280+
final int originalWriteIndex = 1;
281+
assertThat(
282+
statusResponseMap.get("total_indices_in_data_stream"),
283+
equalTo(originalWriteIndex + numRolloversOnOldCluster + explicitRolloverOnNewClusterCount + rolloversPerformedByReindex)
284+
);
279285
if (isOriginalClusterSameMajorVersionAsCurrent()) {
280286
// If the original cluster was the same as this one, we don't want any indices reindexed:
281287
assertThat(statusResponseMap.get("total_indices_requiring_upgrade"), equalTo(0));
282288
assertThat(statusResponseMap.get("successes"), equalTo(0));
283289
} else {
284-
assertThat(statusResponseMap.get("total_indices_requiring_upgrade"), equalTo(numRollovers + 1));
285-
assertThat(statusResponseMap.get("successes"), equalTo(numRollovers + 1));
290+
/*
291+
* total_indices_requiring_upgrade is made up of: (the original write index) + numRolloversOnOldCluster. The number of
292+
* rollovers on the upgraded cluster is irrelevant since those will not be reindexed.
293+
*/
294+
assertThat(
295+
statusResponseMap.get("total_indices_requiring_upgrade"),
296+
equalTo(originalWriteIndex + numRolloversOnOldCluster)
297+
);
298+
assertThat(statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1));
286299
}
287300
}, 60, TimeUnit.SECONDS);
288301
Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");

0 commit comments

Comments
 (0)