Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,16 @@
package org.elasticsearch.reindex.management;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
Expand All @@ -25,7 +33,9 @@
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.ShutdownPrepareService;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.reindex.ReindexMetrics;
Expand All @@ -34,6 +44,7 @@
import org.elasticsearch.reindex.TransportReindexAction;
import org.elasticsearch.rest.root.MainRestPlugin;
import org.elasticsearch.tasks.RawTaskStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskResult;
Expand Down Expand Up @@ -97,7 +108,13 @@ public class ReindexRelocationIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ReindexPlugin.class, ReindexManagementPlugin.class, MainRestPlugin.class, TestTelemetryPlugin.class);
return Arrays.asList(
ReindexPlugin.class,
ReindexManagementPlugin.class,
MainRestPlugin.class,
TestTelemetryPlugin.class,
BlockTasksWritePlugin.class
);
}

@Override
Expand Down Expand Up @@ -230,6 +247,105 @@ private void testReindexRelocation(
assertExpectedNumberOfDocumentsInDestinationIndex();
}

/**
* Verifies that the destination node writes the source task result to {@code .tasks} during relocation, so the chain link is preserved
* even when the source node cannot write. The test uses {@link BlockTasksWritePlugin} to block all {@code .tasks} writes on the source
* node, so only the destination's write (in {@code Reindexer.storeRelocationSourceTaskResult}) succeeds.
*/
public void testDestinationWritesSourceTaskResultToTasksIndex() throws Exception {
assumeTrue("reindex resilience is enabled", ReindexPlugin.REINDEX_RESILIENCE_ENABLED);
final int shards = randomIntBetween(1, 5);

final String nodeAName = internalCluster().startNode(
NodeRoles.onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
);
final String nodeAId = nodeIdByName(nodeAName);
final String nodeBName = internalCluster().startNode(
NodeRoles.onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE))
);
ensureStableCluster(2);

createIndexPinnedToNodeName(SOURCE_INDEX, nodeAName, shards);
createIndexPinnedToNodeName(DEST_INDEX, nodeAName, shards);
indexRandom(true, SOURCE_INDEX, numberOfDocumentsThatTakes60SecondsToIngest);
ensureGreen(SOURCE_INDEX, DEST_INDEX);

final TaskId originalTaskId = startAsyncThrottledLocalReindexOnNode(nodeBName, 1);
assertBusy(() -> {
final TaskResult running = getRunningReindex(originalTaskId);
assertThat(running.getTask().taskId().getNodeId(), equalTo(nodeIdByName(nodeBName)));
});

assertFalse(".tasks index should not exist before shutdown", indexExists(TaskResultsService.TASK_INDEX));

// Block .tasks writes on the source node so only the destination's write can succeed.
BlockTasksWritePlugin.blockedNodeName = nodeBName;
try {
shutdownNodeNameAndRelocate(nodeBName);

final TaskId relocatedTaskId = assertOriginalTaskEndStateInTasksIndexAndGetRelocatedTaskId(
originalTaskId,
nodeAId,
localReindexDescription(),
1,
shards
);

unthrottleReindex(relocatedTaskId);
assertRelocatedTaskExpectedEndState(relocatedTaskId, localReindexDescription(), 1, shards);
assertExpectedNumberOfDocumentsInDestinationIndex();
} finally {
BlockTasksWritePlugin.blockedNodeName = null;
}
}

/**
* Test plugin that blocks {@code .tasks} index writes on a specific node.
* Used to verify the destination writes the source task result during relocation.
*/
public static class BlockTasksWritePlugin extends Plugin implements ActionPlugin {
static volatile String blockedNodeName = null;
private volatile String myNodeName;

@Override
public Collection<Object> createComponents(PluginServices services) {
myNodeName = Node.NODE_NAME_SETTING.get(services.environment().settings());
return List.of();
}

@Override
public List<ActionFilter> getActionFilters() {
return List.of(new ActionFilter() {
@Override
public int order() {
return Integer.MIN_VALUE;
}

@Override
public <Request extends ActionRequest, Response extends ActionResponse> void apply(
Task task,
String action,
Request request,
ActionListener<Response> listener,
ActionFilterChain<Request, Response> chain
) {
if (myNodeName != null && myNodeName.equals(blockedNodeName) && isTasksIndexWrite(action, request)) {
listener.onFailure(new ElasticsearchException("blocked .tasks write on [" + myNodeName + "] for testing"));
return;
}
chain.proceed(task, action, request, listener);
}

private boolean isTasksIndexWrite(String action, ActionRequest request) {
if (action.equals(TransportBulkAction.NAME) && request instanceof BulkRequest bulkRequest) {
return bulkRequest.requests().stream().anyMatch(r -> TaskResultsService.TASK_INDEX.equals(r.index()));
}
return false;
}
});
}
}

private void shutdownNodeNameAndRelocate(final String nodeName) throws Exception {
// testing assumption: .tasks should not exist yet — it's created when the task result is stored during relocation
assertFalse(".tasks index should not exist before shutdown", indexExists(TaskResultsService.TASK_INDEX));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,7 @@ public void testFollowsSingleRelocation() throws IOException {
final TaskId originalTaskId = taskId;
final TaskId relocatedTaskId = randomValueOtherThan(taskId, () -> new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 1000)));

