Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
8ee2c86
Add clone step to DLM
seanzatzdev Feb 2, 2026
0183020
[CI] Auto commit changes from spotless
Feb 2, 2026
88ceed6
Add clone step to DLM
seanzatzdev Feb 2, 2026
8975131
[CI] Auto commit changes from spotless
Feb 2, 2026
c81bcfb
fix failing tests
seanzatzdev Feb 2, 2026
666f179
formatting
seanzatzdev Feb 2, 2026
99e6e63
Update custom metadata with index for force merge asynchronously
seanzatzdev Feb 2, 2026
41f6830
Merge branch 'main' into dlm-clone-step
seanzatzdev Feb 2, 2026
342c1b7
Add additional null checks
seanzatzdev Feb 2, 2026
f54f575
clean up test file
seanzatzdev Feb 2, 2026
c3e7e7f
Merge branch 'main' into dlm-clone-step
seanzatzdev Feb 3, 2026
d4dee8b
Merge branch 'main' into dlm-clone-step
seanzatzdev Feb 3, 2026
fb81e8c
Export dlm steps to server module to fix DI/give Guice access
seanzatzdev Feb 4, 2026
76cf7dd
Add mark_to_force_merge action to non_operator_actions
seanzatzdev Feb 4, 2026
bdcf9d7
remove unnecessary force merge step
seanzatzdev Feb 5, 2026
93ea13b
minor changes to address PR feedback
seanzatzdev Feb 5, 2026
5536e78
[CI] Auto commit changes from spotless
Feb 5, 2026
a1e8560
simplifying some functions
seanzatzdev Feb 5, 2026
650fbda
address PR comments
seanzatzdev Feb 5, 2026
d46e003
Merge branch 'elastic:main' into dlm-clone-step
seanzatzdev Feb 10, 2026
7219fc1
rename classes
seanzatzdev Feb 9, 2026
142b622
Prevent race conditions
seanzatzdev Feb 10, 2026
f5d909a
fix failing tests
seanzatzdev Feb 10, 2026
654b5d9
run delete clone through deduplicator
seanzatzdev Feb 10, 2026
3e6b735
refactor cleanup
seanzatzdev Feb 10, 2026
3ad95d6
Apply suggestion from @Copilot
seanzatzdev Feb 10, 2026
642d1ed
Apply suggestion from @Copilot
seanzatzdev Feb 10, 2026
f9fe9ba
Refactoring to clean up and bug fixes
seanzatzdev Feb 10, 2026
be7e0e1
[CI] Auto commit changes from spotless
Feb 10, 2026
b213d9d
nit
seanzatzdev Feb 10, 2026
466ff73
add test helper
seanzatzdev Feb 11, 2026
716ee73
add tests for marking action
seanzatzdev Feb 11, 2026
20d6702
[CI] Auto commit changes from spotless
Feb 11, 2026
21720e1
address PR feedback, truncate clone names, keep clone request formati…
seanzatzdev Feb 12, 2026
0642435
[CI] Auto commit changes from spotless
Feb 12, 2026
ffd9f9d
pr feedback
seanzatzdev Feb 12, 2026
f5d23e1
renaming functions
seanzatzdev Feb 12, 2026
a85656b
wait for active shards on resize req
seanzatzdev Feb 12, 2026
6c3bd98
format time logs
seanzatzdev Feb 12, 2026
d7e92d0
Improved error handling, renames, refactor tests to simplify
seanzatzdev Feb 16, 2026
241b5d7
[CI] Auto commit changes from spotless
Feb 16, 2026
b23456d
Merge branch 'main' into dlm-clone-step
seanzatzdev Feb 17, 2026
c004bbb
small fixes
seanzatzdev Feb 20, 2026
754cfd9
pr feedback
seanzatzdev Feb 20, 2026
4d72f77
Merge branch 'main' into dlm-clone-step
seanzatzdev Feb 20, 2026
29e212f
remove truncated prefix from clone index name, nit
seanzatzdev Feb 20, 2026
176cead
Merge branch 'main' into dlm-clone-step
seanzatzdev Feb 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.elasticsearch.datastreams.lifecycle.rest.RestGetDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.rest.RestPutDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.transitions.DlmAction;
import org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexToBeForceMergedAction;
import org.elasticsearch.datastreams.lifecycle.transitions.steps.TransportMarkIndexToBeForceMergedAction;
import org.elasticsearch.datastreams.options.action.DeleteDataStreamOptionsAction;
import org.elasticsearch.datastreams.options.action.GetDataStreamOptionsAction;
import org.elasticsearch.datastreams.options.action.TransportDeleteDataStreamOptionsAction;
Expand Down Expand Up @@ -260,6 +262,7 @@ public List<ActionHandler> getActions() {
actions.add(new ActionHandler(UpdateDataStreamSettingsAction.INSTANCE, TransportUpdateDataStreamSettingsAction.class));
actions.add(new ActionHandler(GetDataStreamMappingsAction.INSTANCE, TransportGetDataStreamMappingsAction.class));
actions.add(new ActionHandler(UpdateDataStreamMappingsAction.INSTANCE, TransportUpdateDataStreamMappingsAction.class));
actions.add(new ActionHandler(MarkIndexToBeForceMergedAction.INSTANCE, TransportMarkIndexToBeForceMergedAction.class));
return actions;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.datastreams.lifecycle.transitions.steps;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.action.admin.indices.shrink.TransportResizeAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep;
import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext;
import org.elasticsearch.index.Index;

import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;

import static org.apache.logging.log4j.LogManager.getLogger;
import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY;

/**
* This step clones the index into a new index with 0 replicas.
*/
public class CloneStep implements DlmStep {

// Key, inside the data stream lifecycle custom index metadata of the source index, whose value is the name of the index
// that has been marked to be force merged.
private static final String DLM_INDEX_TO_BE_MERGED_KEY = "dlm_index_to_be_force_merged";
// Indices options applied to the request that deletes a clone index.
// NOTE(review): judging by the name these make the delete lenient towards a missing index - confirm against
// IndicesOptions.fromOptions.
private static final IndicesOptions IGNORE_MISSING_OPTIONS = IndicesOptions.fromOptions(true, true, false, false);
private static final Logger logger = getLogger(CloneStep.class);

@Override
public boolean stepCompleted(Index index, ProjectState projectState) {
// the index can either be "cloned" or the original index if it had 0 replicas
String indexToBeForceMerged = getIndexToBeForceMerged(index.getName(), projectState);
if (indexToBeForceMerged == null) {
return false;
}
boolean cloneExists = projectState.metadata().indices().containsKey(indexToBeForceMerged);
if (cloneExists == false) {
return false;
}
IndexMetadata indexToBeForceMergedMetadata = projectState.metadata().index(indexToBeForceMerged);
if (indexToBeForceMergedMetadata == null) {
return false;
}
IndexRoutingTable indexRoutingTable = projectState.routingTable().index(indexToBeForceMerged);
if (indexRoutingTable == null) {
return false;
}
return indexRoutingTable.allPrimaryShardsActive();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have index metadata that links the two indices? Should we add a check for that here if we do?

Copy link
Contributor Author

@seanzatzdev seanzatzdev Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first conditional in the chain (getIndexToBeForceMerged) should check for that, it checks whether there has been an index marked for force merge for the source index.

}

@Override
public void execute(DlmStepContext stepContext) {
Index index = stepContext.index();
String indexName = index.getName();
ProjectState projectState = stepContext.projectState();
ProjectId projectId = stepContext.projectId();
ProjectMetadata projectMetadata = projectState.metadata();
IndexMetadata indexMetadata = projectMetadata.index(index);

if (indexMetadata == null) {
logger.warn("Index [{}] not found in project metadata, skipping clone step", indexName);
return;
}

if (isForceMergeComplete(indexMetadata)) {
logger.info("Skipping clone step for index [{}] as force merge is already complete", indexName);
return;
}
if (indexMetadata.getNumberOfReplicas() == 0) {
logger.info(
"Skipping clone step for index [{}] as it already has 0 replicas and can be used for force merge directly",
indexName
);
// mark the index to be force merged directly
markIndexToBeForceMerged(indexName, indexName, stepContext, ActionListener.noop());
return;
}
String cloneIndexName = generateCloneIndexName(indexName);
if (projectMetadata.indices().containsKey(cloneIndexName)) {
logger.info("DLM cleaning up clone index [{}] for index [{}] as it already exists.", cloneIndexName, indexName);
deleteCloneIndexIfExists(stepContext);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this cause an infinite loop?

If this step runs again while the clone is in progress (Which may happen as we keep re-running execute(..) until stepComplete(...) is true, relying on the action duplicators to prevent us actually making changes twice) won't it delete the currently cloning index then try and clone it again next cycle?

Copy link
Contributor Author

@seanzatzdev seanzatzdev Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, the deduplicator alone wouldn't fix this though if i understand correctly? Struggling to think of a solution here that allows us to clean up without a potential conflict

I also just realized i forgot to add the deduplicators on the other requests... let me fix that now!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving a note here that we discussed this offline and will be following up on this problem to discuss workarounds

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the timeout we discussed, figured the best thing to do if the timeout hasn't been reached is just return and let the next DLM run check again rather than have the step's thread waiting around and checking periodically or something

}

ResizeRequest cloneIndexRequest = new ResizeRequest(
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
AcknowledgedRequest.DEFAULT_ACK_TIMEOUT,
ResizeType.CLONE,
indexName,
cloneIndexName
);
cloneIndexRequest.setTargetIndexSettings(Settings.builder().put("index.number_of_replicas", 0));
stepContext.executeDeduplicatedRequest(
cloneIndexRequest,
Strings.format("DLM service encountered an error when trying to clone index [%s]", indexName),
(req, reqListener) -> cloneIndex(projectId, cloneIndexRequest, reqListener, stepContext)
);
}

/**
 * Returns the display name of this step.
 *
 * @return the constant string {@code "Clone Index"}
 */
@Override
public String stepName() {
return "Clone Index";
}

private static class CloneIndexResizeActionListener implements ActionListener<CreateIndexResponse> {
private final String sourceIndexName;
private final String targetIndexName;
private final ActionListener<Void> listener;
private final DlmStepContext stepContext;

private CloneIndexResizeActionListener(
String sourceIndexName,
String targetIndexName,
ActionListener<Void> listener,
DlmStepContext stepContext
) {
this.sourceIndexName = sourceIndexName;
this.targetIndexName = targetIndexName;
this.listener = listener;
this.stepContext = stepContext;
}

@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
logger.debug("DLM successfully cloned index [{}] to index [{}]", sourceIndexName, targetIndexName);
// on success, write the cloned index name to the custom metadata of the index metadata of original index
markIndexToBeForceMerged(sourceIndexName, targetIndexName, stepContext, listener);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe all actions need to be executed via the deduplicator. I'm also not sure what effect calling an action inside a listener vs callback would have on thread usage... Perhaps @dakrone can weigh in on this pattern?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mistake, i totally forgot to wrap the other actions in the deduplicator... will fix that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not sure what effect calling an action inside a listener vs callback would have on thread usage

True, I can move this to a callback pattern

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do have a SubscribableListener which can be used for this, where you can chain actions with the .andThen(…) method. I've seen it used multiple places, though I don't claim to be an expert in its use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up adapting the way some other places in the codebase do this, e.g. TransportReindexAction, if I understand correctly the execute's should be async? The SubscribableListener didn't seem as common to me so I was a little apprehensive to use it. Maybe I need some more pointers on this though if there's problems with my approach, I'm finding elasticsearch's listeners to be a little confusing to work with to say the least...

}

@Override
public void onFailure(Exception e) {
logger.error(() -> Strings.format("DLM failed to clone index [%s] to index [%s]", sourceIndexName, targetIndexName), e);
deleteCloneIndexIfExists(stepContext);
listener.onFailure(e);
}
}

/*
 * Sends the given clone request through the project-scoped client. The outcome is handled by a
 * CloneIndexResizeActionListener wrapping the supplied listener.
 */
private void cloneIndex(ProjectId projectId, ResizeRequest cloneRequest, ActionListener<Void> listener, DlmStepContext stepContext) {
    assert cloneRequest.indices() != null && cloneRequest.indices().length == 1 : "DLM should clone one index at a time";
    final String source = cloneRequest.getSourceIndex();
    final String target = cloneRequest.getTargetIndexRequest().index();
    logger.trace("DLM issuing request to clone index [{}] to index [{}]", source, target);
    stepContext.client()
        .projectClient(projectId)
        .execute(TransportResizeAction.TYPE, cloneRequest, new CloneIndexResizeActionListener(source, target, listener, stepContext));
}

/*
 * Derives the clone index name from the original index name. The result is deterministic: the original name, followed by
 * "-dlm-clone-", followed by the first 8 hex characters of the SHA-256 digest of the original name's UTF-8 bytes.
 */
private static String generateCloneIndexName(String originalName) {
    final byte[] digest = MessageDigests.sha256().digest(originalName.getBytes(StandardCharsets.UTF_8));
    final String shortHash = MessageDigests.toHexString(digest).substring(0, 8);
    return originalName + "-dlm-clone-" + shortHash;
}

/*
 * Tells whether the given index carries a "force_merge_completed_timestamp" entry in its "data_stream_lifecycle" custom
 * metadata. An index without that custom metadata is treated as not force merged.
 */
private static boolean isForceMergeComplete(IndexMetadata backingIndex) {
    final Map<String, String> lifecycleMetadata = backingIndex.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY);
    if (lifecycleMetadata == null) {
        return false;
    }
    return lifecycleMetadata.containsKey(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY);
}

