Skip to content

Commit 7d17f7a

Browse files
committed
Capture the projectId in UpdateIndexMetadataTask
1 parent 34e9808 commit 7d17f7a

File tree

1 file changed

+29
-8
lines changed

1 file changed

+29
-8
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CopyLifecycleIndexMetadataTransportAction.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2121
import org.elasticsearch.cluster.metadata.IndexMetadata;
2222
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
23+
import org.elasticsearch.cluster.metadata.ProjectId;
2324
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2425
import org.elasticsearch.cluster.project.ProjectResolver;
2526
import org.elasticsearch.cluster.service.ClusterService;
@@ -35,12 +36,14 @@
3536
import org.elasticsearch.transport.TransportService;
3637

3738
import java.util.HashMap;
39+
import java.util.Objects;
3840

3941
public class CopyLifecycleIndexMetadataTransportAction extends TransportMasterNodeAction<
4042
CopyLifecycleIndexMetadataAction.Request,
4143
AcknowledgedResponse> {
4244
private final ClusterStateTaskExecutor<UpdateIndexMetadataTask> executor;
4345
private final MasterServiceTaskQueue<UpdateIndexMetadataTask> taskQueue;
46+
private final ProjectResolver projectResolver;
4447

4548
@Inject
4649
public CopyLifecycleIndexMetadataTransportAction(
@@ -62,12 +65,14 @@ public CopyLifecycleIndexMetadataTransportAction(
6265
);
6366
this.executor = new SimpleBatchedAckListenerTaskExecutor<>() {
6467
@Override
65-
public Tuple<ClusterState, ClusterStateAckListener> executeTask(UpdateIndexMetadataTask task, ClusterState clusterState) {
66-
final ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterState);
67-
return new Tuple<>(applyUpdate(clusterState, projectMetadata, task), task);
68+
public Tuple<ClusterState, ClusterStateAckListener> executeTask(UpdateIndexMetadataTask task, ClusterState state) {
69+
var projectMetadata = state.metadata().getProject(task.projectId);
70+
var updatedMetadata = applyUpdate(projectMetadata, task);
71+
return new Tuple<>(ClusterState.builder(state).putProjectMetadata(updatedMetadata).build(), task);
6872
}
6973
};
7074
this.taskQueue = clusterService.createTaskQueue("migrate-copy-index-metadata", Priority.NORMAL, this.executor);
75+
this.projectResolver = projectResolver;
7176
}
7277

7378
@Override
@@ -79,7 +84,13 @@ protected void masterOperation(
7984
) {
8085
taskQueue.submitTask(
8186
"migrate-copy-index-metadata",
82-
new UpdateIndexMetadataTask(request.sourceIndex(), request.destIndex(), request.ackTimeout(), listener),
87+
new UpdateIndexMetadataTask(
88+
projectResolver.getProjectId(),
89+
request.sourceIndex(),
90+
request.destIndex(),
91+
request.ackTimeout(),
92+
listener
93+
),
8394
request.masterNodeTimeout()
8495
);
8596
}
@@ -89,7 +100,10 @@ protected ClusterBlockException checkBlock(CopyLifecycleIndexMetadataAction.Requ
89100
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
90101
}
91102

92-
private static ClusterState applyUpdate(ClusterState state, ProjectMetadata projectMetadata, UpdateIndexMetadataTask updateTask) {
103+
private static ProjectMetadata applyUpdate(ProjectMetadata projectMetadata, UpdateIndexMetadataTask updateTask) {
104+
assert projectMetadata != null && updateTask != null;
105+
assert Objects.equals(updateTask.projectId, projectMetadata.id());
106+
93107
IndexMetadata sourceMetadata = projectMetadata.index(updateTask.sourceIndex);
94108
if (sourceMetadata == null) {
95109
throw new IndexNotFoundException(updateTask.sourceIndex);
@@ -115,16 +129,23 @@ private static ClusterState applyUpdate(ClusterState state, ProjectMetadata proj
115129
var indices = new HashMap<>(projectMetadata.indices());
116130
indices.put(updateTask.destIndex, newDestMetadata.build());
117131

118-
ProjectMetadata newMetadata = ProjectMetadata.builder(projectMetadata).indices(indices).build();
119-
return ClusterState.builder(state).putProjectMetadata(newMetadata).build();
132+
return ProjectMetadata.builder(projectMetadata).indices(indices).build();
120133
}
121134

122135
static class UpdateIndexMetadataTask extends AckedBatchedClusterStateUpdateTask {
136+
private final ProjectId projectId;
123137
private final String sourceIndex;
124138
private final String destIndex;
125139

126-
UpdateIndexMetadataTask(String sourceIndex, String destIndex, TimeValue ackTimeout, ActionListener<AcknowledgedResponse> listener) {
140+
UpdateIndexMetadataTask(
141+
ProjectId projectId,
142+
String sourceIndex,
143+
String destIndex,
144+
TimeValue ackTimeout,
145+
ActionListener<AcknowledgedResponse> listener
146+
) {
127147
super(ackTimeout, listener);
148+
this.projectId = projectId;
128149
this.sourceIndex = sourceIndex;
129150
this.destIndex = destIndex;
130151
}

0 commit comments

Comments
 (0)