Skip to content

Commit 7d7fa76

Browse files
authored
Batch ILM policy cluster state updates [#122917] (#126529)
* Use a task queue to ensure ILM policy change cluster state updates are batched * Update docs/changelog/126529.yaml * Update docs/changelog/126529.yaml * Switch to using SimpleBatchedAckListenerTaskExecutor * Get timeout from request * Ditch the try-catch
1 parent 7c77ead commit 7d7fa76

File tree

2 files changed

+33
-10
lines changed

2 files changed

+33
-10
lines changed

docs/changelog/126529.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 126529
2+
summary: "Batch ILM policy cluster state updates [#122917]"
3+
area: ILM+SLM
4+
type: enhancement
5+
issues:
6+
- 122917

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,21 @@
1616
import org.elasticsearch.client.internal.Client;
1717
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
1818
import org.elasticsearch.cluster.ClusterState;
19-
import org.elasticsearch.cluster.ClusterStateUpdateTask;
19+
import org.elasticsearch.cluster.ClusterStateAckListener;
2020
import org.elasticsearch.cluster.ProjectState;
21+
import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor;
2122
import org.elasticsearch.cluster.block.ClusterBlockException;
2223
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2324
import org.elasticsearch.cluster.metadata.ProjectId;
2425
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2526
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
2627
import org.elasticsearch.cluster.project.ProjectResolver;
2728
import org.elasticsearch.cluster.service.ClusterService;
29+
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
30+
import org.elasticsearch.common.Priority;
2831
import org.elasticsearch.common.util.concurrent.EsExecutors;
2932
import org.elasticsearch.core.Nullable;
30-
import org.elasticsearch.core.SuppressForbidden;
33+
import org.elasticsearch.core.Tuple;
3134
import org.elasticsearch.injection.guice.Inject;
3235
import org.elasticsearch.license.XPackLicenseState;
3336
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
@@ -70,6 +73,7 @@ public class TransportPutLifecycleAction extends TransportMasterNodeAction<PutLi
7073
private final Client client;
7174
private final XPackLicenseState licenseState;
7275
private final ProjectResolver projectResolver;
76+
private final MasterServiceTaskQueue<UpdateLifecyclePolicyTask> taskQueue;
7377

7478
@Inject
7579
public TransportPutLifecycleAction(
@@ -96,6 +100,7 @@ public TransportPutLifecycleAction(
96100
this.licenseState = licenseState;
97101
this.client = client;
98102
this.projectResolver = projectResolver;
103+
this.taskQueue = clusterService.createTaskQueue("ilm-put-lifecycle-queue", Priority.NORMAL, new IlmLifecycleExecutor());
99104
}
100105

101106
@Override
@@ -123,10 +128,16 @@ protected void masterOperation(
123128
return;
124129
}
125130

126-
submitUnbatchedTask(
127-
"put-lifecycle-" + request.getPolicy().getName(),
128-
new UpdateLifecyclePolicyTask(projectMetadata.id(), request, listener, licenseState, filteredHeaders, xContentRegistry, client)
131+
UpdateLifecyclePolicyTask putTask = new UpdateLifecyclePolicyTask(
132+
projectMetadata.id(),
133+
request,
134+
listener,
135+
licenseState,
136+
filteredHeaders,
137+
xContentRegistry,
138+
client
129139
);
140+
taskQueue.submitTask("put-lifecycle-" + request.getPolicy().getName(), putTask, putTask.timeout());
130141
}
131142

132143
public static class UpdateLifecyclePolicyTask extends AckedClusterStateUpdateTask {
@@ -237,11 +248,6 @@ public ClusterState execute(ClusterState clusterState) throws Exception {
237248
}
238249
}
239250

240-
@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
241-
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
242-
clusterService.submitUnbatchedStateUpdateTask(source, task);
243-
}
244-
245251
/**
246252
* Returns 'true' if the ILM policy is effectually the same (same policy and headers), and thus can be a no-op update.
247253
*/
@@ -342,4 +348,15 @@ public Optional<String> reservedStateHandlerName() {
342348
public Set<String> modifiedKeys(PutLifecycleRequest request) {
343349
return Set.of(request.getPolicy().getName());
344350
}
351+
352+
private static class IlmLifecycleExecutor extends SimpleBatchedAckListenerTaskExecutor<UpdateLifecyclePolicyTask> {
353+
354+
@Override
355+
public Tuple<ClusterState, ClusterStateAckListener> executeTask(UpdateLifecyclePolicyTask task, ClusterState clusterState)
356+
throws Exception {
357+
return Tuple.tuple(task.execute(clusterState), task);
358+
}
359+
360+
}
361+
345362
}

0 commit comments

Comments
 (0)