Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
8ee2c86
Add clone step to DLM
seanzatzdev Feb 2, 2026
0183020
[CI] Auto commit changes from spotless
Feb 2, 2026
88ceed6
Add clone step to DLM
seanzatzdev Feb 2, 2026
8975131
[CI] Auto commit changes from spotless
Feb 2, 2026
c81bcfb
fix failing tests
seanzatzdev Feb 2, 2026
666f179
formatting
seanzatzdev Feb 2, 2026
99e6e63
Update custom metadata with index for force merge asynchronously
seanzatzdev Feb 2, 2026
41f6830
Merge branch 'main' into dlm-clone-step
seanzatzdev Feb 2, 2026
342c1b7
Add additional null checks
seanzatzdev Feb 2, 2026
f54f575
clean up test file
seanzatzdev Feb 2, 2026
c3e7e7f
Merge branch 'main' into dlm-clone-step
seanzatzdev Feb 3, 2026
d4dee8b
Merge branch 'main' into dlm-clone-step
seanzatzdev Feb 3, 2026
fb81e8c
Export dlm steps to server module to fix DI/give Guice access
seanzatzdev Feb 4, 2026
76cf7dd
Add mark_to_force_merge action to non_operator_actions
seanzatzdev Feb 4, 2026
bdcf9d7
remove unnecessary force merge step
seanzatzdev Feb 5, 2026
93ea13b
minor changes to address PR feedback
seanzatzdev Feb 5, 2026
5536e78
[CI] Auto commit changes from spotless
Feb 5, 2026
a1e8560
simplifying some functions
seanzatzdev Feb 5, 2026
650fbda
address PR comments
seanzatzdev Feb 5, 2026
d46e003
Merge branch 'elastic:main' into dlm-clone-step
seanzatzdev Feb 10, 2026
7219fc1
rename classes
seanzatzdev Feb 9, 2026
142b622
Prevent race conditions
seanzatzdev Feb 10, 2026
f5d909a
fix failing tests
seanzatzdev Feb 10, 2026
654b5d9
run delete clone through deduplicator
seanzatzdev Feb 10, 2026
3e6b735
refactor cleanup
seanzatzdev Feb 10, 2026
3ad95d6
Apply suggestion from @Copilot
seanzatzdev Feb 10, 2026
642d1ed
Apply suggestion from @Copilot
seanzatzdev Feb 10, 2026
f9fe9ba
Refactoring to clean up and bug fixes
seanzatzdev Feb 10, 2026
be7e0e1
[CI] Auto commit changes from spotless
Feb 10, 2026
b213d9d
nit
seanzatzdev Feb 10, 2026
466ff73
add test helper
seanzatzdev Feb 11, 2026
716ee73
add tests for marking action
seanzatzdev Feb 11, 2026
20d6702
[CI] Auto commit changes from spotless
Feb 11, 2026
21720e1
address PR feedback, truncate clone names, keep clone request formati…
seanzatzdev Feb 12, 2026
0642435
[CI] Auto commit changes from spotless
Feb 12, 2026
ffd9f9d
pr feedback
seanzatzdev Feb 12, 2026
f5d23e1
renaming functions
seanzatzdev Feb 12, 2026
a85656b
wait for active shards on resize req
seanzatzdev Feb 12, 2026
6c3bd98
format time logs
seanzatzdev Feb 12, 2026
d7e92d0
Improved error handling, renames, refactor tests to simplify
seanzatzdev Feb 16, 2026
241b5d7
[CI] Auto commit changes from spotless
Feb 16, 2026
b23456d
Merge branch 'main' into dlm-clone-step
seanzatzdev Feb 17, 2026
c004bbb
small fixes
seanzatzdev Feb 20, 2026
754cfd9
pr feedback
seanzatzdev Feb 20, 2026
4d72f77
Merge branch 'main' into dlm-clone-step
seanzatzdev Feb 20, 2026
29e212f
remove truncated prefix from clone index name, nit
seanzatzdev Feb 20, 2026
176cead
Merge branch 'main' into dlm-clone-step
seanzatzdev Feb 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions modules/data-streams/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
exports org.elasticsearch.datastreams.action to org.elasticsearch.server;
exports org.elasticsearch.datastreams.lifecycle.action to org.elasticsearch.server;
exports org.elasticsearch.datastreams.lifecycle;
exports org.elasticsearch.datastreams.lifecycle.transitions.steps to org.elasticsearch.server;
exports org.elasticsearch.datastreams.options.action to org.elasticsearch.server;

