Skip to content

Commit d69a282

Browse files
authored
Make the datastream reindexing APIs multi-project aware (#130035)
1 parent 26c4354 commit d69a282

File tree

9 files changed

+106
-52
lines changed

9 files changed

+106
-52
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CancelReindexDataStreamTransportAction.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.elasticsearch.action.support.ActionFilters;
1212
import org.elasticsearch.action.support.HandledTransportAction;
1313
import org.elasticsearch.action.support.master.AcknowledgedResponse;
14+
import org.elasticsearch.cluster.metadata.ProjectId;
15+
import org.elasticsearch.cluster.project.ProjectResolver;
1416
import org.elasticsearch.common.util.concurrent.EsExecutors;
1517
import org.elasticsearch.core.TimeValue;
1618
import org.elasticsearch.injection.guice.Inject;
@@ -22,27 +24,31 @@
2224

2325
public class CancelReindexDataStreamTransportAction extends HandledTransportAction<Request, AcknowledgedResponse> {
2426
private final PersistentTasksService persistentTasksService;
27+
private final ProjectResolver projectResolver;
2528

2629
@Inject
2730
public CancelReindexDataStreamTransportAction(
2831
TransportService transportService,
2932
ActionFilters actionFilters,
30-
PersistentTasksService persistentTasksService
33+
PersistentTasksService persistentTasksService,
34+
ProjectResolver projectResolver
3135
) {
3236
super(CancelReindexDataStreamAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
3337
this.persistentTasksService = persistentTasksService;
38+
this.projectResolver = projectResolver;
3439
}
3540

3641
@Override
3742
protected void doExecute(Task task, Request request, ActionListener<AcknowledgedResponse> listener) {
3843
String index = request.getIndex();
44+
ProjectId projectId = projectResolver.getProjectId();
3945
String persistentTaskId = ReindexDataStreamAction.TASK_ID_PREFIX + index;
4046
/*
4147
* This removes the persistent task from the cluster state and results in the running task being cancelled (but not removed from
4248
* the task manager). The running task is removed from the task manager in ReindexDataStreamTask::onCancelled, which is called as
4349
* as result of this.
4450
*/
45-
persistentTasksService.sendRemoveRequest(persistentTaskId, TimeValue.MAX_VALUE, new ActionListener<>() {
51+
persistentTasksService.sendProjectRemoveRequest(projectId, persistentTaskId, TimeValue.MAX_VALUE, new ActionListener<>() {
4652
@Override
4753
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
4854
listener.onResponse(AcknowledgedResponse.TRUE);

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CopyLifecycleIndexMetadataTransportAction.java

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77

88
package org.elasticsearch.xpack.migrate.action;
99

10-
import org.apache.logging.log4j.LogManager;
11-
import org.apache.logging.log4j.Logger;
1210
import org.elasticsearch.action.ActionListener;
1311
import org.elasticsearch.action.support.ActionFilters;
1412
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -22,7 +20,9 @@
2220
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2321
import org.elasticsearch.cluster.metadata.IndexMetadata;
2422
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
25-
import org.elasticsearch.cluster.metadata.Metadata;
23+
import org.elasticsearch.cluster.metadata.ProjectId;
24+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
25+
import org.elasticsearch.cluster.project.ProjectResolver;
2626
import org.elasticsearch.cluster.service.ClusterService;
2727
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
2828
import org.elasticsearch.common.Priority;
@@ -36,20 +36,22 @@
3636
import org.elasticsearch.transport.TransportService;
3737

3838
import java.util.HashMap;
39+
import java.util.Objects;
3940

4041
public class CopyLifecycleIndexMetadataTransportAction extends TransportMasterNodeAction<
4142
CopyLifecycleIndexMetadataAction.Request,
4243
AcknowledgedResponse> {
43-
private static final Logger logger = LogManager.getLogger(CopyLifecycleIndexMetadataTransportAction.class);
4444
private final ClusterStateTaskExecutor<UpdateIndexMetadataTask> executor;
4545
private final MasterServiceTaskQueue<UpdateIndexMetadataTask> taskQueue;
46+
private final ProjectResolver projectResolver;
4647

4748
@Inject
4849
public CopyLifecycleIndexMetadataTransportAction(
4950
TransportService transportService,
5051
ClusterService clusterService,
5152
ThreadPool threadPool,
52-
ActionFilters actionFilters
53+
ActionFilters actionFilters,
54+
ProjectResolver projectResolver
5355
) {
5456
super(
5557
CopyLifecycleIndexMetadataAction.NAME,
@@ -63,11 +65,14 @@ public CopyLifecycleIndexMetadataTransportAction(
6365
);
6466
this.executor = new SimpleBatchedAckListenerTaskExecutor<>() {
6567
@Override
66-
public Tuple<ClusterState, ClusterStateAckListener> executeTask(UpdateIndexMetadataTask task, ClusterState clusterState) {
67-
return new Tuple<>(applyUpdate(clusterState, task), task);
68+
public Tuple<ClusterState, ClusterStateAckListener> executeTask(UpdateIndexMetadataTask task, ClusterState state) {
69+
var projectMetadata = state.metadata().getProject(task.projectId);
70+
var updatedMetadata = applyUpdate(projectMetadata, task);
71+
return new Tuple<>(ClusterState.builder(state).putProjectMetadata(updatedMetadata).build(), task);
6872
}
6973
};
7074
this.taskQueue = clusterService.createTaskQueue("migrate-copy-index-metadata", Priority.NORMAL, this.executor);
75+
this.projectResolver = projectResolver;
7176
}
7277

7378
@Override
@@ -79,7 +84,13 @@ protected void masterOperation(
7984
) {
8085
taskQueue.submitTask(
8186
"migrate-copy-index-metadata",
82-
new UpdateIndexMetadataTask(request.sourceIndex(), request.destIndex(), request.ackTimeout(), listener),
87+
new UpdateIndexMetadataTask(
88+
projectResolver.getProjectId(),
89+
request.sourceIndex(),
90+
request.destIndex(),
91+
request.ackTimeout(),
92+
listener
93+
),
8394
request.masterNodeTimeout()
8495
);
8596
}
@@ -89,13 +100,15 @@ protected ClusterBlockException checkBlock(CopyLifecycleIndexMetadataAction.Requ
89100
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
90101
}
91102

92-
private static ClusterState applyUpdate(ClusterState state, UpdateIndexMetadataTask updateTask) {
103+
private static ProjectMetadata applyUpdate(ProjectMetadata projectMetadata, UpdateIndexMetadataTask updateTask) {
104+
assert projectMetadata != null && updateTask != null;
105+
assert Objects.equals(updateTask.projectId, projectMetadata.id());
93106

94-
IndexMetadata sourceMetadata = state.metadata().getProject().index(updateTask.sourceIndex);
107+
IndexMetadata sourceMetadata = projectMetadata.index(updateTask.sourceIndex);
95108
if (sourceMetadata == null) {
96109
throw new IndexNotFoundException(updateTask.sourceIndex);
97110
}
98-
IndexMetadata destMetadata = state.metadata().getProject().index(updateTask.destIndex);
111+
IndexMetadata destMetadata = projectMetadata.index(updateTask.destIndex);
99112
if (destMetadata == null) {
100113
throw new IndexNotFoundException(updateTask.destIndex);
101114
}
@@ -113,19 +126,26 @@ private static ClusterState applyUpdate(ClusterState state, UpdateIndexMetadataT
113126
// creation date updates settings so must increment settings version
114127
.settingsVersion(destMetadata.getSettingsVersion() + 1);
115128

116-
var indices = new HashMap<>(state.metadata().getProject().indices());
129+
var indices = new HashMap<>(projectMetadata.indices());
117130
indices.put(updateTask.destIndex, newDestMetadata.build());
118131

119-
Metadata newMetadata = Metadata.builder(state.metadata()).indices(indices).build();
120-
return ClusterState.builder(state).metadata(newMetadata).build();
132+
return ProjectMetadata.builder(projectMetadata).indices(indices).build();
121133
}
122134

123135
static class UpdateIndexMetadataTask extends AckedBatchedClusterStateUpdateTask {
136+
private final ProjectId projectId;
124137
private final String sourceIndex;
125138
private final String destIndex;
126139

127-
UpdateIndexMetadataTask(String sourceIndex, String destIndex, TimeValue ackTimeout, ActionListener<AcknowledgedResponse> listener) {
140+
UpdateIndexMetadataTask(
141+
ProjectId projectId,
142+
String sourceIndex,
143+
String destIndex,
144+
TimeValue ackTimeout,
145+
ActionListener<AcknowledgedResponse> listener
146+
) {
128147
super(ackTimeout, listener);
148+
this.projectId = projectId;
129149
this.sourceIndex = sourceIndex;
130150
this.destIndex = destIndex;
131151
}

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.client.internal.Client;
1717
import org.elasticsearch.cluster.metadata.IndexMetadata;
1818
import org.elasticsearch.cluster.metadata.MappingMetadata;
19+
import org.elasticsearch.cluster.project.ProjectResolver;
1920
import org.elasticsearch.cluster.service.ClusterService;
2021
import org.elasticsearch.common.compress.CompressedXContent;
2122
import org.elasticsearch.common.settings.IndexScopedSettings;
@@ -46,6 +47,7 @@ public class CreateIndexFromSourceTransportAction extends HandledTransportAction
4647
private final ClusterService clusterService;
4748
private final Client client;
4849
private final IndexScopedSettings indexScopedSettings;
50+
private final ProjectResolver projectResolver;
4951
private static final Set<String> INDEX_BLOCK_SETTINGS = Set.of(
5052
IndexMetadata.SETTING_READ_ONLY,
5153
IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE,
@@ -60,7 +62,8 @@ public CreateIndexFromSourceTransportAction(
6062
ClusterService clusterService,
6163
ActionFilters actionFilters,
6264
Client client,
63-
IndexScopedSettings indexScopedSettings
65+
IndexScopedSettings indexScopedSettings,
66+
ProjectResolver projectResolver
6467
) {
6568
super(
6669
CreateIndexFromSourceAction.NAME,
@@ -73,12 +76,13 @@ public CreateIndexFromSourceTransportAction(
7376
this.clusterService = clusterService;
7477
this.client = client;
7578
this.indexScopedSettings = indexScopedSettings;
79+
this.projectResolver = projectResolver;
7680
}
7781

7882
@Override
7983
protected void doExecute(Task task, CreateIndexFromSourceAction.Request request, ActionListener<AcknowledgedResponse> listener) {
8084

81-
IndexMetadata sourceIndex = clusterService.state().getMetadata().getProject().index(request.sourceIndex());
85+
IndexMetadata sourceIndex = projectResolver.getProjectMetadata(clusterService.state()).index(request.sourceIndex());
8286

8387
if (sourceIndex == null) {
8488
listener.onFailure(new IndexNotFoundException(request.sourceIndex()));

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusTransportAction.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.action.support.IndicesOptions;
2121
import org.elasticsearch.client.internal.Client;
2222
import org.elasticsearch.cluster.node.DiscoveryNode;
23+
import org.elasticsearch.cluster.project.ProjectResolver;
2324
import org.elasticsearch.cluster.service.ClusterService;
2425
import org.elasticsearch.common.util.concurrent.EsExecutors;
2526
import org.elasticsearch.core.Strings;
@@ -48,26 +49,29 @@ public class GetMigrationReindexStatusTransportAction extends HandledTransportAc
4849
private final ClusterService clusterService;
4950
private final TransportService transportService;
5051
private final Client client;
52+
private final ProjectResolver projectResolver;
5153

5254
@Inject
5355
public GetMigrationReindexStatusTransportAction(
5456
ClusterService clusterService,
5557
TransportService transportService,
5658
ActionFilters actionFilters,
57-
Client client
59+
Client client,
60+
ProjectResolver projectResolver
5861
) {
5962
super(GetMigrationReindexStatusAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
6063
this.clusterService = clusterService;
6164
this.transportService = transportService;
6265
this.client = client;
66+
this.projectResolver = projectResolver;
6367
}
6468

6569
@Override
6670
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
6771
String index = request.getIndex();
6872
String persistentTaskId = ReindexDataStreamAction.TASK_ID_PREFIX + index;
6973
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = PersistentTasksCustomMetadata.getTaskWithId(
70-
clusterService.state(),
74+
projectResolver.getProjectMetadata(clusterService.state()),
7175
persistentTaskId
7276
);
7377
if (persistentTask == null) {

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.cluster.block.ClusterBlockException;
4141
import org.elasticsearch.cluster.metadata.IndexMetadata;
4242
import org.elasticsearch.cluster.node.DiscoveryNode;
43+
import org.elasticsearch.cluster.project.ProjectResolver;
4344
import org.elasticsearch.cluster.service.ClusterService;
4445
import org.elasticsearch.common.Randomness;
4546
import org.elasticsearch.common.settings.Setting;
@@ -108,6 +109,7 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
108109
private final ClusterService clusterService;
109110
private final Client client;
110111
private final TransportService transportService;
112+
private final ProjectResolver projectResolver;
111113
/*
112114
* The following is incremented in order to keep track of the current round-robin position for ingest nodes that we send sliced requests
113115
* to. We bound its random starting value to less than or equal to 2 ^ 30 (the default is Integer.MAX_VALUE or 2 ^ 31 - 1) only so that
@@ -121,7 +123,8 @@ public ReindexDataStreamIndexTransportAction(
121123
TransportService transportService,
122124
ClusterService clusterService,
123125
ActionFilters actionFilters,
124-
Client client
126+
Client client,
127+
ProjectResolver projectResolver
125128
) {
126129
super(
127130
ReindexDataStreamIndexAction.NAME,
@@ -134,6 +137,7 @@ public ReindexDataStreamIndexTransportAction(
134137
this.clusterService = clusterService;
135138
this.client = client;
136139
this.transportService = transportService;
140+
this.projectResolver = projectResolver;
137141
}
138142

139143
@Override
@@ -142,19 +146,19 @@ protected void doExecute(
142146
ReindexDataStreamIndexAction.Request request,
143147
ActionListener<ReindexDataStreamIndexAction.Response> listener
144148
) {
145-
var project = clusterService.state().projectState();
149+
var projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
146150
var sourceIndexName = request.getSourceIndex();
147151
var destIndexName = generateDestIndexName(sourceIndexName);
148152
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
149-
IndexMetadata sourceIndex = project.metadata().index(sourceIndexName);
153+
IndexMetadata sourceIndex = projectMetadata.index(sourceIndexName);
150154
if (sourceIndex == null) {
151155
listener.onFailure(new ResourceNotFoundException("source index [{}] does not exist", sourceIndexName));
152156
return;
153157
}
154158

155159
Settings settingsBefore = sourceIndex.getSettings();
156160

157-
var hasOldVersion = DeprecatedIndexPredicate.getReindexRequiredPredicate(project.metadata(), false, true);
161+
var hasOldVersion = DeprecatedIndexPredicate.getReindexRequiredPredicate(projectMetadata, false, true);
158162
if (hasOldVersion.test(sourceIndex.getIndex()) == false) {
159163
logger.warn(
160164
"Migrating index [{}] with version [{}] is unnecessary as its version is not before [{}]",

0 commit comments

Comments
 (0)