1313import org .elasticsearch .action .support .ActionFilters ;
1414import org .elasticsearch .action .support .HandledTransportAction ;
1515import org .elasticsearch .action .support .master .AcknowledgedResponse ;
16+ import org .elasticsearch .client .internal .Client ;
1617import org .elasticsearch .cluster .metadata .DataStream ;
1718import org .elasticsearch .cluster .metadata .Metadata ;
1819import org .elasticsearch .cluster .service .ClusterService ;
1920import org .elasticsearch .injection .guice .Inject ;
21+ import org .elasticsearch .persistent .PersistentTasksCustomMetadata ;
2022import org .elasticsearch .persistent .PersistentTasksService ;
2123import org .elasticsearch .tasks .Task ;
2224import org .elasticsearch .threadpool .ThreadPool ;
@@ -37,13 +39,15 @@ public class ReindexDataStreamTransportAction extends HandledTransportAction<Rei
3739 private final PersistentTasksService persistentTasksService ;
3840 private final TransportService transportService ;
3941 private final ClusterService clusterService ;
42+ private final Client client ;
4043
4144 @ Inject
4245 public ReindexDataStreamTransportAction (
4346 TransportService transportService ,
4447 ActionFilters actionFilters ,
4548 PersistentTasksService persistentTasksService ,
46- ClusterService clusterService
49+ ClusterService clusterService ,
50+ Client client
4751 ) {
4852 super (
4953 ReindexDataStreamAction .NAME ,
@@ -56,6 +60,7 @@ public ReindexDataStreamTransportAction(
5660 this .transportService = transportService ;
5761 this .persistentTasksService = persistentTasksService ;
5862 this .clusterService = clusterService ;
63+ this .client = client ;
5964 }
6065
6166 @ Override
@@ -77,6 +82,40 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList
7782 ClientHelper .getPersistableSafeSecurityHeaders (transportService .getThreadPool ().getThreadContext (), clusterService .state ())
7883 );
7984 String persistentTaskId = getPersistentTaskId (sourceDataStreamName );
85+
86+ PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService .state ()
87+ .getMetadata ()
88+ .custom (PersistentTasksCustomMetadata .TYPE );
89+ PersistentTasksCustomMetadata .PersistentTask <?> persistentTask = persistentTasksCustomMetadata .getTask (persistentTaskId );
90+
91+ if (persistentTask == null ) {
92+ startTask (listener , persistentTaskId , params );
93+ } else {
94+ GetMigrationReindexStatusAction .Request statusRequest = new GetMigrationReindexStatusAction .Request (sourceDataStreamName );
95+ statusRequest .setParentTask (task .getParentTaskId ());
96+ client .execute (
97+ GetMigrationReindexStatusAction .INSTANCE ,
98+ statusRequest ,
99+ listener .delegateFailureAndWrap ((getListener , getResponse ) -> {
100+ if (getResponse .getEnrichedStatus ().complete () == false ) {
101+ throw new ResourceAlreadyExistsException ("Reindex task for data stream [{}] already exists" , sourceDataStreamName );
102+ }
103+ CancelReindexDataStreamAction .Request cancelRequest = new CancelReindexDataStreamAction .Request (sourceDataStreamName );
104+ cancelRequest .setParentTask (task .getParentTaskId ());
105+ client .execute (
106+ CancelReindexDataStreamAction .INSTANCE ,
107+ cancelRequest ,
108+ getListener .delegateFailureAndWrap (
109+ (cancelListener , cancelResponse ) -> startTask (cancelListener , persistentTaskId , params )
110+ )
111+ );
112+ })
113+ );
114+ }
115+
116+ }
117+
118+ private void startTask (ActionListener <AcknowledgedResponse > listener , String persistentTaskId , ReindexDataStreamTaskParams params ) {
80119 persistentTasksService .sendStartRequest (
81120 persistentTaskId ,
82121 ReindexDataStreamTask .TASK_NAME ,
0 commit comments