Skip to content

Commit cf73279

Browse files
committed
Allow data stream reindex tasks to be re-run after completion
1 parent 4cdb1e3 commit cf73279

File tree

1 file changed

+40
-1
lines changed

1 file changed

+40
-1
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
import org.elasticsearch.action.support.ActionFilters;
1414
import org.elasticsearch.action.support.HandledTransportAction;
1515
import org.elasticsearch.action.support.master.AcknowledgedResponse;
16+
import org.elasticsearch.client.internal.Client;
1617
import org.elasticsearch.cluster.metadata.DataStream;
1718
import org.elasticsearch.cluster.metadata.Metadata;
1819
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.core.TimeValue;
2021
import org.elasticsearch.injection.guice.Inject;
22+
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
2123
import org.elasticsearch.persistent.PersistentTasksService;
2224
import org.elasticsearch.tasks.Task;
2325
import org.elasticsearch.threadpool.ThreadPool;
@@ -38,13 +40,15 @@ public class ReindexDataStreamTransportAction extends HandledTransportAction<Rei
3840
private final PersistentTasksService persistentTasksService;
3941
private final TransportService transportService;
4042
private final ClusterService clusterService;
43+
private final Client client;
4144

4245
@Inject
4346
public ReindexDataStreamTransportAction(
4447
TransportService transportService,
4548
ActionFilters actionFilters,
4649
PersistentTasksService persistentTasksService,
47-
ClusterService clusterService
50+
ClusterService clusterService,
51+
Client client
4852
) {
4953
super(
5054
ReindexDataStreamAction.NAME,
@@ -57,6 +61,7 @@ public ReindexDataStreamTransportAction(
5761
this.transportService = transportService;
5862
this.persistentTasksService = persistentTasksService;
5963
this.clusterService = clusterService;
64+
this.client = client;
6065
}
6166

6267
@Override
@@ -78,6 +83,40 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList
7883
ClientHelper.getPersistableSafeSecurityHeaders(transportService.getThreadPool().getThreadContext(), clusterService.state())
7984
);
8085
String persistentTaskId = getPersistentTaskId(sourceDataStreamName);
86+
87+
PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
88+
.getMetadata()
89+
.custom(PersistentTasksCustomMetadata.TYPE);
90+
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(persistentTaskId);
91+
92+
if (persistentTask == null) {
93+
startTask(listener, persistentTaskId, params);
94+
} else {
95+
GetMigrationReindexStatusAction.Request statusRequest = new GetMigrationReindexStatusAction.Request(sourceDataStreamName);
96+
statusRequest.setParentTask(task.getParentTaskId());
97+
client.execute(
98+
GetMigrationReindexStatusAction.INSTANCE,
99+
statusRequest,
100+
listener.delegateFailureAndWrap((getListener, getResponse) -> {
101+
if (getResponse.getEnrichedStatus().complete() == false) {
102+
throw new ResourceAlreadyExistsException("Reindex task for data stream [{}] already exists", sourceDataStreamName);
103+
}
104+
CancelReindexDataStreamAction.Request cancelRequest = new CancelReindexDataStreamAction.Request(sourceDataStreamName);
105+
cancelRequest.setParentTask(task.getParentTaskId());
106+
client.execute(
107+
CancelReindexDataStreamAction.INSTANCE,
108+
cancelRequest,
109+
getListener.delegateFailureAndWrap(
110+
(cancelListener, cancelResponse) -> startTask(cancelListener, persistentTaskId, params)
111+
)
112+
);
113+
})
114+
);
115+
}
116+
117+
}
118+
119+
private void startTask(ActionListener<AcknowledgedResponse> listener, String persistentTaskId, ReindexDataStreamTaskParams params) {
81120
persistentTasksService.sendStartRequest(
82121
persistentTaskId,
83122
ReindexDataStreamTask.TASK_NAME,

0 commit comments

Comments
 (0)