Skip to content

Commit 5239ce7

Browse files
authored
Correcting index counts in data stream reindex status (elastic#119658)
1 parent 8d93bd9 commit 5239ce7

File tree

5 files changed

+173
-39
lines changed

5 files changed

+173
-39
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 78 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.cluster.metadata.DataStream;
2020
import org.elasticsearch.cluster.metadata.DataStreamAction;
2121
import org.elasticsearch.cluster.service.ClusterService;
22+
import org.elasticsearch.core.Nullable;
2223
import org.elasticsearch.core.TimeValue;
2324
import org.elasticsearch.index.Index;
2425
import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -61,6 +62,7 @@ protected ReindexDataStreamTask createTask(
6162
) {
6263
ReindexDataStreamTaskParams params = taskInProgress.getParams();
6364
return new ReindexDataStreamTask(
65+
clusterService,
6466
params.startTime(),
6567
params.totalIndices(),
6668
params.totalIndicesToBeUpgraded(),
@@ -74,7 +76,12 @@ protected ReindexDataStreamTask createTask(
7476
}
7577

7678
@Override
77-
protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTaskParams params, PersistentTaskState state) {
79+
protected void nodeOperation(
80+
AllocatedPersistentTask task,
81+
ReindexDataStreamTaskParams params,
82+
PersistentTaskState persistentTaskState
83+
) {
84+
ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTaskState;
7885
String sourceDataStream = params.getSourceDataStream();
7986
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
8087
GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream });
@@ -93,33 +100,71 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
93100
RolloverAction.INSTANCE,
94101
rolloverRequest,
95102
ActionListener.wrap(
96-
rolloverResponse -> reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId),
97-
e -> completeFailedPersistentTask(reindexDataStreamTask, e)
103+
rolloverResponse -> reindexIndices(
104+
dataStream,
105+
dataStream.getIndices().size() + 1,
106+
reindexDataStreamTask,
107+
params,
108+
state,
109+
reindexClient,
110+
sourceDataStream,
111+
taskId
112+
),
113+
e -> completeFailedPersistentTask(reindexDataStreamTask, state, e)
98114
)
99115
);
100116
} else {
101-
reindexIndices(dataStream, reindexDataStreamTask, reindexClient, sourceDataStream, taskId);
117+
reindexIndices(
118+
dataStream,
119+
dataStream.getIndices().size(),
120+
reindexDataStreamTask,
121+
params,
122+
state,
123+
reindexClient,
124+
sourceDataStream,
125+
taskId
126+
);
102127
}
103128
} else {
104-
completeFailedPersistentTask(reindexDataStreamTask, new ElasticsearchException("data stream does not exist"));
129+
completeFailedPersistentTask(reindexDataStreamTask, state, new ElasticsearchException("data stream does not exist"));
105130
}
106-
}, exception -> completeFailedPersistentTask(reindexDataStreamTask, exception)));
131+
}, exception -> completeFailedPersistentTask(reindexDataStreamTask, state, exception)));
107132
}
108133