provides org.elasticsearch.features.FeatureSpecification with org.elasticsearch.datastreams.DataStreamFeatures;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.elasticsearch.datastreams.lifecycle.rest.RestGetDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.rest.RestPutDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.transitions.DlmAction;
import org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction;
import org.elasticsearch.datastreams.lifecycle.transitions.steps.TransportMarkIndexForDLMForceMergeAction;
import org.elasticsearch.datastreams.options.action.DeleteDataStreamOptionsAction;
import org.elasticsearch.datastreams.options.action.GetDataStreamOptionsAction;
import org.elasticsearch.datastreams.options.action.TransportDeleteDataStreamOptionsAction;
Expand Down Expand Up @@ -260,6 +262,7 @@ public List<ActionHandler> getActions() {
actions.add(new ActionHandler(UpdateDataStreamSettingsAction.INSTANCE, TransportUpdateDataStreamSettingsAction.class));
actions.add(new ActionHandler(GetDataStreamMappingsAction.INSTANCE, TransportGetDataStreamMappingsAction.class));
actions.add(new ActionHandler(UpdateDataStreamMappingsAction.INSTANCE, TransportUpdateDataStreamMappingsAction.class));
actions.add(new ActionHandler(MarkIndexForDLMForceMergeAction.TYPE, TransportMarkIndexForDLMForceMergeAction.class));
return actions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.AckedBatchedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.SimpleBatchedExecutor;
Expand Down Expand Up @@ -74,6 +76,7 @@
import org.elasticsearch.datastreams.lifecycle.transitions.DlmActionContext;
import org.elasticsearch.datastreams.lifecycle.transitions.DlmStep;
import org.elasticsearch.datastreams.lifecycle.transitions.DlmStepContext;
import org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
Expand Down Expand Up @@ -108,6 +111,7 @@
import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.STARTED;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_DOWNSAMPLE_STATUS;
import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY;
import static org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction.DLM_INDEX_FOR_FORCE_MERGE_KEY;

/**
* This service will implement the needed actions (e.g. rollover, retention) to manage the data streams with a data stream lifecycle
Expand Down Expand Up @@ -185,6 +189,7 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
private final MasterServiceTaskQueue<UpdateForceMergeCompleteTask> forceMergeClusterStateUpdateTaskQueue;
private final MasterServiceTaskQueue<DeleteSourceAndAddDownsampleToDS> swapSourceWithDownsampleIndexQueue;
private final MasterServiceTaskQueue<MarkIndexForDlmForceMergeTask> markIndexForDlmForceMergeQueue;
private volatile ByteSizeValue targetMergePolicyFloorSegment;
private volatile int targetMergePolicyFactor;
/**
Expand Down Expand Up @@ -253,6 +258,11 @@ public DataStreamLifecycleService(
Priority.URGENT, // urgent priority as this deletes indices
new DeleteSourceAndAddDownsampleIndexExecutor(allocationService)
);
this.markIndexForDlmForceMergeQueue = clusterService.createTaskQueue(
"dlm-mark-index-for-force-merge",
Priority.LOW,
new MarkIndexForDLMForceMergeExecutor()
);
this.dslHealthInfoPublisher = dataStreamLifecycleHealthInfoPublisher;
this.actions = actions;
}
Expand Down Expand Up @@ -1761,6 +1771,116 @@ public void onFailure(Exception e) {
}
}

/**
* Task to update the cluster state with the force merge marker for DLM Tiers.
* Public for testing.
*/
public static class MarkIndexForDlmForceMergeTask extends AckedBatchedClusterStateUpdateTask implements ClusterStateTaskListener {
private final ActionListener<AcknowledgedResponse> listener;
private final ProjectId projectId;
private final String originalIndex;
private final String indexToBeForceMerged;

MarkIndexForDlmForceMergeTask(ActionListener<AcknowledgedResponse> listener, MarkIndexForDLMForceMergeAction.Request request) {
super(TimeValue.THIRTY_SECONDS, listener);
this.listener = listener;
this.projectId = request.getProjectId();
this.originalIndex = request.getOriginalIndex();
this.indexToBeForceMerged = request.getIndexToBeForceMerged();
}

ClusterState execute(ClusterState currentState) {
ProjectMetadata projectMetadata = currentState.metadata().getProject(this.projectId);
if (projectMetadata == null) {
return currentState;
}
IndexMetadata originalIndexMetadata = projectMetadata.index(originalIndex);
IndexMetadata cloneIndexMetadata = projectMetadata.index(indexToBeForceMerged);
if (originalIndexMetadata == null) {
// If the source index doesn't exist, we can't mark it for force merge. Return the unchanged cluster state.
String errorMessage = Strings.format(
"DLM attempted to mark clone index [%s] for force merge for original index [%s] but index no longer exists",
indexToBeForceMerged,
originalIndex
);
logger.debug(errorMessage);
throw new IndexNotFoundException(errorMessage);
}

if (cloneIndexMetadata == null) {
// If the clone index doesn't exist, this is an unexpected error - the clone should have been created before this task
String errorMessage = Strings.format(
"clone index [%s] doesn't exist but was expected to exist when marking index [%s] for DLM force merge",
indexToBeForceMerged,
originalIndex
);
logger.debug(errorMessage);
throw new IllegalStateException(errorMessage);
}

Map<String, String> existingCustomMetadata = originalIndexMetadata.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY);
Map<String, String> customMetadata = new HashMap<>();
if (existingCustomMetadata != null) {
if (indexToBeForceMerged.equals(existingCustomMetadata.get(DLM_INDEX_FOR_FORCE_MERGE_KEY))) {
// Index is already marked for force merge, no update needed
return currentState;
}
customMetadata.putAll(existingCustomMetadata);
}
customMetadata.put(DLM_INDEX_FOR_FORCE_MERGE_KEY, indexToBeForceMerged);

IndexMetadata updatedOriginalIndexMetadata = new IndexMetadata.Builder(originalIndexMetadata).putCustom(
LIFECYCLE_CUSTOM_INDEX_METADATA_KEY,
customMetadata
).build();

final ProjectMetadata.Builder updatedProject = ProjectMetadata.builder(currentState.metadata().getProject(projectId))
.put(updatedOriginalIndexMetadata, true);
return ClusterState.builder(currentState).putProjectMetadata(updatedProject).build();
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}

/**
* MarkIndexForDLMForceMergeExecutor for the MarkIndexForDlmForceMergeTask.
* Public for testing.
*/
public static class MarkIndexForDLMForceMergeExecutor implements ClusterStateTaskExecutor<MarkIndexForDlmForceMergeTask> {
@Override
public ClusterState execute(BatchExecutionContext<MarkIndexForDlmForceMergeTask> batchExecutionContext) {
var state = batchExecutionContext.initialState();
for (final var taskContext : batchExecutionContext.taskContexts()) {
try {
final MarkIndexForDlmForceMergeTask task = taskContext.getTask();
state = task.execute(state);
taskContext.success(task);
} catch (Exception e) {
taskContext.onFailure(e);
}
}
return state;
}
}

/**
* Marks the given index to be force merged for DLM by updating the cluster state with the name of the index to be force merged in the
* custom metadata of the source index. This method returns immediately, but the update to the cluster state happens asynchronously and
* the listener is notified on success or failure of the cluster state update.
* @param request the request
* @param listener the listener to be notified on success or failure of the cluster state update.
*/
public void markIndexForDlmForceMerge(MarkIndexForDLMForceMergeAction.Request request, ActionListener<AcknowledgedResponse> listener) {
markIndexForDlmForceMergeQueue.submitTask(
Strings.format("DLM marking index [%s] to be force merged for DLM", request.getIndexToBeForceMerged()),
new MarkIndexForDlmForceMergeTask(listener, request),
null
);
}

/**
* This wrapper exists only to provide equals and hashCode implementations of a ForceMergeRequest for transportActionsDeduplicator.
* It intentionally ignores forceMergeUUID (which ForceMergeRequest's equals/hashCode would have to if they existed) because we don't
Expand Down
Loading