final TaskRelocatedException relocatedException = new TaskRelocatedException();
relocatedException.setOriginalAndRelocatedTaskIdMetadata(originalTaskId, relocatedTaskId);
final var relocatedException = new TaskRelocatedException(originalTaskId, relocatedTaskId);

final TaskInfo originalInfo = createTaskInfo(originalTaskId, ReindexAction.NAME);
final TaskResult originalResult = new TaskResult(originalInfo, (Exception) relocatedException);
Expand Down Expand Up @@ -335,11 +334,9 @@ public void testFollowsTwoRelocations() throws IOException {
() -> new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 1000))
);

final TaskRelocatedException firstRelocation = new TaskRelocatedException();
firstRelocation.setOriginalAndRelocatedTaskIdMetadata(originalTaskId, firstRelocatedTaskId);
final var firstRelocation = new TaskRelocatedException(originalTaskId, firstRelocatedTaskId);

final TaskRelocatedException secondRelocation = new TaskRelocatedException();
secondRelocation.setOriginalAndRelocatedTaskIdMetadata(firstRelocatedTaskId, secondRelocatedTaskId);
final var secondRelocation = new TaskRelocatedException(firstRelocatedTaskId, secondRelocatedTaskId);

final TaskInfo originalInfo = createTaskInfo(originalTaskId, ReindexAction.NAME);
final TaskResult originalResult = new TaskResult(originalInfo, (Exception) firstRelocation);
Expand Down Expand Up @@ -385,8 +382,7 @@ public void testWaitForCompletionHandlesRelocationWhileWaiting() throws IOExcept
final TaskId originalTaskId = taskId;
final TaskId relocatedTaskId = randomValueOtherThan(taskId, () -> new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 1000)));

final TaskRelocatedException relocatedException = new TaskRelocatedException();
relocatedException.setOriginalAndRelocatedTaskIdMetadata(originalTaskId, relocatedTaskId);
final var relocatedException = new TaskRelocatedException(originalTaskId, relocatedTaskId);

final TaskInfo originalInfo = createTaskInfo(originalTaskId, ReindexAction.NAME);
final TaskResult originalIncomplete = new TaskResult(false, originalInfo);
Expand Down Expand Up @@ -450,8 +446,7 @@ public void testRelocatedTaskNotFound() throws IOException {
final TaskId originalTaskId = taskId;
final TaskId relocatedTaskId = randomValueOtherThan(taskId, () -> new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 1000)));

final TaskRelocatedException relocatedException = new TaskRelocatedException();
relocatedException.setOriginalAndRelocatedTaskIdMetadata(originalTaskId, relocatedTaskId);
final var relocatedException = new TaskRelocatedException(originalTaskId, relocatedTaskId);