109134
private void reindexIndices(
110135
DataStream dataStream,
136+
int totalIndicesInDataStream,
111137
ReindexDataStreamTask reindexDataStreamTask,
138+
ReindexDataStreamTaskParams params,
139+
ReindexDataStreamPersistentTaskState state,
112140
ExecuteWithHeadersClient reindexClient,
113141
String sourceDataStream,
114142
TaskId parentTaskId
115143
) {
116144
List<Index> indices = dataStream.getIndices();
117145
List<Index> indicesToBeReindexed = indices.stream().filter(getReindexRequiredPredicate(clusterService.state().metadata())).toList();
146+
final ReindexDataStreamPersistentTaskState updatedState;
147+
if (params.totalIndices() != totalIndicesInDataStream
148+
|| params.totalIndicesToBeUpgraded() != indicesToBeReindexed.size()
149+
|| (state != null
150+
&& (state.totalIndices() != null
151+
&& state.totalIndicesToBeUpgraded() != null
152+
&& (state.totalIndices() != totalIndicesInDataStream
153+
|| state.totalIndicesToBeUpgraded() != indicesToBeReindexed.size())))) {
154+
updatedState = new ReindexDataStreamPersistentTaskState(
155+
totalIndicesInDataStream,
156+
indicesToBeReindexed.size(),
157+
state == null ? null : state.completionTime()
158+
);
159+
reindexDataStreamTask.updatePersistentTaskState(updatedState, ActionListener.noop());
160+
} else {
161+
updatedState = state;
162+
}
118163
reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
119164
// The CountDownActionListener is 1 more than the number of indices so that the count is not 0 if we have no indices
120165
CountDownActionListener listener = new CountDownActionListener(indicesToBeReindexed.size() + 1, ActionListener.wrap(response1 -> {
121-
completeSuccessfulPersistentTask(reindexDataStreamTask);
122-
}, exception -> { completeFailedPersistentTask(reindexDataStreamTask, exception); }));
166+
completeSuccessfulPersistentTask(reindexDataStreamTask, updatedState);
167+
}, exception -> { completeFailedPersistentTask(reindexDataStreamTask, updatedState, exception); }));
123168
List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
124169
final int maxConcurrentIndices = 1;
125170
for (int i = 0; i < maxConcurrentIndices; i++) {
@@ -191,15 +236,25 @@ public void onFailure(Exception e) {
191236
});
192237
}
193238

