diff --git a/modules/data-streams/src/main/java/module-info.java b/modules/data-streams/src/main/java/module-info.java index 2d49029c1023c..0aaf79db05c41 100644 --- a/modules/data-streams/src/main/java/module-info.java +++ b/modules/data-streams/src/main/java/module-info.java @@ -17,6 +17,7 @@ exports org.elasticsearch.datastreams.action to org.elasticsearch.server; exports org.elasticsearch.datastreams.lifecycle.action to org.elasticsearch.server; exports org.elasticsearch.datastreams.lifecycle; + exports org.elasticsearch.datastreams.lifecycle.transitions.steps to org.elasticsearch.server; exports org.elasticsearch.datastreams.options.action to org.elasticsearch.server; provides org.elasticsearch.features.FeatureSpecification with org.elasticsearch.datastreams.DataStreamFeatures; diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index 7af612951a7f4..ff1422e60ba2e 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -64,6 +64,8 @@ import org.elasticsearch.datastreams.lifecycle.rest.RestGetDataStreamLifecycleAction; import org.elasticsearch.datastreams.lifecycle.rest.RestPutDataStreamLifecycleAction; import org.elasticsearch.datastreams.lifecycle.transitions.DlmAction; +import org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction; +import org.elasticsearch.datastreams.lifecycle.transitions.steps.TransportMarkIndexForDLMForceMergeAction; import org.elasticsearch.datastreams.options.action.DeleteDataStreamOptionsAction; import org.elasticsearch.datastreams.options.action.GetDataStreamOptionsAction; import org.elasticsearch.datastreams.options.action.TransportDeleteDataStreamOptionsAction; @@ -260,6 +262,7 @@ public List getActions() { actions.add(new 
ActionHandler(UpdateDataStreamSettingsAction.INSTANCE, TransportUpdateDataStreamSettingsAction.class)); actions.add(new ActionHandler(GetDataStreamMappingsAction.INSTANCE, TransportGetDataStreamMappingsAction.class)); actions.add(new ActionHandler(UpdateDataStreamMappingsAction.INSTANCE, TransportUpdateDataStreamMappingsAction.class)); + actions.add(new ActionHandler(MarkIndexForDLMForceMergeAction.TYPE, TransportMarkIndexForDLMForceMergeAction.class)); return actions; } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index 1ef274c72c4a2..a1790c6a000bb 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -36,9 +36,11 @@ import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.AckedBatchedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.SimpleBatchedExecutor; @@ -74,6 +76,7 @@ import org.elasticsearch.datastreams.lifecycle.transitions.DlmActionContext; import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep; import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; +import org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction; import 
org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; @@ -108,6 +111,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.STARTED; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_DOWNSAMPLE_STATUS; import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY; +import static org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction.DLM_INDEX_FOR_FORCE_MERGE_KEY; /** * This service will implement the needed actions (e.g. rollover, retention) to manage the data streams with a data stream lifecycle @@ -185,6 +189,7 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab private final SetOnce scheduler = new SetOnce<>(); private final MasterServiceTaskQueue forceMergeClusterStateUpdateTaskQueue; private final MasterServiceTaskQueue swapSourceWithDownsampleIndexQueue; + private final MasterServiceTaskQueue markIndexForDlmForceMergeQueue; private volatile ByteSizeValue targetMergePolicyFloorSegment; private volatile int targetMergePolicyFactor; /** @@ -253,6 +258,11 @@ public DataStreamLifecycleService( Priority.URGENT, // urgent priority as this deletes indices new DeleteSourceAndAddDownsampleIndexExecutor(allocationService) ); + this.markIndexForDlmForceMergeQueue = clusterService.createTaskQueue( + "dlm-mark-index-for-force-merge", + Priority.LOW, + new MarkIndexForDLMForceMergeExecutor() + ); this.dslHealthInfoPublisher = dataStreamLifecycleHealthInfoPublisher; this.actions = actions; } @@ -1761,6 +1771,116 @@ public void onFailure(Exception e) { } } + /** + * Task to update the cluster state with the force merge marker for DLM Tiers. + * Public for testing. 
+ */ + public static class MarkIndexForDlmForceMergeTask extends AckedBatchedClusterStateUpdateTask implements ClusterStateTaskListener { + private final ActionListener listener; + private final ProjectId projectId; + private final String originalIndex; + private final String indexToBeForceMerged; + + MarkIndexForDlmForceMergeTask(ActionListener listener, MarkIndexForDLMForceMergeAction.Request request) { + super(TimeValue.THIRTY_SECONDS, listener); + this.listener = listener; + this.projectId = request.getProjectId(); + this.originalIndex = request.getOriginalIndex(); + this.indexToBeForceMerged = request.getIndexToBeForceMerged(); + } + + ClusterState execute(ClusterState currentState) { + ProjectMetadata projectMetadata = currentState.metadata().getProject(this.projectId); + if (projectMetadata == null) { + return currentState; + } + IndexMetadata originalIndexMetadata = projectMetadata.index(originalIndex); + IndexMetadata cloneIndexMetadata = projectMetadata.index(indexToBeForceMerged); + if (originalIndexMetadata == null) { + // If the source index doesn't exist, we can't mark it for force merge. Fail the task with an IndexNotFoundException.
+ String errorMessage = Strings.format( + "DLM attempted to mark clone index [%s] for force merge for original index [%s] but index no longer exists", + indexToBeForceMerged, + originalIndex + ); + logger.debug(errorMessage); + throw new IndexNotFoundException(errorMessage); + } + + if (cloneIndexMetadata == null) { + // If the clone index doesn't exist, this is an unexpected error - the clone should have been created before this task + String errorMessage = Strings.format( + "clone index [%s] doesn't exist but was expected to exist when marking index [%s] for DLM force merge", + indexToBeForceMerged, + originalIndex + ); + logger.debug(errorMessage); + throw new IllegalStateException(errorMessage); + } + + Map existingCustomMetadata = originalIndexMetadata.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY); + Map customMetadata = new HashMap<>(); + if (existingCustomMetadata != null) { + if (indexToBeForceMerged.equals(existingCustomMetadata.get(DLM_INDEX_FOR_FORCE_MERGE_KEY))) { + // Index is already marked for force merge, no update needed + return currentState; + } + customMetadata.putAll(existingCustomMetadata); + } + customMetadata.put(DLM_INDEX_FOR_FORCE_MERGE_KEY, indexToBeForceMerged); + + IndexMetadata updatedOriginalIndexMetadata = new IndexMetadata.Builder(originalIndexMetadata).putCustom( + LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, + customMetadata + ).build(); + + final ProjectMetadata.Builder updatedProject = ProjectMetadata.builder(currentState.metadata().getProject(projectId)) + .put(updatedOriginalIndexMetadata, true); + return ClusterState.builder(currentState).putProjectMetadata(updatedProject).build(); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } + + /** + * MarkIndexForDLMForceMergeExecutor for the MarkIndexForDlmForceMergeTask. + * Public for testing. 
+ */ + public static class MarkIndexForDLMForceMergeExecutor implements ClusterStateTaskExecutor { + @Override + public ClusterState execute(BatchExecutionContext batchExecutionContext) { + var state = batchExecutionContext.initialState(); + for (final var taskContext : batchExecutionContext.taskContexts()) { + try { + final MarkIndexForDlmForceMergeTask task = taskContext.getTask(); + state = task.execute(state); + taskContext.success(task); + } catch (Exception e) { + taskContext.onFailure(e); + } + } + return state; + } + } + + /** + * Marks the given index to be force merged for DLM by updating the cluster state with the name of the index to be force merged in the + * custom metadata of the source index. This method returns immediately, but the update to the cluster state happens asynchronously and + * the listener is notified on success or failure of the cluster state update. + * @param request the request + * @param listener the listener to be notified on success or failure of the cluster state update. + */ + public void markIndexForDlmForceMerge(MarkIndexForDLMForceMergeAction.Request request, ActionListener listener) { + markIndexForDlmForceMergeQueue.submitTask( + Strings.format("DLM marking index [%s] to be force merged for DLM", request.getIndexToBeForceMerged()), + new MarkIndexForDlmForceMergeTask(listener, request), + null + ); + } + /** * This wrapper exists only to provide equals and hashCode implementations of a ForceMergeRequest for transportActionsDeduplicator. 
* It intentionally ignores forceMergeUUID (which ForceMergeRequest's equals/hashCode would have to if they existed) because we don't diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java new file mode 100644 index 0000000000000..4ece7a8b1d254 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java @@ -0,0 +1,437 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.datastreams.lifecycle.transitions.steps; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; +import org.elasticsearch.action.admin.indices.shrink.ResizeRequest; +import org.elasticsearch.action.admin.indices.shrink.ResizeType; +import org.elasticsearch.action.admin.indices.shrink.TransportResizeAction; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.cluster.ProjectState; +import org.elasticsearch.cluster.metadata.IndexAbstraction; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.hash.MessageDigests; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep; +import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; +import org.elasticsearch.index.Index; + +import java.nio.charset.StandardCharsets; +import java.time.Clock; +import java.util.Optional; + +import static org.apache.logging.log4j.LogManager.getLogger; +import static 
org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY; +import static org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction.DLM_INDEX_FOR_FORCE_MERGE_KEY; + +/** + * This step clones the index into a new index with 0 replicas. + */ +public class CloneStep implements DlmStep { + + private static final TimeValue CLONE_TIMEOUT = TimeValue.timeValueHours(12); + private static final IndicesOptions IGNORE_MISSING_OPTIONS = IndicesOptions.fromOptions(true, true, false, false); + private static final Logger logger = getLogger(CloneStep.class); + + @Override + public boolean stepCompleted(Index index, ProjectState projectState) { + ProjectMetadata projectMetadata = projectState.metadata(); + // the index can either be "cloned" or the original index if it had 0 replicas + return Optional.ofNullable(getIndexToBeForceMerged(index.getName(), projectState)) + .map(idx -> projectMetadata.indices().containsKey(idx) ? idx : null) + .map(idx -> projectState.routingTable().index(idx)) + .map(IndexRoutingTable::allPrimaryShardsActive) + .orElse(false); + } + + @Override + public void execute(DlmStepContext stepContext) { + Index index = stepContext.index(); + String indexName = index.getName(); + ProjectState projectState = stepContext.projectState(); + ProjectMetadata projectMetadata = projectState.metadata(); + IndexMetadata indexMetadata = projectMetadata.index(index); + + if (indexMetadata == null) { + logger.warn("Index [{}] not found in project metadata, skipping clone step", indexName); + return; + } + + if (indexMetadata.getNumberOfReplicas() == 0) { + logger.info( + "Skipping clone step for index [{}] as it already has 0 replicas and can be used for force merge directly", + indexName + ); + // mark the index to be force merged directly + markIndexToBeForceMerged(indexName, indexName, stepContext, ActionListener.wrap(resp -> { + logger.info("DLM successfully marked index [{}] to be force merged", indexName); + 
}, err -> { + logger.error(() -> Strings.format("DLM failed to mark index [%s] to be force merged", indexName), err); + stepContext.errorStore().recordError(stepContext.projectId(), indexName, err); + })); + return; + } + + String cloneIndex = getDLMCloneIndexName(indexName); + if (projectMetadata.indices().containsKey(cloneIndex)) { + // Clone index exists but step not completed - check if it's been stuck for too long and clean up if so + maybeCleanUpStuckCloneTask(cloneIndex, stepContext); + return; + } + + cloneIndex( + indexName, + cloneIndex, + ActionListener.wrap( + resp -> logger.info("DLM successfully initiated clone of index [{}] to index [{}]", indexName, cloneIndex), + err -> { + logger.error( + () -> Strings.format("DLM failed to initiate clone of index [%s] to index [%s]", indexName, cloneIndex), + err + ); + stepContext.errorStore().recordError(stepContext.projectId(), indexName, err); + } + ), + stepContext + ); + } + + @Override + public String stepName() { + return "Clone Index"; + } + + /** + * Checks if the clone index has been stuck for too long and if so, deletes it to allow a new clone attempt. 
+ */ + private static void maybeCleanUpStuckCloneTask(String cloneIndex, DlmStepContext stepContext) { + String indexName = stepContext.indexName(); + ProjectMetadata projectMetadata = stepContext.projectState().metadata(); + IndexMetadata cloneIndexMetadata = projectMetadata.index(cloneIndex); + if (cloneIndexMetadata == null) { + logger.debug( + "Clone index [{}] for index [{}] not found in project metadata during stuck clone check, it may have been deleted", + cloneIndex, + indexName + ); + return; + } + + Long cloneCreationTime = getCloneIndexCreationTime(cloneIndex, cloneIndexMetadata, projectMetadata); + + if (cloneCreationTime == null) { + throw new IllegalStateException( + Strings.format("DLM unable to determine creation time for clone index [%s] of index [%s]", cloneIndex, indexName) + ); + } + + long currentTime = Clock.systemUTC().millis(); + long timeSinceCreation = currentTime - cloneCreationTime; + if (isCloneIndexStuck(cloneIndexMetadata, timeSinceCreation, stepContext)) { + // Clone has been stuck for > 12 hours, clean it up so a new clone can be attempted + TimeValue timeSinceCreationValue = TimeValue.timeValueMillis(timeSinceCreation); + logger.info( + "DLM cleaning up clone index [{}] for index [{}] as it has been in progress for [{}] (raw: [{}ms]), " + "exceeding timeout of [{}] (raw: [{}ms])", + cloneIndex, + indexName, + timeSinceCreationValue.toHumanReadableString(2), + timeSinceCreation, + CLONE_TIMEOUT.toHumanReadableString(2), + CLONE_TIMEOUT.millis() + ); + deleteCloneIndexIfExists( + stepContext, + ActionListener.wrap( + resp -> logger.info("DLM successfully cleaned up clone index [{}] for index [{}]", cloneIndex, indexName), + err -> { + logger.error( + () -> Strings.format("DLM failed to clean up clone index [%s] for index [%s]", cloneIndex, indexName), + err + ); + stepContext.errorStore().recordError(stepContext.projectId(), indexName, err); + } + ) + ); + } else { + // Clone is still fresh, wait for it to complete + TimeValue
timeSinceCreationValue = TimeValue.timeValueMillis(timeSinceCreation); + logger.debug( + "DLM clone index [{}] for index [{}] exists and has been in progress for [{}] (raw: [{}ms]), " + + "waiting for completion or timeout of [{}] (raw: [{}ms])", + cloneIndex, + indexName, + timeSinceCreationValue.toHumanReadableString(2), + timeSinceCreation, + CLONE_TIMEOUT.toHumanReadableString(2), + CLONE_TIMEOUT.millis() + ); + } + } + + private static boolean isCloneIndexStuck(IndexMetadata cloneIndexMetadata, long timeSinceCreation, DlmStepContext stepContext) { + ResizeRequest cloneRequest = formCloneRequest(stepContext.indexName(), cloneIndexMetadata.getIndex().getName()); + return stepContext.isRequestInProgress(cloneRequest) && timeSinceCreation > CLONE_TIMEOUT.millis(); + } + + private static class CloneIndexResizeActionListener implements ActionListener { + private final String originalIndex; + private final String cloneIndex; + private final ActionListener listener; + private final DlmStepContext stepContext; + + private CloneIndexResizeActionListener( + String originalIndex, + String cloneIndex, + ActionListener listener, + DlmStepContext stepContext + ) { + this.originalIndex = originalIndex; + this.cloneIndex = cloneIndex; + this.listener = listener; + this.stepContext = stepContext; + } + + @Override + public void onResponse(CreateIndexResponse createIndexResponse) { + if (createIndexResponse.isAcknowledged() == false) { + onFailure( + new ElasticsearchException( + Strings.format("DLM failed to acknowledge clone of index [%s] to index [%s]", originalIndex, cloneIndex) + ) + ); + return; + } + logger.debug("DLM successfully cloned index [{}] to index [{}]", originalIndex, cloneIndex); + // on success, write the cloned index name to the custom metadata of the index metadata of original index + markIndexToBeForceMerged(originalIndex, cloneIndex, stepContext, listener.delegateFailure((l, v) -> { + logger.info("DLM successfully marked index [{}] to be force merged for 
source index [{}]", cloneIndex, originalIndex); + l.onResponse(null); + })); + } + + @Override + public void onFailure(Exception e) { + logger.error(() -> Strings.format("DLM failed to clone index [%s] to index [%s]", originalIndex, cloneIndex), e); + stepContext.errorStore().recordError(stepContext.projectId(), originalIndex, e); + deleteCloneIndexIfExists(stepContext, listener.delegateFailure((l, v) -> { + logger.info( + "DLM successfully deleted clone index [{}] after failed attempt to clone index [{}]", + cloneIndex, + originalIndex + ); + l.onFailure(e); + })); + } + } + + private void cloneIndex(String originalIndex, String cloneIndex, ActionListener listener, DlmStepContext stepContext) { + ResizeRequest cloneIndexRequest = formCloneRequest(originalIndex, cloneIndex); + stepContext.executeDeduplicatedRequest( + TransportResizeAction.TYPE.name(), + cloneIndexRequest, + Strings.format("DLM service encountered an error when trying to clone index [%s]", originalIndex), + (req, unused) -> cloneIndex(stepContext.projectId(), cloneIndexRequest, listener, stepContext) + ); + } + + private void cloneIndex(ProjectId projectId, ResizeRequest cloneRequest, ActionListener listener, DlmStepContext stepContext) { + assert cloneRequest.indices() != null && cloneRequest.indices().length == 1 : "DLM should clone one index at a time"; + String originalIndex = cloneRequest.getSourceIndex(); + String cloneIndex = cloneRequest.getTargetIndexRequest().index(); + logger.trace("DLM issuing request to clone index [{}] to index [{}]", originalIndex, cloneIndex); + CloneIndexResizeActionListener responseListener = new CloneIndexResizeActionListener( + originalIndex, + cloneIndex, + listener, + stepContext + ); + stepContext.client().projectClient(projectId).execute(TransportResizeAction.TYPE, cloneRequest, responseListener); + } + + /** + * Updates the custom metadata of the index metadata of the source index to mark the target index as that to be force merged by DLM. 
+ * This method performs the update asynchronously using a transport action. + */ + private static void markIndexToBeForceMerged( + String originalIndex, + String indexToBeForceMerged, + DlmStepContext stepContext, + ActionListener listener + ) { + MarkIndexForDLMForceMergeAction.Request markIndexForForceMergeRequest = new MarkIndexForDLMForceMergeAction.Request( + stepContext.projectId(), + originalIndex, + indexToBeForceMerged + ); + stepContext.executeDeduplicatedRequest( + MarkIndexForDLMForceMergeAction.TYPE.name(), + markIndexForForceMergeRequest, + Strings.format( + "DLM service encountered an error when trying to mark index [%s] to be force merged for source index [%s]", + indexToBeForceMerged, + originalIndex + ), + (req, unused) -> markIndexToBeForceMerged(markIndexForForceMergeRequest, stepContext, listener) + ); + } + + private static void markIndexToBeForceMerged( + MarkIndexForDLMForceMergeAction.Request request, + DlmStepContext stepContext, + ActionListener listener + ) { + logger.debug( + "DLM marking index [{}] to be force merged for source index [{}]", + request.getIndexToBeForceMerged(), + request.getOriginalIndex() + ); + stepContext.client() + .projectClient(stepContext.projectId()) + .execute(MarkIndexForDLMForceMergeAction.TYPE, request, ActionListener.wrap(resp -> { + if (resp.isAcknowledged()) { + listener.onResponse(null); + } else { + listener.onFailure( + new ElasticsearchException( + Strings.format( + "DLM failed to acknowledge marking index [%s] to be force merged for source index [%s]", + request.getIndexToBeForceMerged(), + request.getOriginalIndex() + ) + ) + ); + } + }, listener::onFailure)); + } + + private static void deleteCloneIndexIfExists(DlmStepContext stepContext, ActionListener listener) { + String cloneIndex = getDLMCloneIndexName(stepContext.indexName()); + if (stepContext.projectState().metadata().indices().containsKey(cloneIndex) == false) { + logger.debug("Clone index [{}] does not exist, no need to delete", 
cloneIndex); + listener.onResponse(null); + return; + } + logger.debug("Attempting to delete index [{}]", cloneIndex); + + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(cloneIndex).indicesOptions(IGNORE_MISSING_OPTIONS) + .masterNodeTimeout(TimeValue.MAX_VALUE); + stepContext.executeDeduplicatedRequest( + TransportDeleteIndexAction.TYPE.name(), + deleteIndexRequest, + Strings.format("DLM service encountered an error trying to delete clone index [%s]", cloneIndex), + (req, unused) -> deleteCloneIndex(deleteIndexRequest, stepContext, listener) + ); + } + + private static void deleteCloneIndex(DeleteIndexRequest deleteIndexRequest, DlmStepContext stepContext, ActionListener listener) { + String cloneIndex = deleteIndexRequest.indices()[0]; + logger.debug("DLM issuing request to delete index [{}]", cloneIndex); + stepContext.client() + .projectClient(stepContext.projectId()) + .admin() + .indices() + .delete(deleteIndexRequest, ActionListener.wrap(resp -> { + if (resp.isAcknowledged()) { + logger.debug("DLM successfully deleted clone index [{}]", cloneIndex); + listener.onResponse(null); + } else { + listener.onFailure( + new ElasticsearchException(Strings.format("Failed to acknowledge delete of index [%s]", cloneIndex)) + ); + } + }, err -> { + logger.error(() -> Strings.format("DLM failed to delete clone index [%s]", cloneIndex), err); + listener.onFailure(err); + })); + } + + /** + * Forms a resize request to clone the source index into a new index with 0 replicas. 
+ * @param originalIndex the index to be cloned + * @param cloneIndex the name of the new index to clone into + * @return the resize request to clone the source index into a new index with 0 replicas + */ + public static ResizeRequest formCloneRequest(String originalIndex, String cloneIndex) { + CreateIndexRequest createReq = new CreateIndexRequest(cloneIndex); + createReq.waitForActiveShards(ActiveShardCount.ALL); + ResizeRequest resizeReq = new ResizeRequest( + MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, + AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, + ResizeType.CLONE, + originalIndex, + cloneIndex + ); + resizeReq.setTargetIndex(createReq); + resizeReq.setTargetIndexSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); + return resizeReq; + } + + /** + * Returns the name of index to be force merged by DLM from the custom metadata of the index metadata of the source index. + * If no such index has been marked in the custom metadata, returns null. + */ + @Nullable + private static String getIndexToBeForceMerged(String originalIndex, ProjectState projectState) { + return Optional.ofNullable(projectState.metadata().index(originalIndex)) + .map(indexMetadata -> indexMetadata.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY)) + .map(customMetadata -> customMetadata.get(DLM_INDEX_FOR_FORCE_MERGE_KEY)) + .orElse(null); + } + + /** + * Gets a unique name deterministically for the clone index based on the original index name. + * The generated name format is "dlm-clone-{hash}" where the hash is a full 64-character + * SHA-256 hex digest of the original name to ensure uniqueness. 
+ * + * @param originalName the original index name + * @return a deterministic unique name for the clone index based on the original index name + */ + public static String getDLMCloneIndexName(String originalName) { + String hash = MessageDigests.toHexString(MessageDigests.sha256().digest(originalName.getBytes(StandardCharsets.UTF_8))); + String prefix = "dlm-clone-"; + + return prefix + hash; + } + + /** + * Gets the creation time of a clone index. + * + * @param cloneIndex the name of the clone index + * @param cloneIndexMetadata the metadata of the clone index + * @param projectMetadata the project metadata containing data stream information + * @return the creation time in milliseconds, or null if it cannot be determined + */ + @Nullable + protected static Long getCloneIndexCreationTime(String cloneIndex, IndexMetadata cloneIndexMetadata, ProjectMetadata projectMetadata) { + return Optional.ofNullable(projectMetadata.getIndicesLookup()) + .map(indicesLookup -> indicesLookup.get(cloneIndex)) + .map(IndexAbstraction::getParentDataStream) + .map(dataStream -> dataStream.getGenerationLifecycleDate(cloneIndexMetadata)) + .map(TimeValue::millis) + .orElse(cloneIndexMetadata.getCreationDate()); + } + +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/MarkIndexForDLMForceMergeAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/MarkIndexForDLMForceMergeAction.java new file mode 100644 index 0000000000000..91a849d45c767 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/MarkIndexForDLMForceMergeAction.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.datastreams.lifecycle.transitions.steps;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Objects;

/**
 * Action to mark an index to be force merged by updating its custom metadata.
 */
public class MarkIndexForDLMForceMergeAction {

    // Master-node action that acknowledges once the custom metadata update is committed.
    public static final ActionType<AcknowledgedResponse> TYPE = new ActionType<>("indices:admin/dlm/mark_index_for_force_merge");
    // Key under the lifecycle custom index metadata that names the index to be force merged.
    public static final String DLM_INDEX_FOR_FORCE_MERGE_KEY = "dlm_index_for_force_merge";

    /**
     * Request to mark an index to be force merged.
     */
    public static class Request extends MasterNodeRequest<Request> {
        private final ProjectId projectId;
        private final String originalIndex;
        private final String indexToBeForceMerged;

        /**
         * Constructor for the request.
         *
         * @param projectId the project id of the index
         * @param originalIndex the original index that is being transitioned through DLM lifecycle
         * @param indexToBeForceMerged the index that needs to be force merged
         * @throws IllegalArgumentException if any argument is null (or, for the index names, empty)
         */
        public Request(ProjectId projectId, String originalIndex, String indexToBeForceMerged) {
            // Lifecycle transitions are retried by the service, so no finite master timeout is used.
            super(INFINITE_MASTER_NODE_TIMEOUT);
            if (projectId == null) {
                throw new IllegalArgumentException("projectId must not be null or empty");
            }
            if (Strings.isNullOrEmpty(originalIndex)) {
                throw new IllegalArgumentException("originalIndex must not be null or empty");
            }
            if (Strings.isNullOrEmpty(indexToBeForceMerged)) {
                throw new IllegalArgumentException("indexToBeForceMerged must not be null or empty");
            }
            this.projectId = projectId;
            this.originalIndex = originalIndex;
            this.indexToBeForceMerged = indexToBeForceMerged;
        }

        /**
         * Deserialization constructor; mirrors the field order written by {@link #writeTo}.
         */
        public Request(StreamInput in) throws IOException {
            super(in);
            this.projectId = ProjectId.readFrom(in);
            this.originalIndex = in.readString();
            this.indexToBeForceMerged = in.readString();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            projectId.writeTo(out);
            out.writeString(originalIndex);
            out.writeString(indexToBeForceMerged);
        }

        public ProjectId getProjectId() {
            return projectId;
        }

        public String getOriginalIndex() {
            return originalIndex;
        }

        public String getIndexToBeForceMerged() {
            return indexToBeForceMerged;
        }

        @Override
        public ActionRequestValidationException validate() {
            // All invariants are enforced in the constructor, so there is nothing left to validate.
            return null;
        }

        @Override
        public int hashCode() {
            return Objects.hash(projectId, originalIndex, indexToBeForceMerged);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Request request = (Request) o;
            return Objects.equals(projectId, request.projectId)
                && Objects.equals(originalIndex, request.originalIndex)
                && Objects.equals(indexToBeForceMerged, request.indexToBeForceMerged);
        }
    }
}
+ */ + +package org.elasticsearch.datastreams.lifecycle.transitions.steps; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +/** + * Transport action to mark an index to be force merged by updating its custom metadata. + */ +public class TransportMarkIndexForDLMForceMergeAction extends AcknowledgedTransportMasterNodeAction< + MarkIndexForDLMForceMergeAction.Request> { + + private final DataStreamLifecycleService dataStreamLifecycleService; + + @Inject + public TransportMarkIndexForDLMForceMergeAction( + TransportService transportService, + ClusterService clusterService, + DataStreamLifecycleService dataStreamLifecycleService, + ThreadPool threadPool, + ActionFilters actionFilters + ) { + super( + MarkIndexForDLMForceMergeAction.TYPE.name(), + transportService, + clusterService, + threadPool, + actionFilters, + MarkIndexForDLMForceMergeAction.Request::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.dataStreamLifecycleService = dataStreamLifecycleService; + } + + @Override + protected void masterOperation( + Task task, + MarkIndexForDLMForceMergeAction.Request request, + ClusterState state, + ActionListener listener + ) { + dataStreamLifecycleService.markIndexForDlmForceMerge(request, listener); + } + + @Override + protected ClusterBlockException 
checkBlock(MarkIndexForDLMForceMergeAction.Request request, ClusterState state) { + ClusterBlockException globalBlock = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + if (globalBlock != null) { + return globalBlock; + } + return state.blocks().indexBlockedException(request.getProjectId(), ClusterBlockLevel.METADATA_WRITE, request.getOriginalIndex()); + } +} diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/MarkIndexForDLMForceMergeActionTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/MarkIndexForDLMForceMergeActionTests.java new file mode 100644 index 0000000000000..d4435a9697cad --- /dev/null +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/MarkIndexForDLMForceMergeActionTests.java @@ -0,0 +1,264 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.datastreams.lifecycle;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.test.ESTestCase;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY;
import static org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction.DLM_INDEX_FOR_FORCE_MERGE_KEY;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;

/**
 * Tests for {@link MarkIndexForDLMForceMergeAction} request validation and the
 * cluster-state update task that records the force-merge target in index custom metadata.
 */
public class MarkIndexForDLMForceMergeActionTests extends ESTestCase {

    /**
     * Helper method to create IndexMetadata with standard settings for tests
     */
    private IndexMetadata createIndexMetadata(String indexName) {
        return IndexMetadata.builder(indexName)
            .settings(
                Settings.builder()
                    .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
                    .put(IndexMetadata.SETTING_INDEX_UUID, randomAlphaOfLength(10))
            )
            .numberOfShards(1)
            .numberOfReplicas(1)
            .build();
    }

    /**
     * Helper method to create IndexMetadata with custom metadata
     */
    private IndexMetadata createIndexMetadata(String indexName, Map<String, String> customMetadata) {
        return IndexMetadata.builder(indexName)
            .settings(
                Settings.builder()
                    .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
                    .put(IndexMetadata.SETTING_INDEX_UUID, randomAlphaOfLength(10))
            )
            .numberOfShards(1)
            .numberOfReplicas(1)
            .putCustom(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, customMetadata)
            .build();
    }

    /**
     * Helper method to create ClusterState with source and target indices
     */
    private ClusterState createClusterState(ProjectId projectId, IndexMetadata sourceIndex, IndexMetadata targetIndex) {
        return ClusterState.builder(ClusterName.DEFAULT)
            .putProjectMetadata(ProjectMetadata.builder(projectId).put(sourceIndex, false).put(targetIndex, false))
            .build();
    }

    public void testRequestValidation() {
        ProjectId projectId = randomProjectIdOrDefault();
        String sourceIndex = "test-index";
        String targetIndex = "clone-index";

        // Valid request
        MarkIndexForDLMForceMergeAction.Request validRequest = new MarkIndexForDLMForceMergeAction.Request(
            projectId,
            sourceIndex,
            targetIndex
        );
        assertThat(validRequest.validate(), is(nullValue()));

        // Null project ID
        IllegalArgumentException e1 = expectThrows(
            IllegalArgumentException.class,
            () -> new MarkIndexForDLMForceMergeAction.Request(null, sourceIndex, targetIndex)
        );
        assertThat(e1.getMessage(), containsString("projectId must not be null or empty"));

        // Null source index
        IllegalArgumentException e2 = expectThrows(
            IllegalArgumentException.class,
            () -> new MarkIndexForDLMForceMergeAction.Request(projectId, null, targetIndex)
        );
        assertThat(e2.getMessage(), containsString("originalIndex must not be null or empty"));

        // Empty source index
        IllegalArgumentException e3 = expectThrows(
            IllegalArgumentException.class,
            () -> new MarkIndexForDLMForceMergeAction.Request(projectId, "", targetIndex)
        );
        assertThat(e3.getMessage(), containsString("originalIndex must not be null or empty"));

        // Null target index
        IllegalArgumentException e4 = expectThrows(
            IllegalArgumentException.class,
            () -> new MarkIndexForDLMForceMergeAction.Request(projectId, sourceIndex, null)
        );
        assertThat(e4.getMessage(), containsString("indexToBeForceMerged must not be null or empty"));

        // Empty target index
        IllegalArgumentException e5 = expectThrows(
            IllegalArgumentException.class,
            () -> new MarkIndexForDLMForceMergeAction.Request(projectId, sourceIndex, "")
        );
        assertThat(e5.getMessage(), containsString("indexToBeForceMerged must not be null or empty"));
    }

    public void testUpdateTaskWithMissingIndex() {
        // Test scenario: source index doesn't exist in cluster state
        ProjectId projectId = randomProjectIdOrDefault();
        String sourceIndex = "non-existent-index";
        String targetIndex = "clone-index";

        ClusterState initialState = ClusterState.builder(ClusterName.DEFAULT)
            .putProjectMetadata(ProjectMetadata.builder(projectId))
            .build();

        AtomicReference<AcknowledgedResponse> responseRef = new AtomicReference<>();
        DataStreamLifecycleService.MarkIndexForDlmForceMergeTask task = new DataStreamLifecycleService.MarkIndexForDlmForceMergeTask(
            ActionListener.wrap(responseRef::set, e -> fail("Should not fail when index is missing")),
            new MarkIndexForDLMForceMergeAction.Request(projectId, sourceIndex, targetIndex)
        );

        // When source index doesn't exist, an IndexNotFoundException should be thrown
        expectThrows(org.elasticsearch.index.IndexNotFoundException.class, () -> task.execute(initialState));
    }

    public void testUpdateTaskWithExistingIndex() {
        // Test scenario: mark an index that exists but has no custom metadata yet
        ProjectId projectId = randomProjectIdOrDefault();
        String sourceIndex = "test-index";
        String targetIndex = "clone-index";

        IndexMetadata indexMetadata = createIndexMetadata(sourceIndex);
        IndexMetadata cloneIndexMetadata = createIndexMetadata(targetIndex);
        ClusterState initialState = createClusterState(projectId, indexMetadata, cloneIndexMetadata);

        AtomicReference<AcknowledgedResponse> responseRef = new AtomicReference<>();
        DataStreamLifecycleService.MarkIndexForDlmForceMergeTask task = new DataStreamLifecycleService.MarkIndexForDlmForceMergeTask(
            ActionListener.wrap(responseRef::set, e -> fail("Should not fail: " + e.getMessage())),
            new MarkIndexForDLMForceMergeAction.Request(projectId, sourceIndex, targetIndex)
        );

        ClusterState resultState = task.execute(initialState);

        // Verify the custom metadata was added
        IndexMetadata updatedIndexMetadata = resultState.metadata().getProject(projectId).index(sourceIndex);
        assertThat(updatedIndexMetadata, is(notNullValue()));
        Map<String, String> customMetadata = updatedIndexMetadata.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY);
        assertThat(customMetadata, is(notNullValue()));
        assertThat(customMetadata.get(DLM_INDEX_FOR_FORCE_MERGE_KEY), equalTo(targetIndex));
    }

    public void testUpdateTaskMergesExistingCustomMetadata() {
        // Test scenario: index already has some custom metadata, we need to preserve it
        ProjectId projectId = randomProjectIdOrDefault();
        String sourceIndex = "test-index";
        String targetIndex = "clone-index";

        Map<String, String> existingCustomMetadata = new HashMap<>();
        existingCustomMetadata.put("other_key", "other_value");
        existingCustomMetadata.put("another_key", "another_value");

        IndexMetadata indexMetadata = createIndexMetadata(sourceIndex, existingCustomMetadata);
        IndexMetadata cloneIndexMetadata = createIndexMetadata(targetIndex);
        ClusterState initialState = createClusterState(projectId, indexMetadata, cloneIndexMetadata);

        AtomicReference<AcknowledgedResponse> responseRef = new AtomicReference<>();
        DataStreamLifecycleService.MarkIndexForDlmForceMergeTask task = new DataStreamLifecycleService.MarkIndexForDlmForceMergeTask(
            ActionListener.wrap(responseRef::set, e -> fail("Should not fail: " + e.getMessage())),
            new MarkIndexForDLMForceMergeAction.Request(projectId, sourceIndex, targetIndex)
        );

        ClusterState resultState = task.execute(initialState);

        // Verify the custom metadata was merged (not replaced)
        IndexMetadata updatedIndexMetadata = resultState.metadata().getProject(projectId).index(sourceIndex);
        assertThat(updatedIndexMetadata, is(notNullValue()));
        Map<String, String> customMetadata = updatedIndexMetadata.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY);
        assertThat(customMetadata, is(notNullValue()));
        assertThat(customMetadata.get(DLM_INDEX_FOR_FORCE_MERGE_KEY), equalTo(targetIndex));
        assertThat(customMetadata.get("other_key"), equalTo("other_value"));
        assertThat(customMetadata.get("another_key"), equalTo("another_value"));
        assertThat(customMetadata.size(), equalTo(3));
    }

    public void testUpdateTaskIsIdempotent() {
        // Test scenario: running the update twice with the same target index should not modify cluster state on second run
        ProjectId projectId = randomProjectIdOrDefault();
        String sourceIndex = "test-index";
        String targetIndex = "clone-index";

        Map<String, String> existingCustomMetadata = new HashMap<>();
        existingCustomMetadata.put(DLM_INDEX_FOR_FORCE_MERGE_KEY, targetIndex);
        existingCustomMetadata.put("other_key", "other_value");

        IndexMetadata indexMetadata = createIndexMetadata(sourceIndex, existingCustomMetadata);
        IndexMetadata cloneIndexMetadata = createIndexMetadata(targetIndex);
        ClusterState initialState = createClusterState(projectId, indexMetadata, cloneIndexMetadata);

        AtomicReference<AcknowledgedResponse> responseRef = new AtomicReference<>();
        DataStreamLifecycleService.MarkIndexForDlmForceMergeTask task = new DataStreamLifecycleService.MarkIndexForDlmForceMergeTask(
            ActionListener.wrap(responseRef::set, e -> fail("Should not fail: " + e.getMessage())),
            new MarkIndexForDLMForceMergeAction.Request(projectId, sourceIndex, targetIndex)
        );

        ClusterState resultState = task.execute(initialState);

        // Since the key already matches, cluster state should remain unchanged
        assertThat(resultState, sameInstance(initialState));
    }

    public void testUpdateTaskOverwritesExistingKey() {
        // Test scenario: index already has DLM_INDEX_FOR_FORCE_MERGE_KEY but with a different value
        ProjectId projectId = randomProjectIdOrDefault();
        String sourceIndex = "test-index";
        String oldTargetIndex = "old-clone-index";
        String newTargetIndex = "new-clone-index";

        Map<String, String> existingCustomMetadata = new HashMap<>();
        existingCustomMetadata.put(DLM_INDEX_FOR_FORCE_MERGE_KEY, oldTargetIndex);
        existingCustomMetadata.put("other_key", "other_value");

        IndexMetadata indexMetadata = createIndexMetadata(sourceIndex, existingCustomMetadata);
        IndexMetadata newCloneIndexMetadata = createIndexMetadata(newTargetIndex);
        ClusterState initialState = createClusterState(projectId, indexMetadata, newCloneIndexMetadata);

        AtomicReference<AcknowledgedResponse> responseRef = new AtomicReference<>();
        DataStreamLifecycleService.MarkIndexForDlmForceMergeTask task = new DataStreamLifecycleService.MarkIndexForDlmForceMergeTask(
            ActionListener.wrap(responseRef::set, e -> fail("Should not fail: " + e.getMessage())),
            new MarkIndexForDLMForceMergeAction.Request(projectId, sourceIndex, newTargetIndex)
        );

        ClusterState resultState = task.execute(initialState);

        // Verify the key was updated to the new value
        IndexMetadata updatedIndexMetadata = resultState.metadata().getProject(projectId).index(sourceIndex);
        assertThat(updatedIndexMetadata, is(notNullValue()));
        Map<String, String> customMetadata = updatedIndexMetadata.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY);
        assertThat(customMetadata, is(notNullValue()));
        assertThat(customMetadata.get(DLM_INDEX_FOR_FORCE_MERGE_KEY), equalTo(newTargetIndex));
        assertThat(customMetadata.get("other_key"), equalTo("other_value"));
    }
}
b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStepTests.java @@ -0,0 +1,533 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.lifecycle.transitions.steps; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.ResultDeduplicator; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.shrink.ResizeRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ProjectState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.TestProjectResolvers; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import 
org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; +import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportRequest; +import org.junit.After; +import org.junit.Before; + +import java.nio.charset.StandardCharsets; +import java.time.Clock; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY; +import static org.elasticsearch.datastreams.lifecycle.transitions.steps.CloneStep.formCloneRequest; +import static org.elasticsearch.datastreams.lifecycle.transitions.steps.CloneStep.getDLMCloneIndexName; +import static org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction.DLM_INDEX_FOR_FORCE_MERGE_KEY; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class CloneStepTests extends ESTestCase { + + private CloneStep cloneStep; + private ProjectId projectId; + private String indexName; + private Index index; + private ThreadPool threadPool; + private Client client; + private DataStreamLifecycleErrorStore errorStore; + private ResultDeduplicator, Void> deduplicator; + 
private AtomicReference> capturedCloneListener; + private AtomicReference> capturedDeleteListener; + private AtomicReference capturedResizeRequest; + private AtomicReference capturedDeleteRequest; + private AtomicReference capturedMarkRequest; + + @Before + public void setup() { + threadPool = new TestThreadPool(getTestName()); + cloneStep = new CloneStep(); + projectId = randomProjectIdOrDefault(); + indexName = randomAlphaOfLength(10); + index = new Index(indexName, randomAlphaOfLength(10)); + errorStore = new DataStreamLifecycleErrorStore(System::currentTimeMillis); + deduplicator = new ResultDeduplicator<>(threadPool.getThreadContext()); + capturedCloneListener = new AtomicReference<>(); + capturedDeleteListener = new AtomicReference<>(); + capturedResizeRequest = new AtomicReference<>(); + capturedDeleteRequest = new AtomicReference<>(); + capturedMarkRequest = new AtomicReference<>(); + + client = new NoOpClient(threadPool, TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext())) { + @Override + @SuppressWarnings("unchecked") + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + if (request instanceof ResizeRequest resizeRequest) { + capturedResizeRequest.set(resizeRequest); + capturedCloneListener.set((ActionListener) listener); + } else if (request instanceof DeleteIndexRequest deleteIndexRequest) { + capturedDeleteRequest.set(deleteIndexRequest); + capturedDeleteListener.set((ActionListener) listener); + } else if (request instanceof MarkIndexForDLMForceMergeAction.Request markRequest) { + capturedMarkRequest.set(markRequest); + } + } + }; + } + + @After + public void cleanup() { + terminate(threadPool); + } + + public void testStepName() { + assertThat(cloneStep.stepName(), equalTo("Clone Index")); + } + + public void testStepNotCompletedWhenNoCloneIndexExists() { + ProjectState projectState = projectStateBuilder().build(); + assertFalse(cloneStep.stepCompleted(index, projectState)); + } + + 
public void testStepNotCompletedWhenCloneNotMarkedInMetadata() { + ProjectState projectState = projectStateBuilder().withClone().build(); + assertFalse(cloneStep.stepCompleted(index, projectState)); + } + + public void testStepCompletedWhenCloneExistsAndMarkedInMetadata() { + String cloneIndexName = getDLMCloneIndexName(indexName); + Map customMetadata = Map.of(DLM_INDEX_FOR_FORCE_MERGE_KEY, cloneIndexName); + ProjectState projectState = projectStateBuilder().withClone().withCustomMetadata(customMetadata).withRouting().build(); + assertTrue(cloneStep.stepCompleted(index, projectState)); + } + + public void testStepNotCompletedWhenShardsNotActive() { + String cloneIndexName = getDLMCloneIndexName(indexName); + Map customMetadata = Map.of(DLM_INDEX_FOR_FORCE_MERGE_KEY, cloneIndexName); + ProjectState projectState = projectStateBuilder().withClone().withCustomMetadata(customMetadata).withRouting(false).build(); + assertFalse(cloneStep.stepCompleted(index, projectState)); + } + + public void testStepCompletedWhenOriginalIndexMarkedWithZeroReplicas() { + Map customMetadata = Map.of(DLM_INDEX_FOR_FORCE_MERGE_KEY, indexName); + ProjectState projectState = projectStateBuilder().withReplicas(0).withCustomMetadata(customMetadata).withRouting().build(); + assertTrue(cloneStep.stepCompleted(index, projectState)); + } + + public void testExecuteSkipsCloneWhenIndexHasZeroReplicas() { + ProjectState projectState = projectStateBuilder().withReplicas(0).build(); + DlmStepContext stepContext = createStepContext(projectState); + + cloneStep.execute(stepContext); + + assertThat(capturedResizeRequest.get(), is(nullValue())); + + assertThat(capturedMarkRequest.get(), is(notNullValue())); + assertThat(capturedMarkRequest.get().getOriginalIndex(), equalTo(indexName)); + assertThat(capturedMarkRequest.get().getIndexToBeForceMerged(), equalTo(indexName)); + assertThat(capturedMarkRequest.get().getProjectId(), equalTo(projectId)); + } + + public void 
testExecuteDeletesExistingCloneAndRetriesClone() { + // Create a clone index that was created more than 12 hours ago (stuck) + String cloneIndexName = getDLMCloneIndexName(indexName); + ProjectState projectState = setupStuckCloneScenario(13); + DlmStepContext stepContext = createStepContext(projectState); + + cloneStep.execute(stepContext); + + // Should issue delete request for stuck clone + assertThat(capturedDeleteRequest.get(), is(notNullValue())); + assertThat(capturedDeleteRequest.get().indices()[0], equalTo(cloneIndexName)); + } + + public void testExecuteCreatesCloneWithCorrectSettings() { + ProjectState projectState = projectStateBuilder().build(); + DlmStepContext stepContext = createStepContext(projectState); + + cloneStep.execute(stepContext); + + assertThat(capturedResizeRequest.get(), is(notNullValue())); + assertThat(capturedResizeRequest.get().getSourceIndex(), equalTo(indexName)); + assertThat(capturedResizeRequest.get().getTargetIndexRequest().index(), containsString("dlm-clone-")); + assertThat(capturedResizeRequest.get().getTargetIndexRequest().settings().get("index.number_of_replicas"), equalTo("0")); + } + + public void testExecuteWithSuccessfulCloneResponse() { + ProjectState projectState = projectStateBuilder().build(); + DlmStepContext stepContext = createStepContext(projectState); + + cloneStep.execute(stepContext); + + assertThat("clone listener should be captured", capturedCloneListener.get(), is(notNullValue())); + + String cloneIndexName = getDLMCloneIndexName(indexName); + CreateIndexResponse response = new CreateIndexResponse(true, true, cloneIndexName); + capturedCloneListener.get().onResponse(response); + + assertThat(capturedMarkRequest.get(), is(notNullValue())); + assertThat(capturedMarkRequest.get().getOriginalIndex(), equalTo(indexName)); + assertThat(capturedMarkRequest.get().getIndexToBeForceMerged(), equalTo(cloneIndexName)); + assertThat(capturedMarkRequest.get().getProjectId(), equalTo(projectId)); + } + + public void 
testExecuteWithFailedCloneResponse() { + ProjectState projectState = projectStateBuilder().build(); + DlmStepContext stepContext = createStepContext(projectState); + + cloneStep.execute(stepContext); + + ElasticsearchException exception = new ElasticsearchException("clone failed"); + capturedCloneListener.get().onFailure(exception); + + // Should NOT attempt to delete the clone index since it was never created in metadata + assertThat(capturedDeleteRequest.get(), is(nullValue())); + } + + public void testGetDLMCloneIndexName() { + String name = "test-index"; + String cloneName = getDLMCloneIndexName(name); + assertThat("Clone name should be deterministic", cloneName, equalTo(getDLMCloneIndexName(name))); + assertThat("Clone name should contain prefix", cloneName, containsString("dlm-clone-")); + int shortNameLength = cloneName.getBytes(StandardCharsets.UTF_8).length; + assertThat("Clone name should not exceed 255 bytes", shortNameLength <= 255, is(true)); + + // Test that different names produce different clone names (due to different hashes) + String name1 = "index-1"; + String name2 = "index-2"; + assertThat( + "Different names should produce different clone names", + getDLMCloneIndexName(name1), + not(equalTo(getDLMCloneIndexName(name2))) + ); + } + + public void testDeleteCloneSuccessfully() { + ProjectState projectState = setupStuckCloneScenario(13); + DlmStepContext stepContext = createStepContext(projectState); + + cloneStep.execute(stepContext); + + // Respond to delete request successfully + AcknowledgedResponse deleteResponse = AcknowledgedResponse.of(true); + capturedDeleteListener.get().onResponse(deleteResponse); + } + + public void testDeleteCloneWithFailure() { + ProjectState projectState = setupStuckCloneScenario(13); + DlmStepContext stepContext = createStepContext(projectState); + + cloneStep.execute(stepContext); + + // Respond to delete request with failure + ElasticsearchException exception = new ElasticsearchException("delete failed"); + 
capturedDeleteListener.get().onFailure(exception); + } + + public void testExecuteWaitsWhenCloneIsInProgressAndNotTimedOut() { + // Create a clone index that was created less than 12 hours ago + ProjectState projectState = setupStuckCloneScenario(6); + DlmStepContext stepContext = createStepContext(projectState); + cloneStep.execute(stepContext); + + // Should NOT issue a delete request since it's still fresh + assertThat("Should not delete clone that is still within timeout", capturedDeleteRequest.get(), is(nullValue())); + // Should NOT issue a new clone request + assertThat("Should not create new clone request while one is in progress", capturedResizeRequest.get(), is(nullValue())); + } + + public void testExecuteDeletesCloneWhenStuckForOver12Hours() { + // Create a clone index that was created more than 12 hours ago + String cloneIndexName = getDLMCloneIndexName(indexName); + ProjectState projectState = setupStuckCloneScenario(13); + DlmStepContext stepContext = createStepContext(projectState); + cloneStep.execute(stepContext); + + // Should issue delete request for stuck clone + assertThat("Should delete clone that exceeded timeout", capturedDeleteRequest.get(), is(notNullValue())); + assertThat(capturedDeleteRequest.get().indices()[0], equalTo(cloneIndexName)); + } + + public void testExecuteWaitsWhenCloneExistsButNotInDeduplicatorAndNotTimedOut() { + // Create a clone index that exists but is not in the deduplicator (completed but step not finished) + // and was created less than 12 hours ago + long currentTime = Clock.systemUTC().millis(); + long creationTime = currentTime - TimeValue.timeValueHours(2).millis(); // 2 hours ago + + ProjectState projectState = projectStateBuilder().withClone().withCloneCreationTime(creationTime).build(); + + DlmStepContext stepContext = createStepContext(projectState); + cloneStep.execute(stepContext); + + // Should NOT issue a delete since it's not in the deduplicator (might be completing) + assertThat("Should not delete 
clone not in deduplicator", capturedDeleteRequest.get(), is(nullValue())); + // Should NOT issue a new clone request + assertThat("Should not create new clone while one exists", capturedResizeRequest.get(), is(nullValue())); + } + + public void testExecuteWaitsWhenCloneExistsOver12HoursButNotInDeduplicator() { + // Create a clone index that exists, is not in the deduplicator, and was created > 12 hours ago + // Since it's not in the deduplicator, it's not considered "stuck" - might be completing or already completed + long currentTime = Clock.systemUTC().millis(); + long creationTime = currentTime - TimeValue.timeValueHours(15).millis(); // 15 hours ago + + ProjectState projectState = projectStateBuilder().withClone().withCloneCreationTime(creationTime).build(); + + DlmStepContext stepContext = createStepContext(projectState); + cloneStep.execute(stepContext); + + // Should NOT delete - not in deduplicator means not actively stuck + assertThat("Should not delete old clone if not in deduplicator", capturedDeleteRequest.get(), is(nullValue())); + } + + public void testExecuteCreatesNewCloneAfterTimeoutAndCleanup() { + // Test the full cycle: stuck clone gets deleted, then a new one is created on next run + String cloneIndexName = getDLMCloneIndexName(indexName); + ProjectState projectStateWithOldClone = setupStuckCloneScenario(14); + DlmStepContext stepContext = createStepContext(projectStateWithOldClone); + cloneStep.execute(stepContext); + + // First run: should delete the old clone + assertThat(capturedDeleteRequest.get(), is(notNullValue())); + assertThat(capturedDeleteRequest.get().indices()[0], equalTo(cloneIndexName)); + + // Simulate successful delete + capturedDeleteListener.get().onResponse(AcknowledgedResponse.of(true)); + + // Reset captures and clear deduplicator to simulate that the old stuck request is no longer tracked + capturedDeleteRequest.set(null); + capturedResizeRequest.set(null); + deduplicator.clear(); + + // Second run: now without the clone 
index + ProjectState projectStateWithoutClone = projectStateBuilder().build(); + DlmStepContext stepContext2 = createStepContext(projectStateWithoutClone); + cloneStep.execute(stepContext2); + + // Should now create a new clone + assertThat("Should create new clone after old one was deleted", capturedResizeRequest.get(), is(notNullValue())); + assertThat(capturedResizeRequest.get().getSourceIndex(), equalTo(indexName)); + } + + public void testCloneFailureWithGenericExceptionRecordsError() { + ProjectState projectState = projectStateBuilder().build(); + DlmStepContext stepContext = createStepContext(projectState); + + cloneStep.execute(stepContext); + + // Simulate clone failure with a generic exception + assertThat(capturedCloneListener.get(), is(notNullValue())); + ElasticsearchException cloneFailure = new ElasticsearchException("clone operation failed"); + capturedCloneListener.get().onFailure(cloneFailure); + + // Error should be recorded + assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); + assertThat(Objects.requireNonNull(errorStore.getError(projectId, indexName)).error(), containsString("clone operation failed")); + } + + public void testStuckCloneCleanupFailureRecordsError() { + ProjectState projectState = setupStuckCloneScenario(13); + DlmStepContext stepContext = createStepContext(projectState); + + cloneStep.execute(stepContext); + + // Simulate failed cleanup of stuck clone + assertThat(capturedDeleteListener.get(), is(notNullValue())); + ElasticsearchException cleanupFailure = new ElasticsearchException("cleanup failed"); + capturedDeleteListener.get().onFailure(cleanupFailure); + + // Error should be recorded + assertThat(errorStore.getError(projectId, indexName), is(notNullValue())); + assertThat(Objects.requireNonNull(errorStore.getError(projectId, indexName)).error(), containsString("cleanup failed")); + } + + /** + * Builder for creating ProjectState with various configurations for testing. 
+ */ + private class ProjectStateBuilder { + private int numberOfReplicas = 1; + private Map<String, String> customMetadata = null; + private String cloneIndexName = null; + private Long cloneCreationTime = null; + private boolean withRouting = false; + private boolean allShardsActive = true; + + ProjectStateBuilder withReplicas(int numberOfReplicas) { + this.numberOfReplicas = numberOfReplicas; + return this; + } + + ProjectStateBuilder withCustomMetadata(Map<String, String> customMetadata) { + this.customMetadata = customMetadata; + return this; + } + + ProjectStateBuilder withClone() { + this.cloneIndexName = getDLMCloneIndexName(indexName); + return this; + } + + ProjectStateBuilder withClone(String cloneName) { + this.cloneIndexName = cloneName; + return this; + } + + ProjectStateBuilder withCloneCreationTime(long creationTimeMillis) { + this.cloneCreationTime = creationTimeMillis; + return this; + } + + ProjectStateBuilder withRouting() { + this.withRouting = true; + return this; + } + + ProjectStateBuilder withRouting(boolean allShardsActive) { + this.withRouting = true; + this.allShardsActive = allShardsActive; + return this; + } + + ProjectState build() { + // Build original index metadata + IndexMetadata.Builder originalIndexBuilder = IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build() + ) + .numberOfShards(1) + .numberOfReplicas(numberOfReplicas); + + if (customMetadata != null) { + originalIndexBuilder.putCustom(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, customMetadata); + } + + IndexMetadata originalIndexMetadata = originalIndexBuilder.build(); + ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectId).put(originalIndexMetadata, false); + + // Build clone index metadata if requested + IndexMetadata cloneIndexMetadata = null; + if (cloneIndexName != null) { + IndexMetadata.Builder cloneBuilder = 
IndexMetadata.builder(cloneIndexName) + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build()) + .numberOfShards(1) + .numberOfReplicas(0); + + if (cloneCreationTime != null) { + cloneBuilder.creationDate(cloneCreationTime); + } + + cloneIndexMetadata = cloneBuilder.build(); + projectMetadataBuilder.put(cloneIndexMetadata, false); + + DataStream dataStream = DataStream.builder( + "test-datastream-" + indexName, + java.util.List.of(cloneIndexMetadata.getIndex(), originalIndexMetadata.getIndex()) + ).setGeneration(2).build(); + projectMetadataBuilder.put(dataStream); + } + + // Build routing table if requested + RoutingTable routingTable = null; + if (withRouting) { + // Route to the clone if it exists, otherwise to the original + Index indexToRoute = cloneIndexMetadata != null ? cloneIndexMetadata.getIndex() : originalIndexMetadata.getIndex(); + ShardRouting primaryShard = TestShardRouting.newShardRouting( + new ShardId(indexToRoute, 0), + "node1", + true, + allShardsActive ? 
ShardRoutingState.STARTED : ShardRoutingState.INITIALIZING + ); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexToRoute).addShard(primaryShard); + routingTable = RoutingTable.builder().add(indexRoutingTableBuilder).build(); + } + + // Build final ProjectState + ClusterState.Builder clusterStateBuilder = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(projectMetadataBuilder); + if (routingTable != null) { + clusterStateBuilder.putRoutingTable(projectId, routingTable); + } + + return clusterStateBuilder.build().projectState(projectId); + } + } + + private ProjectStateBuilder projectStateBuilder() { + return new ProjectStateBuilder(); + } + + private DlmStepContext createStepContext(ProjectState projectState) { + return new DlmStepContext(index, projectState, deduplicator, errorStore, randomIntBetween(1, 10), client); + } + + /** + * Helper method to create a stuck clone scenario where: + * - A clone index exists with the specified age in hours + * - The clone request is registered in the deduplicator (simulating in-progress request) + * + * @param hoursAgo Number of hours ago the clone was created + * @return ProjectState with the stuck clone index + */ + private ProjectState setupStuckCloneScenario(int hoursAgo) { + long currentTime = Clock.systemUTC().millis(); + long creationTime = currentTime - TimeValue.timeValueHours(hoursAgo).millis(); + + String cloneIndexName = getDLMCloneIndexName(indexName); + ProjectState projectState = projectStateBuilder().withClone().withCloneCreationTime(creationTime).build(); + + // Pre-populate the deduplicator to simulate a stuck in-progress request + ResizeRequest cloneRequest = formCloneRequest(indexName, cloneIndexName); + deduplicator.executeOnce( + Tuple.tuple(projectId, cloneRequest), + ActionListener.noop(), + (req, listener) -> {} // Don't actually execute, just track as in-progress + ); + + return projectState; + } +} diff --git 
a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 4047ea655e4fd..fa2b28cef0de8 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -532,6 +532,7 @@ public class Constants { "indices:admin/data_stream/lifecycle/get", "indices:admin/data_stream/lifecycle/put", "indices:admin/data_stream/lifecycle/explain", + "indices:admin/dlm/mark_index_for_force_merge", "indices:admin/data_stream/options/delete", "indices:admin/data_stream/options/get", "indices:admin/data_stream/options/put",