/*
 * Asks, via the MarkIndexToBeForceMergedAction transport action, for the target index to be recorded on the source index as
 * the index that DLM should force merge. The request is asynchronous: the listener is completed with null once the action
 * responds, and receives the failure if the action fails.
 */
private static void markIndexToBeForceMerged(
    String sourceIndex,
    String indexToBeForceMerged,
    DlmStepContext stepContext,
    ActionListener<Void> listener
) {
    final ProjectId project = stepContext.projectId();
    final MarkIndexToBeForceMergedAction.Request markRequest = new MarkIndexToBeForceMergedAction.Request(
        project,
        sourceIndex,
        indexToBeForceMerged,
        MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT
    );

    stepContext.client()
        .projectClient(project)
        .execute(MarkIndexToBeForceMergedAction.INSTANCE, markRequest, listener.delegateFailure((downstream, ignored) -> {
            logger.debug("DLM successfully marked index [{}] to be force merged", indexToBeForceMerged);
            downstream.onResponse(null);
        }));
}

/*
 * Looks up, in the lifecycle custom metadata of the source index, the name of the index that was marked to be force merged.
 * Returns null when the source index is unknown to the project, has no lifecycle custom metadata, or has no such entry.
 */
private static String getIndexToBeForceMerged(String sourceIndex, ProjectState projectState) {
    final IndexMetadata sourceMetadata = projectState.metadata().index(sourceIndex);
    if (sourceMetadata == null) {
        return null;
    }
    final Map<String, String> lifecycleMetadata = sourceMetadata.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY);
    return lifecycleMetadata == null ? null : lifecycleMetadata.get(DLM_INDEX_TO_BE_MERGED_KEY);
}

