1313import org .elasticsearch .action .support .ActionFilters ;
1414import org .elasticsearch .action .support .HandledTransportAction ;
1515import org .elasticsearch .action .support .master .AcknowledgedResponse ;
16+ import org .elasticsearch .client .internal .Client ;
1617import org .elasticsearch .cluster .metadata .DataStream ;
1718import org .elasticsearch .cluster .metadata .Metadata ;
1819import org .elasticsearch .cluster .service .ClusterService ;
1920import org .elasticsearch .core .TimeValue ;
2021import org .elasticsearch .injection .guice .Inject ;
22+ import org .elasticsearch .persistent .PersistentTasksCustomMetadata ;
2123import org .elasticsearch .persistent .PersistentTasksService ;
2224import org .elasticsearch .tasks .Task ;
2325import org .elasticsearch .threadpool .ThreadPool ;
@@ -38,13 +40,15 @@ public class ReindexDataStreamTransportAction extends HandledTransportAction<Rei
3840 private final PersistentTasksService persistentTasksService ;
3941 private final TransportService transportService ;
4042 private final ClusterService clusterService ;
43+ private final Client client ;
4144
4245 @ Inject
4346 public ReindexDataStreamTransportAction (
4447 TransportService transportService ,
4548 ActionFilters actionFilters ,
4649 PersistentTasksService persistentTasksService ,
47- ClusterService clusterService
50+ ClusterService clusterService ,
51+ Client client
4852 ) {
4953 super (
5054 ReindexDataStreamAction .NAME ,
@@ -57,6 +61,7 @@ public ReindexDataStreamTransportAction(
5761 this .transportService = transportService ;
5862 this .persistentTasksService = persistentTasksService ;
5963 this .clusterService = clusterService ;
64+ this .client = client ;
6065 }
6166
6267 @ Override
@@ -78,6 +83,40 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList
7883 ClientHelper .getPersistableSafeSecurityHeaders (transportService .getThreadPool ().getThreadContext (), clusterService .state ())
7984 );
8085 String persistentTaskId = getPersistentTaskId (sourceDataStreamName );
86+
87+ PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService .state ()
88+ .getMetadata ()
89+ .custom (PersistentTasksCustomMetadata .TYPE );
90+ PersistentTasksCustomMetadata .PersistentTask <?> persistentTask = persistentTasksCustomMetadata .getTask (persistentTaskId );
91+
92+ if (persistentTask == null ) {
93+ startTask (listener , persistentTaskId , params );
94+ } else {
95+ GetMigrationReindexStatusAction .Request statusRequest = new GetMigrationReindexStatusAction .Request (sourceDataStreamName );
96+ statusRequest .setParentTask (task .getParentTaskId ());
97+ client .execute (
98+ GetMigrationReindexStatusAction .INSTANCE ,
99+ statusRequest ,
100+ listener .delegateFailureAndWrap ((getListener , getResponse ) -> {
101+ if (getResponse .getEnrichedStatus ().complete () == false ) {
102+ throw new ResourceAlreadyExistsException ("Reindex task for data stream [{}] already exists" , sourceDataStreamName );
103+ }
104+ CancelReindexDataStreamAction .Request cancelRequest = new CancelReindexDataStreamAction .Request (sourceDataStreamName );
105+ cancelRequest .setParentTask (task .getParentTaskId ());
106+ client .execute (
107+ CancelReindexDataStreamAction .INSTANCE ,
108+ cancelRequest ,
109+ getListener .delegateFailureAndWrap (
110+ (cancelListener , cancelResponse ) -> startTask (cancelListener , persistentTaskId , params )
111+ )
112+ );
113+ })
114+ );
115+ }
116+
117+ }
118+
119+ private void startTask (ActionListener <AcknowledgedResponse > listener , String persistentTaskId , ReindexDataStreamTaskParams params ) {
81120 persistentTasksService .sendStartRequest (
82121 persistentTaskId ,
83122 ReindexDataStreamTask .TASK_NAME ,
0 commit comments