Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.injection.guice.Inject;
Expand All @@ -22,27 +24,31 @@

public class CancelReindexDataStreamTransportAction extends HandledTransportAction<Request, AcknowledgedResponse> {
private final PersistentTasksService persistentTasksService;
private final ProjectResolver projectResolver;

@Inject
public CancelReindexDataStreamTransportAction(
TransportService transportService,
ActionFilters actionFilters,
PersistentTasksService persistentTasksService
PersistentTasksService persistentTasksService,
ProjectResolver projectResolver
) {
super(CancelReindexDataStreamAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.persistentTasksService = persistentTasksService;
this.projectResolver = projectResolver;
}

@Override
protected void doExecute(Task task, Request request, ActionListener<AcknowledgedResponse> listener) {
String index = request.getIndex();
ProjectId projectId = projectResolver.getProjectId();
String persistentTaskId = ReindexDataStreamAction.TASK_ID_PREFIX + index;
/*
* This removes the persistent task from the cluster state and results in the running task being cancelled (but not removed from
* the task manager). The running task is removed from the task manager in ReindexDataStreamTask::onCancelled, which is called as
* as result of this.
*/
persistentTasksService.sendRemoveRequest(persistentTaskId, TimeValue.MAX_VALUE, new ActionListener<>() {
persistentTasksService.sendProjectRemoveRequest(projectId, persistentTaskId, TimeValue.MAX_VALUE, new ActionListener<>() {
@Override
public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
listener.onResponse(AcknowledgedResponse.TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

package org.elasticsearch.xpack.migrate.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -22,7 +20,9 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
Expand All @@ -36,20 +36,22 @@
import org.elasticsearch.transport.TransportService;

import java.util.HashMap;
import java.util.Objects;

public class CopyLifecycleIndexMetadataTransportAction extends TransportMasterNodeAction<
CopyLifecycleIndexMetadataAction.Request,
AcknowledgedResponse> {
private static final Logger logger = LogManager.getLogger(CopyLifecycleIndexMetadataTransportAction.class);
private final ClusterStateTaskExecutor<UpdateIndexMetadataTask> executor;
private final MasterServiceTaskQueue<UpdateIndexMetadataTask> taskQueue;
private final ProjectResolver projectResolver;

@Inject
public CopyLifecycleIndexMetadataTransportAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
CopyLifecycleIndexMetadataAction.NAME,
Expand All @@ -63,11 +65,14 @@ public CopyLifecycleIndexMetadataTransportAction(
);
this.executor = new SimpleBatchedAckListenerTaskExecutor<>() {
@Override
public Tuple<ClusterState, ClusterStateAckListener> executeTask(UpdateIndexMetadataTask task, ClusterState clusterState) {
return new Tuple<>(applyUpdate(clusterState, task), task);
public Tuple<ClusterState, ClusterStateAckListener> executeTask(UpdateIndexMetadataTask task, ClusterState state) {
var projectMetadata = state.metadata().getProject(task.projectId);
var updatedMetadata = applyUpdate(projectMetadata, task);
return new Tuple<>(ClusterState.builder(state).putProjectMetadata(updatedMetadata).build(), task);
}
};
this.taskQueue = clusterService.createTaskQueue("migrate-copy-index-metadata", Priority.NORMAL, this.executor);
this.projectResolver = projectResolver;
}

@Override
Expand All @@ -79,7 +84,13 @@ protected void masterOperation(
) {
taskQueue.submitTask(
"migrate-copy-index-metadata",
new UpdateIndexMetadataTask(request.sourceIndex(), request.destIndex(), request.ackTimeout(), listener),
new UpdateIndexMetadataTask(
projectResolver.getProjectId(),
request.sourceIndex(),
request.destIndex(),
request.ackTimeout(),
listener
),
request.masterNodeTimeout()
);
}
Expand All @@ -89,13 +100,15 @@ protected ClusterBlockException checkBlock(CopyLifecycleIndexMetadataAction.Requ
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

private static ClusterState applyUpdate(ClusterState state, UpdateIndexMetadataTask updateTask) {
private static ProjectMetadata applyUpdate(ProjectMetadata projectMetadata, UpdateIndexMetadataTask updateTask) {
assert projectMetadata != null && updateTask != null;
assert Objects.equals(updateTask.projectId, projectMetadata.id());

IndexMetadata sourceMetadata = state.metadata().getProject().index(updateTask.sourceIndex);
IndexMetadata sourceMetadata = projectMetadata.index(updateTask.sourceIndex);
if (sourceMetadata == null) {
throw new IndexNotFoundException(updateTask.sourceIndex);
}
IndexMetadata destMetadata = state.metadata().getProject().index(updateTask.destIndex);
IndexMetadata destMetadata = projectMetadata.index(updateTask.destIndex);
if (destMetadata == null) {
throw new IndexNotFoundException(updateTask.destIndex);
}
Expand All @@ -113,19 +126,26 @@ private static ClusterState applyUpdate(ClusterState state, UpdateIndexMetadataT
// creation date updates settings so must increment settings version
.settingsVersion(destMetadata.getSettingsVersion() + 1);

var indices = new HashMap<>(state.metadata().getProject().indices());
var indices = new HashMap<>(projectMetadata.indices());
indices.put(updateTask.destIndex, newDestMetadata.build());

Metadata newMetadata = Metadata.builder(state.metadata()).indices(indices).build();
return ClusterState.builder(state).metadata(newMetadata).build();
return ProjectMetadata.builder(projectMetadata).indices(indices).build();
}

static class UpdateIndexMetadataTask extends AckedBatchedClusterStateUpdateTask {
private final ProjectId projectId;
private final String sourceIndex;
private final String destIndex;

UpdateIndexMetadataTask(String sourceIndex, String destIndex, TimeValue ackTimeout, ActionListener<AcknowledgedResponse> listener) {
UpdateIndexMetadataTask(
ProjectId projectId,
String sourceIndex,
String destIndex,
TimeValue ackTimeout,
ActionListener<AcknowledgedResponse> listener
) {
super(ackTimeout, listener);
this.projectId = projectId;
this.sourceIndex = sourceIndex;
this.destIndex = destIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.IndexScopedSettings;
Expand Down Expand Up @@ -46,6 +47,7 @@ public class CreateIndexFromSourceTransportAction extends HandledTransportAction
private final ClusterService clusterService;
private final Client client;
private final IndexScopedSettings indexScopedSettings;
private final ProjectResolver projectResolver;
private static final Set<String> INDEX_BLOCK_SETTINGS = Set.of(
IndexMetadata.SETTING_READ_ONLY,
IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE,
Expand All @@ -60,7 +62,8 @@ public CreateIndexFromSourceTransportAction(
ClusterService clusterService,
ActionFilters actionFilters,
Client client,
IndexScopedSettings indexScopedSettings
IndexScopedSettings indexScopedSettings,
ProjectResolver projectResolver
) {
super(
CreateIndexFromSourceAction.NAME,
Expand All @@ -73,12 +76,13 @@ public CreateIndexFromSourceTransportAction(
this.clusterService = clusterService;
this.client = client;
this.indexScopedSettings = indexScopedSettings;
this.projectResolver = projectResolver;
}

@Override
protected void doExecute(Task task, CreateIndexFromSourceAction.Request request, ActionListener<AcknowledgedResponse> listener) {

IndexMetadata sourceIndex = clusterService.state().getMetadata().getProject().index(request.sourceIndex());
IndexMetadata sourceIndex = projectResolver.getProjectMetadata(clusterService.state()).index(request.sourceIndex());

if (sourceIndex == null) {
listener.onFailure(new IndexNotFoundException(request.sourceIndex()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Strings;
Expand Down Expand Up @@ -48,26 +49,29 @@ public class GetMigrationReindexStatusTransportAction extends HandledTransportAc
private final ClusterService clusterService;
private final TransportService transportService;
private final Client client;
private final ProjectResolver projectResolver;

@Inject
public GetMigrationReindexStatusTransportAction(
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
Client client
Client client,
ProjectResolver projectResolver
) {
super(GetMigrationReindexStatusAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.clusterService = clusterService;
this.transportService = transportService;
this.client = client;
this.projectResolver = projectResolver;
}

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
String index = request.getIndex();
String persistentTaskId = ReindexDataStreamAction.TASK_ID_PREFIX + index;
PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = PersistentTasksCustomMetadata.getTaskWithId(
clusterService.state(),
projectResolver.getProjectMetadata(clusterService.state()),
persistentTaskId
);
if (persistentTask == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -108,6 +109,7 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
private final ClusterService clusterService;
private final Client client;
private final TransportService transportService;
private final ProjectResolver projectResolver;
/*
* The following is incremented in order to keep track of the current round-robin position for ingest nodes that we send sliced requests
* to. We bound its random starting value to less than or equal to 2 ^ 30 (the default is Integer.MAX_VALUE or 2 ^ 31 - 1) only so that
Expand All @@ -121,7 +123,8 @@ public ReindexDataStreamIndexTransportAction(
TransportService transportService,
ClusterService clusterService,
ActionFilters actionFilters,
Client client
Client client,
ProjectResolver projectResolver
) {
super(
ReindexDataStreamIndexAction.NAME,
Expand All @@ -134,6 +137,7 @@ public ReindexDataStreamIndexTransportAction(
this.clusterService = clusterService;
this.client = client;
this.transportService = transportService;
this.projectResolver = projectResolver;
}

@Override
Expand All @@ -142,19 +146,19 @@ protected void doExecute(
ReindexDataStreamIndexAction.Request request,
ActionListener<ReindexDataStreamIndexAction.Response> listener
) {
var project = clusterService.state().projectState();
var projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
var sourceIndexName = request.getSourceIndex();
var destIndexName = generateDestIndexName(sourceIndexName);
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
IndexMetadata sourceIndex = project.metadata().index(sourceIndexName);
IndexMetadata sourceIndex = projectMetadata.index(sourceIndexName);
if (sourceIndex == null) {
listener.onFailure(new ResourceNotFoundException("source index [{}] does not exist", sourceIndexName));
return;
}

Settings settingsBefore = sourceIndex.getSettings();

var hasOldVersion = DeprecatedIndexPredicate.getReindexRequiredPredicate(project.metadata(), false, true);
var hasOldVersion = DeprecatedIndexPredicate.getReindexRequiredPredicate(projectMetadata, false, true);
if (hasOldVersion.test(sourceIndex.getIndex()) == false) {
logger.warn(
"Migrating index [{}] with version [{}] is unnecessary as its version is not before [{}]",
Expand Down
Loading