Skip to content

Commit 62c4540

Browse files
committed
Make migrate/10_reindex/* pass
1 parent b799cb7 commit 62c4540

File tree

4 files changed

+24
-12
lines changed

4 files changed

+24
-12
lines changed

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1616
import org.elasticsearch.client.internal.Client;
1717
import org.elasticsearch.cluster.metadata.DataStream;
18-
import org.elasticsearch.cluster.metadata.Metadata;
18+
import org.elasticsearch.cluster.project.ProjectResolver;
1919
import org.elasticsearch.cluster.service.ClusterService;
2020
import org.elasticsearch.core.TimeValue;
2121
import org.elasticsearch.injection.guice.Inject;
@@ -41,14 +41,16 @@ public class ReindexDataStreamTransportAction extends HandledTransportAction<Rei
4141
private final TransportService transportService;
4242
private final ClusterService clusterService;
4343
private final Client client;
44+
private final ProjectResolver projectResolver;
4445

4546
@Inject
4647
public ReindexDataStreamTransportAction(
4748
TransportService transportService,
4849
ActionFilters actionFilters,
4950
PersistentTasksService persistentTasksService,
5051
ClusterService clusterService,
51-
Client client
52+
Client client,
53+
ProjectResolver projectResolver
5254
) {
5355
super(
5456
ReindexDataStreamAction.NAME,
@@ -62,21 +64,22 @@ public ReindexDataStreamTransportAction(
6264
this.persistentTasksService = persistentTasksService;
6365
this.clusterService = clusterService;
6466
this.client = client;
67+
this.projectResolver = projectResolver;
6568
}
6669

6770
@Override
6871
protected void doExecute(Task task, ReindexDataStreamRequest request, ActionListener<AcknowledgedResponse> listener) {
6972
String sourceDataStreamName = request.getSourceDataStream();
70-
Metadata metadata = clusterService.state().metadata();
71-
DataStream dataStream = metadata.getProject().dataStreams().get(sourceDataStreamName);
73+
final var projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
74+
DataStream dataStream = projectMetadata.dataStreams().get(sourceDataStreamName);
7275
if (dataStream == null) {
7376
listener.onFailure(new ResourceNotFoundException("Data stream named [{}] does not exist", sourceDataStreamName));
7477
return;
7578
}
7679
int totalIndices = dataStream.getIndices().size();
7780
int totalIndicesToBeUpgraded = (int) dataStream.getIndices()
7881
.stream()
79-
.filter(getReindexRequiredPredicate(metadata.getProject(), false, dataStream.isSystem()))
82+
.filter(getReindexRequiredPredicate(projectMetadata, false, dataStream.isSystem()))
8083
.count();
8184
ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams(
8285
sourceDataStreamName,
@@ -86,7 +89,7 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList
8689
ClientHelper.getPersistableSafeSecurityHeaders(transportService.getThreadPool().getThreadContext(), clusterService.state())
8790
);
8891
String persistentTaskId = getPersistentTaskId(sourceDataStreamName);
89-
final var persistentTask = PersistentTasksCustomMetadata.getTaskWithId(clusterService.state(), persistentTaskId);
92+
final var persistentTask = PersistentTasksCustomMetadata.getTaskWithId(projectMetadata, persistentTaskId);
9093

9194
if (persistentTask == null) {
9295
startTask(listener, persistentTaskId, params);

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.elasticsearch.cluster.metadata.DataStream;
2929
import org.elasticsearch.cluster.metadata.DataStreamAction;
3030
import org.elasticsearch.cluster.metadata.IndexMetadata;
31+
import org.elasticsearch.cluster.metadata.ProjectId;
32+
import org.elasticsearch.cluster.project.ProjectResolver;
3133
import org.elasticsearch.cluster.service.ClusterService;
3234
import org.elasticsearch.common.settings.Setting;
3335
import org.elasticsearch.common.settings.Settings;
@@ -70,12 +72,14 @@ public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExec
7072
private final Client client;
7173
private final ClusterService clusterService;
7274
private final ThreadPool threadPool;
75+
private final ProjectResolver projectResolver;
7376

7477
public ReindexDataStreamPersistentTaskExecutor(Client client, ClusterService clusterService, String taskName, ThreadPool threadPool) {
7578
super(taskName, threadPool.generic());
7679
this.client = client;
7780
this.clusterService = clusterService;
7881
this.threadPool = threadPool;
82+
this.projectResolver = client.projectResolver();
7983
}
8084

8185
@Override
@@ -87,8 +91,10 @@ protected ReindexDataStreamTask createTask(
8791
PersistentTasksCustomMetadata.PersistentTask<ReindexDataStreamTaskParams> taskInProgress,
8892
Map<String, String> headers
8993
) {
94+
ProjectId projectId = projectResolver.getProjectId();
9095
ReindexDataStreamTaskParams params = taskInProgress.getParams();
9196
return new ReindexDataStreamTask(
97+
projectId,
9298
clusterService,
9399
params.startTime(),
94100
params.totalIndices(),
@@ -125,7 +131,7 @@ protected void nodeOperation(
125131
if (dataStreamInfos.size() == 1) {
126132
DataStream dataStream = dataStreamInfos.getFirst().getDataStream();
127133
boolean includeSystem = dataStream.isSystem();
128-
if (getReindexRequiredPredicate(clusterService.state().metadata().getProject(), false, includeSystem).test(
134+
if (getReindexRequiredPredicate(projectResolver.getProjectMetadata(clusterService.state()), false, includeSystem).test(
129135
dataStream.getWriteIndex()
130136
)) {
131137
RolloverRequest rolloverRequest = new RolloverRequest(sourceDataStream, null);
@@ -174,7 +180,7 @@ private void reindexIndices(
174180
) {
175181
List<Index> indices = dataStream.getIndices();
176182
List<Index> indicesToBeReindexed = indices.stream()
177-
.filter(getReindexRequiredPredicate(clusterService.state().metadata().getProject(), false, dataStream.isSystem()))
183+
.filter(getReindexRequiredPredicate(projectResolver.getProjectMetadata(clusterService.state()), false, dataStream.isSystem()))
178184
.toList();
179185
final ReindexDataStreamPersistentTaskState updatedState;
180186
if (params.totalIndices() != totalIndicesInDataStream
@@ -337,7 +343,7 @@ private TimeValue updateCompletionTimeAndGetTimeToLive(
337343
@Nullable ReindexDataStreamPersistentTaskState state
338344
) {
339345
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = PersistentTasksCustomMetadata.getTaskWithId(
340-
clusterService.state(),
346+
projectResolver.getProjectMetadata(clusterService.state()),
341347
reindexDataStreamTask.getPersistentTaskId()
342348
);
343349
if (persistentTask == null) {

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.migrate.task;
99

10+
import org.elasticsearch.cluster.metadata.ProjectId;
1011
import org.elasticsearch.cluster.service.ClusterService;
1112
import org.elasticsearch.common.util.concurrent.RunOnce;
1213
import org.elasticsearch.core.TimeValue;
@@ -26,6 +27,7 @@
2627

2728
public class ReindexDataStreamTask extends AllocatedPersistentTask {
2829
public static final String TASK_NAME = "reindex-data-stream";
30+
private final ProjectId projectId;
2931
private final ClusterService clusterService;
3032
private final long persistentTaskStartTime;
3133
private final int initialTotalIndices;
@@ -39,6 +41,7 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
3941

4042
@SuppressWarnings("this-escape")
4143
public ReindexDataStreamTask(
44+
ProjectId projectId,
4245
ClusterService clusterService,
4346
long persistentTaskStartTime,
4447
int initialTotalIndices,
@@ -51,6 +54,7 @@ public ReindexDataStreamTask(
5154
Map<String, String> headers
5255
) {
5356
super(id, type, action, description, parentTask, headers);
57+
this.projectId = projectId;
5458
this.clusterService = clusterService;
5559
this.persistentTaskStartTime = persistentTaskStartTime;
5660
this.initialTotalIndices = initialTotalIndices;
@@ -70,7 +74,7 @@ public ReindexDataStreamStatus getStatus() {
7074
int totalIndices = initialTotalIndices;
7175
int totalIndicesToBeUpgraded = initialTotalIndicesToBeUpgraded;
7276
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = PersistentTasksCustomMetadata.getTaskWithId(
73-
clusterService.state(),
77+
clusterService.state().projectState(projectId).metadata(),
7478
getPersistentTaskId()
7579
);
7680
boolean isComplete;
@@ -130,7 +134,7 @@ public void incrementInProgressIndicesCount(String index) {
130134

131135
private boolean isCompleteInClusterState() {
132136
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = PersistentTasksCustomMetadata.getTaskWithId(
133-
clusterService.state(),
137+
clusterService.state().projectState(projectId).metadata(),
134138
getPersistentTaskId()
135139
);
136140
if (persistentTask != null) {

x-pack/qa/multi-project/xpack-rest-tests-with-multiple-projects/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ tasks.named("yamlRestTest").configure {
4949
'^ilm/60_operation_mode/*',
5050
'^ilm/80_health/*',
5151
'^logsdb/10_usage/*',
52-
'^migrate/10_reindex/*',
5352
'^migrate/20_reindex_status/*',
5453
'^migrate/30_create_from/*',
5554
'^migration/10_get_feature_upgrade_status/*',

0 commit comments

Comments
 (0)