Skip to content

Commit 85f2ac1

Browse files
committed
ResumeInfo never nullable
1 parent 18b32c0 commit 85f2ac1

File tree

8 files changed

+54
-49
lines changed

8 files changed

+54
-49
lines changed

modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/TransportListReindexAction.java

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -92,28 +92,26 @@ protected void taskOperation(CancellableTask actionTask, ListReindexRequest requ
9292
assert task instanceof BulkByScrollTask : "task should be a BulkByScrollTask";
9393
TaskInfo info = task.taskInfo(clusterService.localNode().getId(), request.getDetailed());
9494
if (task instanceof final BulkByScrollTask bbs) {
95-
final ResumeInfo.RelocationOrigin origin = bbs.getRelocationOrigin().orElse(null);
96-
if (origin != null) {
97-
final TaskId originalId = origin.originalTaskId();
98-
final long originalStartMillis = origin.originalStartTimeMillis();
99-
final long adjustedRunningTimeNanos = info.runningTimeNanos() + TimeUnit.MILLISECONDS.toNanos(
100-
info.startTime() - originalStartMillis
101-
);
102-
info = new TaskInfo(
103-
originalId,
104-
info.type(),
105-
originalId.getNodeId(),
106-
info.action(),
107-
info.description(),
108-
info.status(),
109-
originalStartMillis,
110-
adjustedRunningTimeNanos,
111-
info.cancellable(),
112-
info.cancelled(),
113-
info.parentTaskId(),
114-
info.headers()
115-
);
116-
}
95+
final ResumeInfo.RelocationOrigin origin = bbs.getRelocationOrigin();
96+
final TaskId originalId = origin.originalTaskId();
97+
final long originalStartMillis = origin.originalStartTimeMillis();
98+
final long adjustedRunningTimeNanos = info.runningTimeNanos() + TimeUnit.MILLISECONDS.toNanos(
99+
info.startTime() - originalStartMillis
100+
);
101+
info = new TaskInfo(
102+
originalId,
103+
info.type(),
104+
originalId.getNodeId(),
105+
info.action(),
106+
info.description(),
107+
info.status(),
108+
originalStartMillis,
109+
adjustedRunningTimeNanos,
110+
info.cancellable(),
111+
info.cancelled(),
112+
info.parentTaskId(),
113+
info.headers()
114+
);
117115
}
118116
listener.onResponse(info);
119117
}

modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ void notifyDone(long thisBatchStartTimeNS, ScrollConsumableHitsResponse asyncRes
570570
worker.getStatus(),
571571
remoteVersion
572572
);
573-
final ResumeInfo resumeInfo = new ResumeInfo(workerResumeInfo, null);
573+
final ResumeInfo resumeInfo = new ResumeInfo(task.getRelocationOrigin(), workerResumeInfo, null);
574574
// This response is a local carrier for resumeInfo — for higher-level code to handle relocation and then discard.
575575
// However, status must be accurate for sliced tasks only, the leader state stores this response and derives
576576
// its own combined status from it to serialize to .tasks index.

modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
import org.elasticsearch.index.reindex.RemoteInfo;
6060
import org.elasticsearch.index.reindex.ResumeBulkByScrollRequest;
6161
import org.elasticsearch.index.reindex.ResumeBulkByScrollResponse;
62-
import org.elasticsearch.index.reindex.ResumeInfo;
6362
import org.elasticsearch.index.reindex.ResumeReindexAction;
6463
import org.elasticsearch.index.reindex.WorkerBulkByScrollTaskState;
6564
import org.elasticsearch.reindex.remote.RemoteReindexingUtils;
@@ -143,7 +142,7 @@ public class Reindexer {
143142

144143
public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListener<Void> listener) {
145144
final ActionListener<Void> initListener = listener.delegateFailure((l, v) -> {
146-
initTaskForRelocationIfEnabled(task, request);
145+
initTaskForRelocationIfEnabled(task);
147146
l.onResponse(v);
148147
});
149148
BulkByPaginatedSearchParallelizationHelper.initTaskState(task, request, client, initListener);
@@ -372,13 +371,7 @@ ActionListener<BulkByScrollResponse> listenerWithRelocations(
372371
);
373372
return;
374373
}
375-
final ResumeInfo currentResumeInfo = response.getTaskResumeInfo().get();
376-
final ResumeInfo.RelocationOrigin origin = request.getResumeInfo()
377-
.map(r -> Objects.requireNonNull(r.relocationOrigin(), "relocation origin should be set if resume info is present"))
378-
.orElseGet(
379-
() -> new ResumeInfo.RelocationOrigin(new TaskId(clusterService.localNode().getId(), task.getId()), task.getStartTime())
380-
);
381-
request.setResumeInfo(new ResumeInfo(origin, currentResumeInfo.worker(), currentResumeInfo.slices()));
374+
request.setResumeInfo(response.getTaskResumeInfo().get());
382375
final ResumeBulkByScrollRequest resumeRequest = new ResumeBulkByScrollRequest(request);
383376
final ActionListener<ResumeBulkByScrollResponse> relocationListener = ActionListener.wrap(resp -> {
384377
final var relocatedException = new TaskRelocatedException();
@@ -397,7 +390,7 @@ ActionListener<BulkByScrollResponse> listenerWithRelocations(
397390
});
398391
}
399392

