Skip to content

Commit ec27e20

Browse files
authored
Removing unnecessary state from DataStreamReindexTask (#117942)
1 parent 6855a4e commit ec27e20

File tree

2 files changed

+24
-20
lines changed

2 files changed

+24
-20
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ protected ReindexDataStreamTask createTask(
5151
params.startTime(),
5252
params.totalIndices(),
5353
params.totalIndicesToBeUpgraded(),
54-
threadPool,
5554
id,
5655
type,
5756
action,
@@ -74,9 +73,11 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
7473
List<Index> indicesToBeReindexed = indices.stream()
7574
.filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion())
7675
.toList();
77-
reindexDataStreamTask.setPendingIndices(indicesToBeReindexed.stream().map(Index::getName).toList());
76+
reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
7877
for (Index index : indicesToBeReindexed) {
78+
reindexDataStreamTask.incrementInProgressIndicesCount();
7979
// TODO This is just a placeholder. This is where the real data stream reindex logic will go
80+
reindexDataStreamTask.reindexSucceeded();
8081
}
8182

8283
completeSuccessfulPersistentTask(reindexDataStreamTask);
@@ -87,12 +88,12 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
8788
}
8889

8990
private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {
90-
persistentTask.reindexSucceeded();
91+
persistentTask.allReindexesCompleted();
9192
threadPool.schedule(persistentTask::markAsCompleted, getTimeToLive(persistentTask), threadPool.generic());
9293
}
9394

9495
private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) {
95-
persistentTask.reindexFailed(e);
96+
persistentTask.taskFailed(e);
9697
threadPool.schedule(() -> persistentTask.markAsFailed(e), getTimeToLive(persistentTask), threadPool.generic());
9798
}
9899

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,29 +10,27 @@
1010
import org.elasticsearch.core.Tuple;
1111
import org.elasticsearch.persistent.AllocatedPersistentTask;
1212
import org.elasticsearch.tasks.TaskId;
13-
import org.elasticsearch.threadpool.ThreadPool;
1413

1514
import java.util.ArrayList;
1615
import java.util.List;
1716
import java.util.Map;
17+
import java.util.concurrent.atomic.AtomicInteger;
1818

1919
public class ReindexDataStreamTask extends AllocatedPersistentTask {
2020
public static final String TASK_NAME = "reindex-data-stream";
2121
private final long persistentTaskStartTime;
2222
private final int totalIndices;
2323
private final int totalIndicesToBeUpgraded;
24-
private final ThreadPool threadPool;
2524
private boolean complete = false;
2625
private Exception exception;
27-
private List<String> inProgress = new ArrayList<>();
28-
private List<String> pending = List.of();
26+
private AtomicInteger inProgress = new AtomicInteger(0);
27+
private AtomicInteger pending = new AtomicInteger();
2928
private List<Tuple<String, Exception>> errors = new ArrayList<>();
3029

3130
public ReindexDataStreamTask(
3231
long persistentTaskStartTime,
3332
int totalIndices,
3433
int totalIndicesToBeUpgraded,
35-
ThreadPool threadPool,
3634
long id,
3735
String type,
3836
String action,
@@ -44,7 +42,6 @@ public ReindexDataStreamTask(
4442
this.persistentTaskStartTime = persistentTaskStartTime;
4543
this.totalIndices = totalIndices;
4644
this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded;
47-
this.threadPool = threadPool;
4845
}
4946

5047
@Override
@@ -55,30 +52,36 @@ public ReindexDataStreamStatus getStatus() {
5552
totalIndicesToBeUpgraded,
5653
complete,
5754
exception,
58-
inProgress.size(),
59-
pending.size(),
55+
inProgress.get(),
56+
pending.get(),
6057
errors
6158
);
6259
}
6360

64-
public void reindexSucceeded() {
61+
public void allReindexesCompleted() {
6562
this.complete = true;
6663
}
6764

68-
public void reindexFailed(Exception e) {
65+
public void taskFailed(Exception e) {
6966
this.complete = true;
7067
this.exception = e;
7168
}
7269

73-
public void setInProgressIndices(List<String> inProgressIndices) {
74-
this.inProgress = inProgressIndices;
70+
public void reindexSucceeded() {
71+
inProgress.decrementAndGet();
72+
}
73+
74+
public void reindexFailed(String index, Exception error) {
75+
this.errors.add(Tuple.tuple(index, error));
76+
inProgress.decrementAndGet();
7577
}
7678

77-
public void setPendingIndices(List<String> pendingIndices) {
78-
this.pending = pendingIndices;
79+
public void incrementInProgressIndicesCount() {
80+
inProgress.incrementAndGet();
81+
pending.decrementAndGet();
7982
}
8083

81-
public void addErrorIndex(String index, Exception error) {
82-
this.errors.add(Tuple.tuple(index, error));
84+
public void setPendingIndicesCount(int size) {
85+
pending.set(size);
8386
}
8487
}

0 commit comments

Comments
 (0)