final TaskInfo originalInfo = createTaskInfo(originalTaskId, ReindexAction.NAME);
final TaskResult originalResult = new TaskResult(originalInfo, (Exception) relocatedException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.elasticsearch.index.reindex.RemoteInfo;
import org.elasticsearch.index.reindex.ResumeBulkByScrollRequest;
import org.elasticsearch.index.reindex.ResumeBulkByScrollResponse;
import org.elasticsearch.index.reindex.ResumeInfo;
import org.elasticsearch.index.reindex.ResumeReindexAction;
import org.elasticsearch.index.reindex.WorkerBulkByScrollTaskState;
import org.elasticsearch.reindex.remote.RemotePitPaginatedHitSource;
Expand All @@ -78,6 +79,8 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
Expand Down Expand Up @@ -124,6 +127,7 @@ public class Reindexer {
private final TransportService transportService;
private final ReindexRelocationNodePicker relocationNodePicker;
private final FeatureService featureService;
private final TaskResultsService taskResultsService;

Reindexer(
ClusterService clusterService,
Expand All @@ -135,7 +139,8 @@ public class Reindexer {
@Nullable ReindexMetrics reindexMetrics,
TransportService transportService,
ReindexRelocationNodePicker relocationNodePicker,
FeatureService featureService
FeatureService featureService,
TaskResultsService taskResultsService
) {
this.clusterService = clusterService;
this.projectResolver = projectResolver;
Expand All @@ -148,6 +153,7 @@ public class Reindexer {
this.transportService = transportService;
this.relocationNodePicker = Objects.requireNonNull(relocationNodePicker);
this.featureService = featureService;
this.taskResultsService = Objects.requireNonNull(taskResultsService);
}

public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListener<Void> listener) {
Expand All @@ -159,6 +165,44 @@ public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListen
}

public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkClient, ActionListener<BulkByScrollResponse> listener) {
final ResumeInfo resumeInfo = request.getResumeInfo().orElse(null);
if (resumeInfo != null && resumeInfo.sourceTaskResult() != null) {
storeRelocationSourceTaskResult(
task,
resumeInfo,
ActionListener.wrap(v -> doExecute(task, request, bulkClient, listener), listener::onFailure)
);
} else {
doExecute(task, request, bulkClient, listener);
}
}

/**
* Stores the source task's result in the {@code .tasks} index, patching the error with a {@link TaskRelocatedException} that contains
* the new task ID on the destination node. This preserves the relocation chain for the management APIs even if the source node fails
* to store its task result. For sliced reindex tasks, only the leader will store the source task result.
*/
private void storeRelocationSourceTaskResult(BulkByScrollTask task, ResumeInfo resumeInfo, ActionListener<Void> listener) {
final var relocatedException = new TaskRelocatedException(
resumeInfo.relocationOrigin().originalTaskId(),
new TaskId(clusterService.localNode().getId(), task.getId())
);
final TaskResult patched;
try {
patched = resumeInfo.sourceTaskResult().withError(relocatedException);
} catch (IOException e) {
listener.onFailure(e);
return;
}
taskResultsService.storeResult(patched, listener);
}

private void doExecute(
BulkByScrollTask task,
ReindexRequest request,
Client bulkClient,
ActionListener<BulkByScrollResponse> listener
) {
// todo: move relocations to BulkByPaginatedSearchParallelizationHelper rather than having it in Reindexer, makes it generic
// for update-by-query and delete-by-query
final ActionListener<BulkByScrollResponse> responseListener = wrapWithMetrics(
Expand Down Expand Up @@ -651,15 +695,24 @@ ActionListener<BulkByScrollResponse> listenerWithRelocations(
);
return;
}
request.setResumeInfo(response.getTaskResumeInfo().get());
final ResumeInfo resumeInfo = response.getTaskResumeInfo().get();
// Source task result is needed to preserve the relocation chain in the .tasks index on destination node.
// This is to guard against the source node failing before storing its task result containing the new relocated task ID,
// which would break the relocation chain and cause the management APIs to not be able to follow the chain to find
// the relocated task
final TaskResult sourceTaskResult;
try {
sourceTaskResult = task.result(clusterService.localNode(), new TaskRelocatedException());
} catch (IOException e) {
l.onFailure(e);
return;
}
request.setResumeInfo(
new ResumeInfo(resumeInfo.relocationOrigin(), resumeInfo.worker(), resumeInfo.slices(), sourceTaskResult)
);
final ResumeBulkByScrollRequest resumeRequest = new ResumeBulkByScrollRequest(request);
final ActionListener<ResumeBulkByScrollResponse> relocationListener = ActionListener.wrap(resp -> {
final var relocatedException = new TaskRelocatedException();
relocatedException.setOriginalAndRelocatedTaskIdMetadata(
new TaskId(clusterService.localNode().getId(), task.getId()),
resp.getTaskId()
);
l.onFailure(relocatedException);
l.onFailure(new TaskRelocatedException(resumeInfo.relocationOrigin().originalTaskId(), resp.getTaskId()));
}, l::onFailure);
transportService.sendRequest(
nodeToRelocateToNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,25 @@ public TaskRelocatedException() {
super("Task was relocated");
}

public TaskRelocatedException(TaskId originalTaskId, TaskId relocatedTaskId) {
super("Task was relocated");
assert originalTaskId.isSet() : "original task ID is not set";
assert relocatedTaskId.isSet() : "relocated task ID is not set";
this.addMetadata(ORIGINAL_TASK_ID_METADATA_KEY, originalTaskId.toString());
this.addMetadata(RELOCATED_TASK_ID_METADATA_KEY, relocatedTaskId.toString());
}

/** Returns the relocated task ID if the map is a serialized {@link TaskRelocatedException}. */
public static Optional<TaskId> relocatedTaskIdFromErrorMap(final Map<String, Object> errorMap) {
if ("task_relocated_exception".equals(errorMap.get("type"))
&& errorMap.get(RELOCATED_TASK_ID_KEY) instanceof String relocatedIdStr) {
final TaskId relocatedTaskId = new TaskId(relocatedIdStr);
assert relocatedTaskId.isSet() : "relocated task ID is not real ID";
assert relocatedTaskId.isSet() : "relocated task ID is not set";
return Optional.of(relocatedTaskId);
}
return Optional.empty();
}

public void setOriginalAndRelocatedTaskIdMetadata(final TaskId originalTaskId, final TaskId relocatedTaskId) {
assert originalTaskId.isSet() : "original task ID is not real ID";
assert relocatedTaskId.isSet() : "relocated task ID is not real ID";
this.addMetadata(ORIGINAL_TASK_ID_METADATA_KEY, originalTaskId.toString()); // implicit nullchecks
this.addMetadata(RELOCATED_TASK_ID_METADATA_KEY, relocatedTaskId.toString());
}

public Optional<String> getRelocatedTaskId() {
final List<String> relocatedTaskIds = this.getMetadata(RELOCATED_TASK_ID_METADATA_KEY);
assert relocatedTaskIds == null || relocatedTaskIds.size() == 1 : "either not present or one value";
Expand Down
Loading
Loading