400-
private void initTaskForRelocationIfEnabled(final BulkByScrollTask task, final ReindexRequest request) {
393+
private void initTaskForRelocationIfEnabled(final BulkByScrollTask task) {
401394
// todo: move initialization to BulkByPaginatedSearchParallelizationHelper rather than having it in Reindexer, makes it generic
402395
// for update-by-query and delete-by-query
403396
if (ReindexPlugin.REINDEX_RESILIENCE_ENABLED == false) {

modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.index.reindex.ResumeInfo;
3131
import org.elasticsearch.index.reindex.ResumeReindexAction;
3232
import org.elasticsearch.script.ScriptService;
33+
import org.elasticsearch.tasks.Task;
3334
import org.elasticsearch.tasks.TaskId;
3435
import org.elasticsearch.tasks.TaskManager;
3536
import org.elasticsearch.test.ESTestCase;
@@ -179,7 +180,7 @@ public void testListenerWithRelocationsPassesThroughForWorkerWithLeaderParent()
179180
assumeTrue("reindex resilience enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED);
180181
final long parentTaskId = 99;
181182
final BulkByScrollTask leaderTask = new BulkByScrollTask(
182-
parentTaskId,
183+
new TaskId(randomAlphaOfLength(5), parentTaskId),
183184
"test_type",
184185
"test_action",
185186
"test",

server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,7 @@ protected Self doForSlice(Self request, TaskId slicingTask, int totalSlices) {
480480
ResumeInfo resumeInfo = this.getResumeInfo().get();
481481
int sliceId = request.getSearchRequest().source().slice().getId();
482482
if (resumeInfo.isSliceCompleted(sliceId) == false) {
483-
request.setResumeInfo(new ResumeInfo(resumeInfo.getSlice(sliceId).get().resumeInfo(), null));
483+
request.setResumeInfo(new ResumeInfo(resumeInfo.relocationOrigin(), resumeInfo.getSlice(sliceId).get().resumeInfo(), null));
484484
}
485485
}
486486

@@ -496,16 +496,17 @@ protected Self doForSlice(Self request, TaskId slicingTask, int totalSlices) {
496496
}
497497

498498
@Override
499-
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
499+
public Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
500+
ResumeInfo.RelocationOrigin existingOrigin = resumeInfo != null ? resumeInfo.relocationOrigin() : null;
500501
return new BulkByScrollTask(
501-
id,
502+
taskId,
502503
type,
503504
action,
504505
getDescription(),
505506
parentTaskId,
506507
headers,
507508
eligibleForRelocationOnShutdown,
508-
resumeInfo == null ? null : resumeInfo.relocationOrigin()
509+
existingOrigin
509510
);
510511
}
511512

server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.List;
3434
import java.util.Map;
3535
import java.util.Objects;
36-
import java.util.Optional;
3736
import java.util.Set;
3837
import java.util.concurrent.TimeUnit;
3938

@@ -57,25 +56,38 @@
5756
public class BulkByScrollTask extends CancellableTask {
5857

5958
private final boolean eligibleForRelocationOnShutdown;
60-
@Nullable
6159
private final ResumeInfo.RelocationOrigin relocationOrigin;
6260
private volatile LeaderBulkByScrollTaskState leaderState;
6361
private volatile WorkerBulkByScrollTaskState workerState;
6462
private volatile boolean relocationRequested = false;
6563

6664
public BulkByScrollTask(
67-
long id,
65+
long taskId,
6866
String type,
6967
String action,
7068
String description,
7169
TaskId parentTaskId,
7270
Map<String, String> headers,
7371
boolean eligibleForRelocationOnShutdown,
74-
@Nullable ResumeInfo.RelocationOrigin relocationOrigin
72+
@Nullable ResumeInfo.RelocationOrigin existingOrigin
7573
) {
76-
super(id, type, action, description, parentTaskId, headers);
74+
super(taskId, type, action, description, parentTaskId, headers);
75+
throw new IllegalStateException("test");
76+
}
77+
78+
public BulkByScrollTask(
79+
TaskId taskId,
80+
String type,
81+
String action,
82+
String description,
83+
TaskId parentTaskId,
84+
Map<String, String> headers,
85+
boolean eligibleForRelocationOnShutdown,
86+
@Nullable ResumeInfo.RelocationOrigin existingOrigin
87+
) {
88+
super(taskId.getId(), type, action, description, parentTaskId, headers);
7789
this.eligibleForRelocationOnShutdown = eligibleForRelocationOnShutdown;
78-
this.relocationOrigin = relocationOrigin;
90+
this.relocationOrigin = existingOrigin != null ? existingOrigin : new ResumeInfo.RelocationOrigin(taskId, this.startTime);
7991
}
8092

8193
@Override
@@ -227,9 +239,9 @@ public boolean isRelocationRequested() {
227239
return relocationRequested;
228240
}
229241

230-
/** Returns the relocation origin if this task is a relocated continuation. */
231-
public Optional<ResumeInfo.RelocationOrigin> getRelocationOrigin() {
232-
return Optional.ofNullable(relocationOrigin);
242+
/** Returns the relocation origin for this task — always non-null. */
243+
public ResumeInfo.RelocationOrigin getRelocationOrigin() {
244+
return relocationOrigin;
233245
}
234246

235247
/**

server/src/main/java/org/elasticsearch/index/reindex/LeaderBulkByScrollTaskState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ private Optional<BulkByScrollResponse> relocationResponseIfNeeded() {
174174
if (allJobsCompletedThereforeNoNeedForRelocation) {
175175
return Optional.empty();
176176
}
177-
final var resumeInfo = new ResumeInfo(null, sliceResumeInfoMap);
177+
final var resumeInfo = new ResumeInfo(task.getRelocationOrigin(), null, sliceResumeInfoMap);
178178
// this response is a local carrier for resumeInfo only — for higher-level code to handle relocation and then discard.
179179
// the status for the task that's serialized into the .tasks index is taken from the leader state.
180180
return Optional.of(

server/src/main/java/org/elasticsearch/tasks/Task.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public class Task implements Traceable {
100100
/**
101101
* The task's start time as a wall clock time since epoch ({@link System#currentTimeMillis()} style).
102102
*/
103-
private final long startTime;
103+
protected final long startTime;
104104

105105
/**
106106
* The task's start time as a relative time ({@link System#nanoTime()} style).

0 commit comments

Comments
 (0)