Skip to content

Commit 0a53248

Browse files
committed
Merge branch '8.x' of github.com:elastic/elasticsearch into 8.x
2 parents 285ab01 + 232bfe1 commit 0a53248

File tree

2 files changed

+24
-20
lines changed

2 files changed

+24
-20
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ protected ReindexDataStreamTask createTask(
5353
params.startTime(),
5454
params.totalIndices(),
5555
params.totalIndicesToBeUpgraded(),
56-
threadPool,
5756
id,
5857
type,
5958
action,
@@ -76,9 +75,11 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
7675
List<Index> indicesToBeReindexed = indices.stream()
7776
.filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion())
7877
.toList();
79-
reindexDataStreamTask.setPendingIndices(indicesToBeReindexed.stream().map(Index::getName).toList());
78+
reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
8079
for (Index index : indicesToBeReindexed) {
80+
reindexDataStreamTask.incrementInProgressIndicesCount();
8181
// TODO This is just a placeholder. This is where the real data stream reindex logic will go
82+
reindexDataStreamTask.reindexSucceeded();
8283
}
8384

8485
completeSuccessfulPersistentTask(reindexDataStreamTask);
@@ -89,12 +90,12 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
8990
}
9091

9192
private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {
92-
persistentTask.reindexSucceeded();
93+
persistentTask.allReindexesCompleted();
9394
threadPool.schedule(persistentTask::markAsCompleted, getTimeToLive(persistentTask), threadPool.generic());
9495
}
9596

9697
private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) {
97-
persistentTask.reindexFailed(e);
98+
persistentTask.taskFailed(e);
9899
threadPool.schedule(() -> persistentTask.markAsFailed(e), getTimeToLive(persistentTask), threadPool.generic());
99100
}
100101

modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,27 @@
1212
import org.elasticsearch.core.Tuple;
1313
import org.elasticsearch.persistent.AllocatedPersistentTask;
1414
import org.elasticsearch.tasks.TaskId;
15-
import org.elasticsearch.threadpool.ThreadPool;
1615

1716
import java.util.ArrayList;
1817
import java.util.List;
1918
import java.util.Map;
19+
import java.util.concurrent.atomic.AtomicInteger;
2020

2121
public class ReindexDataStreamTask extends AllocatedPersistentTask {
2222
public static final String TASK_NAME = "reindex-data-stream";
2323
private final long persistentTaskStartTime;
2424
private final int totalIndices;
2525
private final int totalIndicesToBeUpgraded;
26-
private final ThreadPool threadPool;
2726
private boolean complete = false;
2827
private Exception exception;
29-
private List<String> inProgress = new ArrayList<>();
30-
private List<String> pending = List.of();
28+
private AtomicInteger inProgress = new AtomicInteger(0);
29+
private AtomicInteger pending = new AtomicInteger();
3130
private List<Tuple<String, Exception>> errors = new ArrayList<>();
3231

3332
public ReindexDataStreamTask(
3433
long persistentTaskStartTime,
3534
int totalIndices,
3635
int totalIndicesToBeUpgraded,
37-
ThreadPool threadPool,
3836
long id,
3937
String type,
4038
String action,
@@ -46,7 +44,6 @@ public ReindexDataStreamTask(
4644
this.persistentTaskStartTime = persistentTaskStartTime;
4745
this.totalIndices = totalIndices;
4846
this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded;
49-
this.threadPool = threadPool;
5047
}
5148

5249
@Override
@@ -57,30 +54,36 @@ public ReindexDataStreamStatus getStatus() {
5754
totalIndicesToBeUpgraded,
5855
complete,
5956
exception,
60-
inProgress.size(),
61-
pending.size(),
57+
inProgress.get(),
58+
pending.get(),
6259
errors
6360
);
6461
}
6562

66-
public void reindexSucceeded() {
63+
public void allReindexesCompleted() {
6764
this.complete = true;
6865
}
6966

70-
public void reindexFailed(Exception e) {
67+
public void taskFailed(Exception e) {
7168
this.complete = true;
7269
this.exception = e;
7370
}
7471

75-
public void setInProgressIndices(List<String> inProgressIndices) {
76-
this.inProgress = inProgressIndices;
72+
public void reindexSucceeded() {
73+
inProgress.decrementAndGet();
74+
}
75+
76+
public void reindexFailed(String index, Exception error) {
77+
this.errors.add(Tuple.tuple(index, error));
78+
inProgress.decrementAndGet();
7779
}
7880

79-
public void setPendingIndices(List<String> pendingIndices) {
80-
this.pending = pendingIndices;
81+
public void incrementInProgressIndicesCount() {
82+
inProgress.incrementAndGet();
83+
pending.decrementAndGet();
8184
}
8285

83-
public void addErrorIndex(String index, Exception error) {
84-
this.errors.add(Tuple.tuple(index, error));
86+
public void setPendingIndicesCount(int size) {
87+
pending.set(size);
8588
}
8689
}

0 commit comments

Comments
 (0)