Skip to content

Commit 10a7947

Browse files
committed
making the cluster state update task more generic
1 parent 4e7cbd4 commit 10a7947

File tree

2 files changed

+36
-24
lines changed

2 files changed

+36
-24
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
176176
private volatile RolloverConfiguration rolloverConfiguration;
177177
private SchedulerEngine.Job scheduledJob;
178178
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
179-
private final MasterServiceTaskQueue<UpdateForceMergeCompleteTask> forceMergeClusterStateUpdateTaskQueue;
179+
private final MasterServiceTaskQueue<UpdateDataStreamLifecycleCustomMetadataTask> dlmCustomMetadataClusterStateUpdateTaskQueue;
180180
private final MasterServiceTaskQueue<DeleteSourceAndAddDownsampleToDS> swapSourceWithDownsampleIndexQueue;
181181
private volatile ByteSizeValue targetMergePolicyFloorSegment;
182182
private volatile int targetMergePolicyFactor;
@@ -192,16 +192,18 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
192192
private volatile Long lastRunDuration = null;
193193
private volatile Long timeBetweenStarts = null;
194194

195-
private static final SimpleBatchedExecutor<UpdateForceMergeCompleteTask, Void> FORCE_MERGE_STATE_UPDATE_TASK_EXECUTOR =
196-
new SimpleBatchedExecutor<>() {
195+
private static final SimpleBatchedExecutor<
196+
UpdateDataStreamLifecycleCustomMetadataTask,
197+
Void> DLM_CUSTOM_METADATA_STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() {
197198
@Override
198-
public Tuple<ClusterState, Void> executeTask(UpdateForceMergeCompleteTask task, ClusterState clusterState) throws Exception {
199+
public Tuple<ClusterState, Void> executeTask(UpdateDataStreamLifecycleCustomMetadataTask task, ClusterState clusterState)
200+
throws Exception {
199201
return Tuple.tuple(task.execute(clusterState), null);
200202
}
201203

202204
@Override
203-
public void taskSucceeded(UpdateForceMergeCompleteTask task, Void unused) {
204-
logger.trace("Updated cluster state for force merge of index [{}]", task.targetIndex);
205+
public void taskSucceeded(UpdateDataStreamLifecycleCustomMetadataTask task, Void unused) {
206+
logger.trace("Updated cluster state for {} of index [{}]", task.key, task.targetIndex);
205207
task.listener.onResponse(null);
206208
}
207209
};
@@ -235,10 +237,10 @@ public DataStreamLifecycleService(
235237
this.signallingErrorRetryInterval = DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING.get(settings);
236238
this.rolloverConfiguration = clusterService.getClusterSettings()
237239
.get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING);
238-
this.forceMergeClusterStateUpdateTaskQueue = clusterService.createTaskQueue(
240+
this.dlmCustomMetadataClusterStateUpdateTaskQueue = clusterService.createTaskQueue(
239241
"data-stream-lifecycle-forcemerge-state-update",
240242
Priority.LOW,
241-
FORCE_MERGE_STATE_UPDATE_TASK_EXECUTOR
243+
DLM_CUSTOM_METADATA_STATE_UPDATE_TASK_EXECUTOR
242244
);
243245
this.swapSourceWithDownsampleIndexQueue = clusterService.createTaskQueue(
244246
"data-stream-lifecycle-swap-source-with-downsample",
@@ -1371,9 +1373,15 @@ public void onFailure(Exception e) {
13711373
* success or failure.
13721374
*/
13731375
private void setForceMergeCompletedTimestamp(ProjectId projectId, String targetIndex, ActionListener<Void> listener) {
1374-
forceMergeClusterStateUpdateTaskQueue.submitTask(
1376+
dlmCustomMetadataClusterStateUpdateTaskQueue.submitTask(
13751377
Strings.format("Adding force merge complete marker to cluster state for [%s]", targetIndex),
1376-
new UpdateForceMergeCompleteTask(listener, projectId, targetIndex, threadPool),
1378+
new UpdateDataStreamLifecycleCustomMetadataTask(
1379+
listener,
1380+
projectId,
1381+
targetIndex,
1382+
FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY,
1383+
Long.toString(threadPool.absoluteTimeInMillis())
1384+
),
13771385
null
13781386
);
13791387
}
@@ -1597,32 +1605,40 @@ public void setNowSupplier(LongSupplier nowSupplier) {
15971605
}
15981606

15991607
/**
1600-
* This is a ClusterStateTaskListener that writes the force_merge_completed_timestamp into the cluster state. It is meant to run in
1601-
* STATE_UPDATE_TASK_EXECUTOR.
1608+
* This is a ClusterStateTaskListener that writes the key and value into the cluster state as custom metadata within targetIndex's
1609+
* index metadata. It is meant to run in STATE_UPDATE_TASK_EXECUTOR.
16021610
*/
1603-
static class UpdateForceMergeCompleteTask implements ClusterStateTaskListener {
1611+
static class UpdateDataStreamLifecycleCustomMetadataTask implements ClusterStateTaskListener {
16041612
private final ActionListener<Void> listener;
16051613
private final ProjectId projectId;
16061614
private final String targetIndex;
1607-
private final ThreadPool threadPool;
1615+
private final String key;
1616+
private final String value;
16081617

1609-
UpdateForceMergeCompleteTask(ActionListener<Void> listener, ProjectId projectId, String targetIndex, ThreadPool threadPool) {
1618+
UpdateDataStreamLifecycleCustomMetadataTask(
1619+
ActionListener<Void> listener,
1620+
ProjectId projectId,
1621+
String targetIndex,
1622+
String key,
1623+
String value
1624+
) {
16101625
this.listener = listener;
16111626
this.projectId = projectId;
16121627
this.targetIndex = targetIndex;
1613-
this.threadPool = threadPool;
1628+
this.key = key;
1629+
this.value = value;
16141630
}
16151631

16161632
ClusterState execute(ClusterState currentState) throws Exception {
1617-
logger.debug("Updating cluster state with force merge complete marker for {}", targetIndex);
1633+
logger.debug("Updating cluster state with {}} marker for {}", key, targetIndex);
16181634
final var currentProject = currentState.metadata().getProject(projectId);
16191635
IndexMetadata indexMetadata = currentProject.index(targetIndex);
16201636
Map<String, String> customMetadata = indexMetadata.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY);
16211637
Map<String, String> newCustomMetadata = new HashMap<>();
16221638
if (customMetadata != null) {
16231639
newCustomMetadata.putAll(customMetadata);
16241640
}
1625-
newCustomMetadata.put(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY, Long.toString(threadPool.absoluteTimeInMillis()));
1641+
newCustomMetadata.put(key, value);
16261642
IndexMetadata updatededIndexMetadata = new IndexMetadata.Builder(indexMetadata).putCustom(
16271643
LIFECYCLE_CUSTOM_INDEX_METADATA_KEY,
16281644
newCustomMetadata

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -960,12 +960,8 @@ public void onFailure(Exception e) {
960960
};
961961
final var projectId = randomProjectIdOrDefault();
962962
String targetIndex = randomAlphaOfLength(20);
963-
DataStreamLifecycleService.UpdateForceMergeCompleteTask task = new DataStreamLifecycleService.UpdateForceMergeCompleteTask(
964-
listener,
965-
projectId,
966-
targetIndex,
967-
threadPool
968-
);
963+
DataStreamLifecycleService.UpdateDataStreamLifecycleCustomMetadataTask task =
964+
new DataStreamLifecycleService.UpdateDataStreamLifecycleCustomMetadataTask(listener, projectId, targetIndex, threadPool);
969965
{
970966
Exception exception = new RuntimeException("task failed");
971967
task.onFailure(exception);

0 commit comments

Comments
 (0)