Skip to content

Commit e05c39c

Browse files
committed
ResumeInfo never nullable
1 parent 18b32c0 commit e05c39c

File tree

6 files changed

+81
-47
lines changed

6 files changed

+81
-47
lines changed

modules/reindex-management/src/main/java/org/elasticsearch/reindex/management/TransportListReindexAction.java

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -92,28 +92,26 @@ protected void taskOperation(CancellableTask actionTask, ListReindexRequest requ
9292
assert task instanceof BulkByScrollTask : "task should be a BulkByScrollTask";
9393
TaskInfo info = task.taskInfo(clusterService.localNode().getId(), request.getDetailed());
9494
if (task instanceof final BulkByScrollTask bbs) {
95-
final ResumeInfo.RelocationOrigin origin = bbs.getRelocationOrigin().orElse(null);
96-
if (origin != null) {
97-
final TaskId originalId = origin.originalTaskId();
98-
final long originalStartMillis = origin.originalStartTimeMillis();
99-
final long adjustedRunningTimeNanos = info.runningTimeNanos() + TimeUnit.MILLISECONDS.toNanos(
100-
info.startTime() - originalStartMillis
101-
);
102-
info = new TaskInfo(
103-
originalId,
104-
info.type(),
105-
originalId.getNodeId(),
106-
info.action(),
107-
info.description(),
108-
info.status(),
109-
originalStartMillis,
110-
adjustedRunningTimeNanos,
111-
info.cancellable(),
112-
info.cancelled(),
113-
info.parentTaskId(),
114-
info.headers()
115-
);
116-
}
95+
final ResumeInfo.RelocationOrigin origin = bbs.getRelocationOrigin();
96+
final TaskId originalId = origin.originalTaskId();
97+
final long originalStartMillis = origin.originalStartTimeMillis();
98+
final long adjustedRunningTimeNanos = info.runningTimeNanos() + TimeUnit.MILLISECONDS.toNanos(
99+
info.startTime() - originalStartMillis
100+
);
101+
info = new TaskInfo(
102+
originalId,
103+
info.type(),
104+
originalId.getNodeId(),
105+
info.action(),
106+
info.description(),
107+
info.status(),
108+
originalStartMillis,
109+
adjustedRunningTimeNanos,
110+
info.cancellable(),
111+
info.cancelled(),
112+
info.parentTaskId(),
113+
info.headers()
114+
);
117115
}
118116
listener.onResponse(info);
119117
}

modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ void notifyDone(long thisBatchStartTimeNS, ScrollConsumableHitsResponse asyncRes
570570
worker.getStatus(),
571571
remoteVersion
572572
);
573-
final ResumeInfo resumeInfo = new ResumeInfo(workerResumeInfo, null);
573+
final ResumeInfo resumeInfo = new ResumeInfo(task.getRelocationOrigin(), workerResumeInfo, null);
574574
// This response is a local carrier for resumeInfo — for higher-level code to handle relocation and then discard.
575575
// However, status must be accurate for sliced tasks only, the leader state stores this response and derives
576576
// its own combined status from it to serialize to .tasks index.

modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,12 +143,22 @@ public class Reindexer {
143143

144144
public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListener<Void> listener) {
145145
final ActionListener<Void> initListener = listener.delegateFailure((l, v) -> {
146+
initRelocationOrigin(task, request);
146147
initTaskForRelocationIfEnabled(task, request);
147148
l.onResponse(v);
148149
});
149150
BulkByPaginatedSearchParallelizationHelper.initTaskState(task, request, client, initListener);
150151
}
151152

