Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/reference/migration/apis/data-stream-reindex.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ from the original backing indices are copied to the resulting backing indices.
This api runs in the background because reindexing all indices in a large data stream
is expected to take a large amount of time and resources. The endpoint will return immediately and a persistent
task will be created to run in the background. The current status of the task can be checked with
the <<data-stream-reindex-status-api,reindex status API>>. This status will be available for 24 hours after the task completes, whether
it finished successfully or failed. If the status is still available for a task, the task must be cancelled before it can be re-run.
A running or recently completed data stream reindex task can be cancelled using the <<data-stream-reindex-cancel-api,reindex cancel API>>.
the <<data-stream-reindex-status-api,reindex status API>>. This status will be available for 24 hours after the task
completes, whether it finished successfully or failed however only the last status is retained so re-running a reindex
will overwrite the previous status for that data stream. A running or recently completed data stream reindex task can be
cancelled using the <<data-stream-reindex-cancel-api,reindex cancel API>>.

///////////////////////////////////////////////////////////
[source,console]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -38,13 +40,15 @@ public class ReindexDataStreamTransportAction extends HandledTransportAction<Rei
private final PersistentTasksService persistentTasksService;
private final TransportService transportService;
private final ClusterService clusterService;
private final Client client;

@Inject
public ReindexDataStreamTransportAction(
TransportService transportService,
ActionFilters actionFilters,
PersistentTasksService persistentTasksService,
ClusterService clusterService
ClusterService clusterService,
Client client
) {
super(
ReindexDataStreamAction.NAME,
Expand All @@ -57,6 +61,7 @@ public ReindexDataStreamTransportAction(
this.transportService = transportService;
this.persistentTasksService = persistentTasksService;
this.clusterService = clusterService;
this.client = client;
}

@Override
Expand All @@ -78,6 +83,40 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList
ClientHelper.getPersistableSafeSecurityHeaders(transportService.getThreadPool().getThreadContext(), clusterService.state())
);
String persistentTaskId = getPersistentTaskId(sourceDataStreamName);

PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
.getMetadata()
.custom(PersistentTasksCustomMetadata.TYPE);
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(persistentTaskId);

if (persistentTask == null) {
startTask(listener, persistentTaskId, params);
} else {
GetMigrationReindexStatusAction.Request statusRequest = new GetMigrationReindexStatusAction.Request(sourceDataStreamName);
statusRequest.setParentTask(task.getParentTaskId());
client.execute(
GetMigrationReindexStatusAction.INSTANCE,
statusRequest,
listener.delegateFailureAndWrap((getListener, getResponse) -> {
if (getResponse.getEnrichedStatus().complete() == false) {
throw new ResourceAlreadyExistsException("Reindex task for data stream [{}] already exists", sourceDataStreamName);
}
CancelReindexDataStreamAction.Request cancelRequest = new CancelReindexDataStreamAction.Request(sourceDataStreamName);
cancelRequest.setParentTask(task.getParentTaskId());
client.execute(
CancelReindexDataStreamAction.INSTANCE,
cancelRequest,
getListener.delegateFailureAndWrap(
(cancelListener, cancelResponse) -> startTask(cancelListener, persistentTaskId, params)
)
);
})
);
}

}

private void startTask(ActionListener<AcknowledgedResponse> listener, String persistentTaskId, ReindexDataStreamTaskParams params) {
persistentTasksService.sendStartRequest(
persistentTaskId,
ReindexDataStreamTask.TASK_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,11 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
assertThat(statusResponseString, ((List<Object>) statusResponseMap.get("errors")).size(), equalTo(expectedErrorCount));
}
}, 60, TimeUnit.SECONDS);

// Verify it's possible to reindex again after a successful reindex
reindexResponse = upgradeUserClient.performRequest(reindexRequest);
assertOK(reindexResponse);

Request cancelRequest = new Request("POST", "_migration/reindex/" + dataStreamName + "/_cancel");
Response cancelResponse = upgradeUserClient.performRequest(cancelRequest);
assertOK(cancelResponse);
Expand Down
Loading