/*
 * Requests deletion of the clone index that belongs to the index of the given step context. The clone name is re-derived
 * from the source index name. The call is asynchronous; its outcome is only logged.
 */
private static void deleteCloneIndexIfExists(DlmStepContext stepContext) {
    final String cloneName = generateCloneIndexName(stepContext.indexName());
    logger.debug("Attempting to delete index [{}]", cloneName);

    final DeleteIndexRequest request = new DeleteIndexRequest(cloneName);
    request.indicesOptions(IGNORE_MISSING_OPTIONS);
    request.masterNodeTimeout(TimeValue.MAX_VALUE);

    // a response that is not acknowledged is surfaced to the logging listener as a failure
    final ActionListener<AcknowledgedResponse> ackListener = failIfNotAcknowledged(
        new DeleteCloneIndexActionListener(cloneName),
        String.format(Locale.ROOT, "Failed to acknowledge delete of index [%s]", cloneName)
    );
    stepContext.client().projectClient(stepContext.projectId()).admin().indices().delete(request, ackListener);
}

private static class DeleteCloneIndexActionListener implements ActionListener<AcknowledgedResponse> {
private final String targetIndex;

private DeleteCloneIndexActionListener(String targetIndex) {
this.targetIndex = targetIndex;
}

@Override
public void onResponse(AcknowledgedResponse response) {
logger.debug("DLM successfully deleted clone index [{}]", targetIndex);
}

@Override
public void onFailure(Exception e) {
logger.error(() -> Strings.format("DLM failed to delete clone index [%s]", targetIndex), e);
}
}

/*
 * Wraps the given listener so that a response which was not acknowledged is reported as a failure.
 *
 * - acknowledged response: forwarded unchanged to the wrapped listener
 * - unacknowledged response: the wrapped listener fails with an ElasticsearchException carrying errorMessage
 * - failure: passed straight through to the wrapped listener
 */
private static <U extends AcknowledgedResponse> ActionListener<U> failIfNotAcknowledged(
    ActionListener<U> listener,
    String errorMessage
) {
    return listener.delegateFailure((delegate, response) -> {
        if (response.isAcknowledged()) {
            // hand over the actual response: the delegate is typed on U, so passing null would break any delegate that
            // reads its response
            delegate.onResponse(response);
        } else {
            delegate.onFailure(new ElasticsearchException(errorMessage));
        }
    });
}
}
Loading