77
88package org .elasticsearch .xpack .migrate .action ;
99
10- import org .apache .logging .log4j .LogManager ;
11- import org .apache .logging .log4j .Logger ;
1210import org .elasticsearch .action .ActionListener ;
1311import org .elasticsearch .action .support .ActionFilters ;
1412import org .elasticsearch .action .support .master .AcknowledgedResponse ;
2220import org .elasticsearch .cluster .block .ClusterBlockLevel ;
2321import org .elasticsearch .cluster .metadata .IndexMetadata ;
2422import org .elasticsearch .cluster .metadata .LifecycleExecutionState ;
25- import org .elasticsearch .cluster .metadata .Metadata ;
23+ import org .elasticsearch .cluster .metadata .ProjectId ;
24+ import org .elasticsearch .cluster .metadata .ProjectMetadata ;
25+ import org .elasticsearch .cluster .project .ProjectResolver ;
2626import org .elasticsearch .cluster .service .ClusterService ;
2727import org .elasticsearch .cluster .service .MasterServiceTaskQueue ;
2828import org .elasticsearch .common .Priority ;
3636import org .elasticsearch .transport .TransportService ;
3737
3838import java .util .HashMap ;
39+ import java .util .Objects ;
3940
4041public class CopyLifecycleIndexMetadataTransportAction extends TransportMasterNodeAction <
4142 CopyLifecycleIndexMetadataAction .Request ,
4243 AcknowledgedResponse > {
43- private static final Logger logger = LogManager .getLogger (CopyLifecycleIndexMetadataTransportAction .class );
4444 private final ClusterStateTaskExecutor <UpdateIndexMetadataTask > executor ;
4545 private final MasterServiceTaskQueue <UpdateIndexMetadataTask > taskQueue ;
46+ private final ProjectResolver projectResolver ;
4647
4748 @ Inject
4849 public CopyLifecycleIndexMetadataTransportAction (
4950 TransportService transportService ,
5051 ClusterService clusterService ,
5152 ThreadPool threadPool ,
52- ActionFilters actionFilters
53+ ActionFilters actionFilters ,
54+ ProjectResolver projectResolver
5355 ) {
5456 super (
5557 CopyLifecycleIndexMetadataAction .NAME ,
@@ -63,11 +65,14 @@ public CopyLifecycleIndexMetadataTransportAction(
6365 );
6466 this .executor = new SimpleBatchedAckListenerTaskExecutor <>() {
6567 @ Override
66- public Tuple <ClusterState , ClusterStateAckListener > executeTask (UpdateIndexMetadataTask task , ClusterState clusterState ) {
67- return new Tuple <>(applyUpdate (clusterState , task ), task );
68+ public Tuple <ClusterState , ClusterStateAckListener > executeTask (UpdateIndexMetadataTask task , ClusterState state ) {
69+ var projectMetadata = state .metadata ().getProject (task .projectId );
70+ var updatedMetadata = applyUpdate (projectMetadata , task );
71+ return new Tuple <>(ClusterState .builder (state ).putProjectMetadata (updatedMetadata ).build (), task );
6872 }
6973 };
7074 this .taskQueue = clusterService .createTaskQueue ("migrate-copy-index-metadata" , Priority .NORMAL , this .executor );
75+ this .projectResolver = projectResolver ;
7176 }
7277
7378 @ Override
@@ -79,7 +84,13 @@ protected void masterOperation(
7984 ) {
8085 taskQueue .submitTask (
8186 "migrate-copy-index-metadata" ,
82- new UpdateIndexMetadataTask (request .sourceIndex (), request .destIndex (), request .ackTimeout (), listener ),
87+ new UpdateIndexMetadataTask (
88+ projectResolver .getProjectId (),
89+ request .sourceIndex (),
90+ request .destIndex (),
91+ request .ackTimeout (),
92+ listener
93+ ),
8394 request .masterNodeTimeout ()
8495 );
8596 }
@@ -89,13 +100,15 @@ protected ClusterBlockException checkBlock(CopyLifecycleIndexMetadataAction.Requ
89100 return state .blocks ().globalBlockedException (ClusterBlockLevel .METADATA_WRITE );
90101 }
91102
92- private static ClusterState applyUpdate (ClusterState state , UpdateIndexMetadataTask updateTask ) {
103+ private static ProjectMetadata applyUpdate (ProjectMetadata projectMetadata , UpdateIndexMetadataTask updateTask ) {
104+ assert projectMetadata != null && updateTask != null ;
105+ assert Objects .equals (updateTask .projectId , projectMetadata .id ());
93106
94- IndexMetadata sourceMetadata = state . metadata (). getProject () .index (updateTask .sourceIndex );
107+ IndexMetadata sourceMetadata = projectMetadata .index (updateTask .sourceIndex );
95108 if (sourceMetadata == null ) {
96109 throw new IndexNotFoundException (updateTask .sourceIndex );
97110 }
98- IndexMetadata destMetadata = state . metadata (). getProject () .index (updateTask .destIndex );
111+ IndexMetadata destMetadata = projectMetadata .index (updateTask .destIndex );
99112 if (destMetadata == null ) {
100113 throw new IndexNotFoundException (updateTask .destIndex );
101114 }
@@ -113,19 +126,26 @@ private static ClusterState applyUpdate(ClusterState state, UpdateIndexMetadataT
113126 // creation date updates settings so must increment settings version
114127 .settingsVersion (destMetadata .getSettingsVersion () + 1 );
115128
116- var indices = new HashMap <>(state . metadata (). getProject () .indices ());
129+ var indices = new HashMap <>(projectMetadata .indices ());
117130 indices .put (updateTask .destIndex , newDestMetadata .build ());
118131
119- Metadata newMetadata = Metadata .builder (state .metadata ()).indices (indices ).build ();
120- return ClusterState .builder (state ).metadata (newMetadata ).build ();
132+ return ProjectMetadata .builder (projectMetadata ).indices (indices ).build ();
121133 }
122134
123135 static class UpdateIndexMetadataTask extends AckedBatchedClusterStateUpdateTask {
136+ private final ProjectId projectId ;
124137 private final String sourceIndex ;
125138 private final String destIndex ;
126139
127- UpdateIndexMetadataTask (String sourceIndex , String destIndex , TimeValue ackTimeout , ActionListener <AcknowledgedResponse > listener ) {
140+ UpdateIndexMetadataTask (
141+ ProjectId projectId ,
142+ String sourceIndex ,
143+ String destIndex ,
144+ TimeValue ackTimeout ,
145+ ActionListener <AcknowledgedResponse > listener
146+ ) {
128147 super (ackTimeout , listener );
148+ this .projectId = projectId ;
129149 this .sourceIndex = sourceIndex ;
130150 this .destIndex = destIndex ;
131151 }
0 commit comments