153+
private void initRelocationOrigin(BulkByScrollTask task, ReindexRequest request) {
154+
if (request.getRelocationOrigin() != null) {
155+
return;
156+
}
157+
var selfOrigin = new ResumeInfo.RelocationOrigin(new TaskId(clusterService.localNode().getId(), task.getId()), task.getStartTime());
158+
task.initSelfOrigin(selfOrigin);
159+
request.setRelocationOrigin(selfOrigin);
160+
}
161+
152162
public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkClient, ActionListener<BulkByScrollResponse> listener) {
153163
// todo(szy/sam): handle sliced and non-sliced
154164
// todo(szy/sam): bug, we send System::nanoTime across JVMs
@@ -373,12 +383,7 @@ ActionListener<BulkByScrollResponse> listenerWithRelocations(
373383
return;
374384
}
375385
final ResumeInfo currentResumeInfo = response.getTaskResumeInfo().get();
376-
final ResumeInfo.RelocationOrigin origin = request.getResumeInfo()
377-
.map(r -> Objects.requireNonNull(r.relocationOrigin(), "relocation origin should be set if resume info is present"))
378-
.orElseGet(
379-
() -> new ResumeInfo.RelocationOrigin(new TaskId(clusterService.localNode().getId(), task.getId()), task.getStartTime())
380-
);
381-
request.setResumeInfo(new ResumeInfo(origin, currentResumeInfo.worker(), currentResumeInfo.slices()));
386+
request.setResumeInfo(new ResumeInfo(task.getRelocationOrigin(), currentResumeInfo.worker(), currentResumeInfo.slices()));
382387
final ResumeBulkByScrollRequest resumeRequest = new ResumeBulkByScrollRequest(request);
383388
final ActionListener<ResumeBulkByScrollResponse> relocationListener = ActionListener.wrap(resp -> {
384389
final var relocatedException = new TaskRelocatedException();

server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,14 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
116116
@Nullable
117117
private ResumeInfo resumeInfo;
118118

119+
/**
120+
* Identity of the original task, propagated across relocations and slices so the user-facing task ID and start time are preserved.
121+
* Transient — not serialized; flows through {@link ResumeInfo} on the wire, then synced here via {@link #setResumeInfo} or set
122+
* directly by the init path for first-run tasks.
123+
*/
124+
@Nullable
125+
private ResumeInfo.RelocationOrigin relocationOrigin;
126+
119127
public AbstractBulkByScrollRequest(StreamInput in) throws IOException {
120128
super(in);
121129
searchRequest = new SearchRequest(in);
@@ -134,7 +142,13 @@ public AbstractBulkByScrollRequest(StreamInput in) throws IOException {
134142
eligibleForRelocationOnShutdown = in.readBoolean();
135143
}
136144
if (in.getTransportVersion().supports(REINDEX_RELOCATION_RESUME)) {
137-
resumeInfo = in.readOptionalWriteable(ResumeInfo::new);
145+
var ri = in.readOptionalWriteable(ResumeInfo::new);
146+
this.resumeInfo = ri;
147+
if (ri != null) {
148+
if (resumeInfo.relocationOrigin() != null) {
149+
this.relocationOrigin = resumeInfo.relocationOrigin();
150+
}
151+
}
138152
}
139153
}
140154

@@ -437,9 +451,13 @@ public int getSlices() {
437451

438452
/**
439453
* Sets resumption data to continue from a previously-acquired scroll ID.
454+
* Also syncs {@link #relocationOrigin} from the resume info when present.
440455
*/
441456
public Self setResumeInfo(ResumeInfo resumeInfo) {
442457
this.resumeInfo = Objects.requireNonNull(resumeInfo);
458+
if (resumeInfo.relocationOrigin() != null) {
459+
this.relocationOrigin = resumeInfo.relocationOrigin();
460+
}
443461
return self();
444462
}
445463

@@ -450,6 +468,16 @@ public Optional<ResumeInfo> getResumeInfo() {
450468
return Optional.ofNullable(resumeInfo);
451469
}
452470

471+
public Self setRelocationOrigin(ResumeInfo.RelocationOrigin relocationOrigin) {
472+
this.relocationOrigin = Objects.requireNonNull(relocationOrigin);
473+
return self();
474+
}
475+
476+
@Nullable
477+
public ResumeInfo.RelocationOrigin getRelocationOrigin() {
478+
return relocationOrigin;
479+
}
480+
453481
/**
454482
* Build a new request for a slice of the parent request.
455483
*/
@@ -475,12 +503,15 @@ protected Self doForSlice(Self request, TaskId slicingTask, int totalSlices) {
475503
.setRequestsPerSecond(requestsPerSecond / totalSlices)
476504
// Sub requests don't have workers
477505
.setSlices(1);
506+
if (this.relocationOrigin != null) {
507+
request.setRelocationOrigin(this.relocationOrigin);
508+
}
478509
// Copy resume info for the slice from leader to the slice request
479510
if (this.getResumeInfo().isPresent()) {
480511
ResumeInfo resumeInfo = this.getResumeInfo().get();
481512
int sliceId = request.getSearchRequest().source().slice().getId();
482513
if (resumeInfo.isSliceCompleted(sliceId) == false) {
483-
request.setResumeInfo(new ResumeInfo(resumeInfo.getSlice(sliceId).get().resumeInfo(), null));
514+
request.setResumeInfo(new ResumeInfo(resumeInfo.relocationOrigin(), resumeInfo.getSlice(sliceId).get().resumeInfo(), null));
484515
}
485516
}
486517

@@ -497,16 +528,7 @@ protected Self doForSlice(Self request, TaskId slicingTask, int totalSlices) {
497528

498529
@Override
499530
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
500-
return new BulkByScrollTask(
501-
id,
502-
type,
503-
action,
504-
getDescription(),
505-
parentTaskId,
506-
headers,
507-
eligibleForRelocationOnShutdown,
508-
resumeInfo == null ? null : resumeInfo.relocationOrigin()
509-
);
531+
return new BulkByScrollTask(id, type, action, getDescription(), parentTaskId, headers, eligibleForRelocationOnShutdown, relocationOrigin);
510532
}
511533

512534
@Override

server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.List;
3434
import java.util.Map;
3535
import java.util.Objects;
36-
import java.util.Optional;
3736
import java.util.Set;
3837
import java.util.concurrent.TimeUnit;
3938

@@ -58,7 +57,7 @@ public class BulkByScrollTask extends CancellableTask {
5857

5958
private final boolean eligibleForRelocationOnShutdown;
6059
@Nullable
61-
private final ResumeInfo.RelocationOrigin relocationOrigin;
60+
private volatile ResumeInfo.RelocationOrigin relocationOrigin;
6261
private volatile LeaderBulkByScrollTaskState leaderState;
6362
private volatile WorkerBulkByScrollTaskState workerState;
6463
private volatile boolean relocationRequested = false;
@@ -227,9 +226,19 @@ public boolean isRelocationRequested() {
227226
return relocationRequested;
228227
}
229228

230-
/** Returns the relocation origin if this task is a relocated continuation. */
231-
public Optional<ResumeInfo.RelocationOrigin> getRelocationOrigin() {
232-
return Optional.ofNullable(relocationOrigin);
229+
/**
230+
* Sets the self-origin for a first-run (non-relocated) task. Must only be called once, and only when no origin was provided at
231+
* construction time. After this call, {@link #getRelocationOrigin()} is guaranteed non-null.
232+
*/
233+
public void initSelfOrigin(ResumeInfo.RelocationOrigin origin) {
234+
assert this.relocationOrigin == null : "initSelfOrigin called but relocationOrigin is already set";
235+
this.relocationOrigin = Objects.requireNonNull(origin);
236+
}
237+
238+
/** Returns the relocation origin for this task. Always non-null after task initialisation. */
239+
public ResumeInfo.RelocationOrigin getRelocationOrigin() {
240+
assert relocationOrigin != null : "relocationOrigin has not been initialised";
241+
return relocationOrigin;
233242
}
234243

235244
/**

server/src/main/java/org/elasticsearch/index/reindex/LeaderBulkByScrollTaskState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ private Optional<BulkByScrollResponse> relocationResponseIfNeeded() {
174174
if (allJobsCompletedThereforeNoNeedForRelocation) {
175175
return Optional.empty();
176176
}
177-
final var resumeInfo = new ResumeInfo(null, sliceResumeInfoMap);
177+
final var resumeInfo = new ResumeInfo(task.getRelocationOrigin(), null, sliceResumeInfoMap);
178178
// this response is a local carrier for resumeInfo only — for higher-level code to handle relocation and then discard.
179179
// the status for the task that's serialized into the .tasks index is taken from the leader state.
180180
return Optional.of(

0 commit comments

Comments
 (0)