1111import org .elasticsearch .ResourceNotFoundException ;
1212import org .elasticsearch .action .ActionListener ;
1313import org .elasticsearch .action .ActionListenerResponseHandler ;
14+ import org .elasticsearch .action .admin .indices .stats .IndexStats ;
15+ import org .elasticsearch .action .admin .indices .stats .IndicesStatsAction ;
16+ import org .elasticsearch .action .admin .indices .stats .IndicesStatsRequest ;
17+ import org .elasticsearch .action .admin .indices .stats .IndicesStatsResponse ;
1418import org .elasticsearch .action .support .ActionFilters ;
1519import org .elasticsearch .action .support .HandledTransportAction ;
20+ import org .elasticsearch .action .support .IndicesOptions ;
21+ import org .elasticsearch .client .internal .Client ;
1622import org .elasticsearch .cluster .node .DiscoveryNode ;
1723import org .elasticsearch .cluster .service .ClusterService ;
1824import org .elasticsearch .common .util .concurrent .EsExecutors ;
1925import org .elasticsearch .core .Strings ;
26+ import org .elasticsearch .core .Tuple ;
27+ import org .elasticsearch .index .shard .DocsStats ;
2028import org .elasticsearch .injection .guice .Inject ;
2129import org .elasticsearch .persistent .AllocatedPersistentTask ;
2230import org .elasticsearch .persistent .PersistentTasksCustomMetadata ;
2331import org .elasticsearch .tasks .CancellableTask ;
2432import org .elasticsearch .tasks .Task ;
2533import org .elasticsearch .tasks .TaskInfo ;
26- import org .elasticsearch .tasks .TaskResult ;
2734import org .elasticsearch .transport .TransportRequestOptions ;
2835import org .elasticsearch .transport .TransportService ;
2936import org .elasticsearch .xpack .migrate .action .GetMigrationReindexStatusAction .Request ;
3037import org .elasticsearch .xpack .migrate .action .GetMigrationReindexStatusAction .Response ;
38+ import org .elasticsearch .xpack .migrate .task .ReindexDataStreamEnrichedStatus ;
39+ import org .elasticsearch .xpack .migrate .task .ReindexDataStreamStatus ;
3140
41+ import java .util .HashMap ;
3242import java .util .Map ;
3343import java .util .Optional ;
44+ import java .util .Set ;
45+ import java .util .stream .Stream ;
3446
3547public class GetMigrationReindexStatusTransportAction extends HandledTransportAction <Request , Response > {
3648 private final ClusterService clusterService ;
3749 private final TransportService transportService ;
50+ private final Client client ;
3851
3952 @ Inject
4053 public GetMigrationReindexStatusTransportAction (
4154 ClusterService clusterService ,
4255 TransportService transportService ,
43- ActionFilters actionFilters
56+ ActionFilters actionFilters ,
57+ Client client
4458 ) {
4559 super (GetMigrationReindexStatusAction .NAME , transportService , actionFilters , Request ::new , EsExecutors .DIRECT_EXECUTOR_SERVICE );
4660 this .clusterService = clusterService ;
4761 this .transportService = transportService ;
62+ this .client = client ;
4863 }
4964
5065 @ Override
@@ -60,9 +75,9 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
6075 } else if (persistentTask .isAssigned ()) {
6176 String nodeId = persistentTask .getExecutorNode ();
6277 if (clusterService .localNode ().getId ().equals (nodeId )) {
63- getRunningTaskFromNode (persistentTaskId , listener );
78+ fetchAndReportStatusForTaskOnThisNode (persistentTaskId , listener );
6479 } else {
65- runOnNodeWithTaskIfPossible (task , request , nodeId , listener );
80+ fetchAndReportStatusForTaskOnRemoteNode (task , request , nodeId , listener );
6681 }
6782 } else {
6883 listener .onFailure (new ElasticsearchException ("Persistent task with id [{}] is not assigned to a node" , persistentTaskId ));
@@ -82,7 +97,7 @@ private Task getRunningPersistentTaskFromTaskManager(String persistentTaskId) {
8297 return optionalTask .<Task >map (Map .Entry ::getValue ).orElse (null );
8398 }
8499
85- void getRunningTaskFromNode (String persistentTaskId , ActionListener <Response > listener ) {
100+ void fetchAndReportStatusForTaskOnThisNode (String persistentTaskId , ActionListener <Response > listener ) {
86101 Task runningTask = getRunningPersistentTaskFromTaskManager (persistentTaskId );
87102 if (runningTask == null ) {
88103 listener .onFailure (
@@ -96,11 +111,97 @@ void getRunningTaskFromNode(String persistentTaskId, ActionListener<Response> li
96111 );
97112 } else {
98113 TaskInfo info = runningTask .taskInfo (clusterService .localNode ().getId (), true );
99- listener .onResponse (new Response (new TaskResult (false , info )));
114+ ReindexDataStreamStatus status = (ReindexDataStreamStatus ) info .status ();
115+ Set <String > inProgressIndices = status .inProgress ();
116+ if (inProgressIndices .isEmpty ()) {
117+ // We have no reason to fetch index stats since there are no in progress indices
118+ reportStatus (Map .of (), status , listener );
119+ } else {
120+ fetchInProgressStatsAndReportStatus (inProgressIndices , status , listener );
121+ }
100122 }
101123 }
102124
103- private void runOnNodeWithTaskIfPossible (Task thisTask , Request request , String nodeId , ActionListener <Response > listener ) {
125+ /*
126+ * The status is enriched with the information from inProgressMap to create a new ReindexDataStreamEnrichedStatus, which is used in the
127+ * response sent to the listener.
128+ */
129+ private void reportStatus (
130+ Map <String , Tuple <Long , Long >> inProgressMap ,
131+ ReindexDataStreamStatus status ,
132+ ActionListener <Response > listener
133+ ) {
134+ ReindexDataStreamEnrichedStatus enrichedStatus = new ReindexDataStreamEnrichedStatus (
135+ status .persistentTaskStartTime (),
136+ status .totalIndices (),
137+ status .totalIndicesToBeUpgraded (),
138+ status .complete (),
139+ status .exception (),
140+ inProgressMap ,
141+ status .pending (),
142+ status .errors ()
143+ );
144+ listener .onResponse (new Response (enrichedStatus ));
145+ }
146+
147+ /*
148+ * This method feches doc counts for all indices in inProgressIndices (and the indices they are being reindexed into). After
149+ * successfully fetching those, reportStatus is called.
150+ */
151+ private void fetchInProgressStatsAndReportStatus (
152+ Set <String > inProgressIndices ,
153+ ReindexDataStreamStatus status ,
154+ ActionListener <Response > listener
155+ ) {
156+ IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest ();
157+ String [] indices = inProgressIndices .stream ()
158+ .flatMap (index -> Stream .of (index , ReindexDataStreamIndexTransportAction .generateDestIndexName (index )))
159+ .toList ()
160+ .toArray (new String [0 ]);
161+ indicesStatsRequest .indices (indices );
162+ /*
163+ * It is possible that the destination index will not exist yet, so we want to ignore the fact that it is missing
164+ */
165+ indicesStatsRequest .indicesOptions (IndicesOptions .fromOptions (true , true , true , true ));
166+ client .execute (IndicesStatsAction .INSTANCE , indicesStatsRequest , new ActionListener <IndicesStatsResponse >() {
167+ @ Override
168+ public void onResponse (IndicesStatsResponse indicesStatsResponse ) {
169+ Map <String , Tuple <Long , Long >> inProgressMap = new HashMap <>();
170+ for (String index : inProgressIndices ) {
171+ IndexStats sourceIndexStats = indicesStatsResponse .getIndex (index );
172+ final long totalDocsInIndex ;
173+ if (sourceIndexStats == null ) {
174+ totalDocsInIndex = 0 ;
175+ } else {
176+ DocsStats totalDocsStats = sourceIndexStats .getTotal ().getDocs ();
177+ totalDocsInIndex = totalDocsStats == null ? 0 : totalDocsStats .getCount ();
178+ }
179+ IndexStats migratedIndexStats = indicesStatsResponse .getIndex (
180+ ReindexDataStreamIndexTransportAction .generateDestIndexName (index )
181+ );
182+ final long reindexedDocsInIndex ;
183+ if (migratedIndexStats == null ) {
184+ reindexedDocsInIndex = 0 ;
185+ } else {
186+ DocsStats reindexedDocsStats = migratedIndexStats .getTotal ().getDocs ();
187+ reindexedDocsInIndex = reindexedDocsStats == null ? 0 : reindexedDocsStats .getCount ();
188+ }
189+ inProgressMap .put (index , Tuple .tuple (totalDocsInIndex , reindexedDocsInIndex ));
190+ }
191+ reportStatus (inProgressMap , status , listener );
192+ }
193+
194+ @ Override
195+ public void onFailure (Exception e ) {
196+ listener .onFailure (e );
197+ }
198+ });
199+ }
200+
201+ /*
202+ * The task and its status exist on some other node, so this method forwards the request to that node.
203+ */
204+ private void fetchAndReportStatusForTaskOnRemoteNode (Task thisTask , Request request , String nodeId , ActionListener <Response > listener ) {
104205 DiscoveryNode node = clusterService .state ().nodes ().get (nodeId );
105206 if (node == null ) {
106207 listener .onFailure (
0 commit comments