194-
private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {
195-
persistentTask.allReindexesCompleted(threadPool, getTimeToLive(persistentTask));
239+
private void completeSuccessfulPersistentTask(
240+
ReindexDataStreamTask persistentTask,
241+
@Nullable ReindexDataStreamPersistentTaskState state
242+
) {
243+
persistentTask.allReindexesCompleted(threadPool, updateCompletionTimeAndGetTimeToLive(persistentTask, state));
196244
}
197245

198-
private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) {
199-
persistentTask.taskFailed(threadPool, getTimeToLive(persistentTask), e);
246+
private void completeFailedPersistentTask(
247+
ReindexDataStreamTask persistentTask,
248+
@Nullable ReindexDataStreamPersistentTaskState state,
249+
Exception e
250+
) {
251+
persistentTask.taskFailed(threadPool, updateCompletionTimeAndGetTimeToLive(persistentTask, state), e);
200252
}
201253

202-
private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) {
254+
private TimeValue updateCompletionTimeAndGetTimeToLive(
255+
ReindexDataStreamTask reindexDataStreamTask,
256+
@Nullable ReindexDataStreamPersistentTaskState state
257+
) {
203258
PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
204259
.getMetadata()
205260
.custom(PersistentTasksCustomMetadata.TYPE);
@@ -209,16 +264,23 @@ private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) {
209264
if (persistentTask == null) {
210265
return TimeValue.timeValueMillis(0);
211266
}
212-
PersistentTaskState state = persistentTask.getState();
213267
final long completionTime;
214268
if (state == null) {
215269
completionTime = threadPool.absoluteTimeInMillis();
216270
reindexDataStreamTask.updatePersistentTaskState(
217-
new ReindexDataStreamPersistentTaskState(completionTime),
271+
new ReindexDataStreamPersistentTaskState(null, null, completionTime),
218272
ActionListener.noop()
219273
);
220274
} else {
221-
completionTime = ((ReindexDataStreamPersistentTaskState) state).completionTime();
275+
if (state.completionTime() == null) {
276+
completionTime = threadPool.absoluteTimeInMillis();
277+
reindexDataStreamTask.updatePersistentTaskState(
278+
new ReindexDataStreamPersistentTaskState(state.totalIndices(), state.totalIndicesToBeUpgraded(), completionTime),
279+
ActionListener.noop()
280+
);
281+
} else {
282+
completionTime = state.completionTime();
283+
}
222284
}
223285
return TimeValue.timeValueMillis(TASK_KEEP_ALIVE_TIME.millis() - (threadPool.absoluteTimeInMillis() - completionTime));
224286
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskState.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.common.io.stream.StreamInput;
1111
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.core.Nullable;
1213
import org.elasticsearch.persistent.PersistentTaskState;
1314
import org.elasticsearch.tasks.Task;
1415
import org.elasticsearch.xcontent.ConstructingObjectParser;
@@ -18,22 +19,31 @@
1819

1920
import java.io.IOException;
2021

21-
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
22+
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
23+
24+
public record ReindexDataStreamPersistentTaskState(
25+
@Nullable Integer totalIndices,
26+
@Nullable Integer totalIndicesToBeUpgraded,
27+
@Nullable Long completionTime
28+
) implements Task.Status, PersistentTaskState {
2229

23-
public record ReindexDataStreamPersistentTaskState(long completionTime) implements Task.Status, PersistentTaskState {
2430
public static final String NAME = ReindexDataStreamTask.TASK_NAME;
31+
private static final String TOTAL_INDICES_FIELD = "total_indices_in_data_stream";
32+
private static final String TOTAL_INDICES_REQUIRING_UPGRADE_FIELD = "total_indices_requiring_upgrade";
2533
private static final String COMPLETION_TIME_FIELD = "completion_time";
2634
private static final ConstructingObjectParser<ReindexDataStreamPersistentTaskState, Void> PARSER = new ConstructingObjectParser<>(
2735
NAME,
2836
true,
29-
args -> new ReindexDataStreamPersistentTaskState((long) args[0])
37+
args -> new ReindexDataStreamPersistentTaskState((Integer) args[0], (Integer) args[1], (Long) args[2])
3038
);
3139
static {
32-
PARSER.declareLong(constructorArg(), new ParseField(COMPLETION_TIME_FIELD));
40+
PARSER.declareInt(optionalConstructorArg(), new ParseField(TOTAL_INDICES_FIELD));
41+
PARSER.declareInt(optionalConstructorArg(), new ParseField(TOTAL_INDICES_REQUIRING_UPGRADE_FIELD));
42+
PARSER.declareLong(optionalConstructorArg(), new ParseField(COMPLETION_TIME_FIELD));
3343
}
3444

3545
public ReindexDataStreamPersistentTaskState(StreamInput in) throws IOException {
36-
this(in.readLong());
46+
this(in.readOptionalInt(), in.readOptionalInt(), in.readOptionalLong());
3747
}
3848

3949
@Override
@@ -43,13 +53,23 @@ public String getWriteableName() {
4353

4454
@Override
4555
public void writeTo(StreamOutput out) throws IOException {
46-
out.writeLong(completionTime);
56+
out.writeOptionalInt(totalIndices);
57+
out.writeOptionalInt(totalIndicesToBeUpgraded);
58+
out.writeOptionalLong(completionTime);
4759
}
4860

4961
@Override
5062
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
5163
builder.startObject();
52-
builder.field(COMPLETION_TIME_FIELD, completionTime);
64+
if (totalIndices != null) {
65+
builder.field(TOTAL_INDICES_FIELD, totalIndices);
66+
}
67+
if (totalIndicesToBeUpgraded != null) {
68+
builder.field(TOTAL_INDICES_REQUIRING_UPGRADE_FIELD, totalIndicesToBeUpgraded);
69+
}
70+
if (completionTime != null) {
71+
builder.field(COMPLETION_TIME_FIELD, completionTime);
72+
}
5373
builder.endObject();
5474
return builder;
5575
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77

88
package org.elasticsearch.xpack.migrate.task;
99

10+
import org.elasticsearch.cluster.service.ClusterService;
1011
import org.elasticsearch.common.util.concurrent.RunOnce;
1112
import org.elasticsearch.core.TimeValue;
1213
import org.elasticsearch.core.Tuple;
1314
import org.elasticsearch.persistent.AllocatedPersistentTask;
15+
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
1416
import org.elasticsearch.tasks.TaskId;
1517
import org.elasticsearch.threadpool.ThreadPool;
1618

@@ -24,9 +26,10 @@
2426

2527
public class ReindexDataStreamTask extends AllocatedPersistentTask {
2628
public static final String TASK_NAME = "reindex-data-stream";
29+
private final ClusterService clusterService;
2730
private final long persistentTaskStartTime;
28-
private final int totalIndices;
29-
private final int totalIndicesToBeUpgraded;
31+
private final int initialTotalIndices;
32+
private final int initialTotalIndicesToBeUpgraded;
3033
private volatile boolean complete = false;
3134
private volatile Exception exception;
3235
private final Set<String> inProgress = Collections.synchronizedSet(new HashSet<>());
@@ -36,9 +39,10 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
3639

3740
@SuppressWarnings("this-escape")
3841
public ReindexDataStreamTask(
42+
ClusterService clusterService,
3943
long persistentTaskStartTime,
40-
int totalIndices,
41-
int totalIndicesToBeUpgraded,
44+
int initialTotalIndices,
45+
int initialTotalIndicesToBeUpgraded,
4246
long id,
4347
String type,
4448
String action,
@@ -47,9 +51,10 @@ public ReindexDataStreamTask(
4751
Map<String, String> headers
4852
) {
4953
super(id, type, action, description, parentTask, headers);
54+
this.clusterService = clusterService;
5055
this.persistentTaskStartTime = persistentTaskStartTime;
51-
this.totalIndices = totalIndices;
52-
this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded;
56+
this.initialTotalIndices = initialTotalIndices;
57+
this.initialTotalIndicesToBeUpgraded = initialTotalIndicesToBeUpgraded;
5358
this.completeTask = new RunOnce(() -> {
5459
if (exception == null) {
5560
markAsCompleted();
@@ -61,6 +66,19 @@ public ReindexDataStreamTask(
6166

6267
@Override
6368
public ReindexDataStreamStatus getStatus() {
69+
PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
70+
.getMetadata()
71+
.custom(PersistentTasksCustomMetadata.TYPE);
72+
int totalIndices = initialTotalIndices;
73+
int totalIndicesToBeUpgraded = initialTotalIndicesToBeUpgraded;
74+
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(getPersistentTaskId());
75+
if (persistentTask != null) {
76+
ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState) persistentTask.getState();
77+
if (state != null && state.totalIndices() != null && state.totalIndicesToBeUpgraded() != null) {
78+
totalIndices = Math.toIntExact(state.totalIndices());
79+
totalIndicesToBeUpgraded = Math.toIntExact(state.totalIndicesToBeUpgraded());
80+
}
81+
}
6482
return new ReindexDataStreamStatus(
6583
persistentTaskStartTime,
6684
totalIndices,

x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskStateTests.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,32 @@ protected Writeable.Reader<ReindexDataStreamPersistentTaskState> instanceReader(
2626

2727
@Override
2828
protected ReindexDataStreamPersistentTaskState createTestInstance() {
29-
return new ReindexDataStreamPersistentTaskState(randomNegativeLong());
29+
return new ReindexDataStreamPersistentTaskState(
30+
randomBoolean() ? null : randomNonNegativeInt(),
31+
randomBoolean() ? null : randomNonNegativeInt(),
32+
randomBoolean() ? null : randomNonNegativeLong()
33+
);
3034
}
3135

3236
@Override
3337
protected ReindexDataStreamPersistentTaskState mutateInstance(ReindexDataStreamPersistentTaskState instance) throws IOException {
34-
return new ReindexDataStreamPersistentTaskState(instance.completionTime() + 1);
38+
return switch (randomInt(2)) {
39+
case 0 -> new ReindexDataStreamPersistentTaskState(
40+
instance.totalIndices() == null ? randomNonNegativeInt() : instance.totalIndices() + 1,
41+
instance.totalIndicesToBeUpgraded(),
42+
instance.completionTime()
43+
);
44+
case 1 -> new ReindexDataStreamPersistentTaskState(
45+
instance.totalIndices(),
46+
instance.totalIndicesToBeUpgraded() == null ? randomNonNegativeInt() : instance.totalIndicesToBeUpgraded() + 1,
47+
instance.completionTime()
48+
);
49+
case 2 -> new ReindexDataStreamPersistentTaskState(
50+
instance.totalIndices(),
51+
instance.totalIndicesToBeUpgraded(),
52+
instance.completionTime() == null ? randomNonNegativeLong() : instance.completionTime() + 1
53+
);
54+
default -> throw new IllegalArgumentException("Should never get here");
55+
};
3556
}
3657
}

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,11 @@ private static void createAndRolloverDataStream(String dataStreamName, int numRo
255255
}
256256
}
257257

258-
private void upgradeDataStream(String dataStreamName, int numRollovers) throws Exception {
258+
private void upgradeDataStream(String dataStreamName, int numRolloversOnOldCluster) throws Exception {
259+
final int explicitRolloverOnNewClusterCount = randomIntBetween(0, 2);
260+
for (int i = 0; i < explicitRolloverOnNewClusterCount; i++) {
261+
rollover(dataStreamName);
262+
}
259263
Request reindexRequest = new Request("POST", "/_migration/reindex");
260264
reindexRequest.setJsonEntity(Strings.format("""
261265
{
@@ -276,18 +280,27 @@ private void upgradeDataStream(String dataStreamName, int numRollovers) throws E
276280
);
277281
assertOK(statusResponse);
278282
assertThat(statusResponseMap.get("complete"), equalTo(true));
279-
/*
280-
* total_indices_in_data_stream is determined at the beginning of the reindex, and does not take into account the write
281-
* index being rolled over
282-
*/
283-
assertThat(statusResponseMap.get("total_indices_in_data_stream"), equalTo(numRollovers + 1));
283+
// The number of rollovers that will have happened when we call reindex:
284+
final int rolloversPerformedByReindex = explicitRolloverOnNewClusterCount == 0 ? 1 : 0;
285+
final int originalWriteIndex = 1;
286+
assertThat(
287+
statusResponseMap.get("total_indices_in_data_stream"),
288+
equalTo(originalWriteIndex + numRolloversOnOldCluster + explicitRolloverOnNewClusterCount + rolloversPerformedByReindex)
289+
);
284290
if (isOriginalClusterSameMajorVersionAsCurrent()) {
285291
// If the original cluster was the same as this one, we don't want any indices reindexed:
286292
assertThat(statusResponseMap.get("total_indices_requiring_upgrade"), equalTo(0));
287293
assertThat(statusResponseMap.get("successes"), equalTo(0));
288294
} else {
289-
assertThat(statusResponseMap.get("total_indices_requiring_upgrade"), equalTo(numRollovers + 1));
290-
assertThat(statusResponseMap.get("successes"), equalTo(numRollovers + 1));
295+
/*
296+
* total_indices_requiring_upgrade is made up of: (the original write index) + numRolloversOnOldCluster. The number of
297+
* rollovers on the upgraded cluster is irrelevant since those will not be reindexed.
298+
*/
299+
assertThat(
300+
statusResponseMap.get("total_indices_requiring_upgrade"),
301+
equalTo(originalWriteIndex + numRolloversOnOldCluster)
302+
);
303+
assertThat(statusResponseMap.get("successes"), equalTo(numRolloversOnOldCluster + 1));
291304
}
292305
}, 60, TimeUnit.SECONDS);
293306
Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");

0 commit comments

Comments
 (0)