1616import org .elasticsearch .client .internal .Client ;
1717import org .elasticsearch .cluster .AckedClusterStateUpdateTask ;
1818import org .elasticsearch .cluster .ClusterState ;
19- import org .elasticsearch .cluster .ClusterStateUpdateTask ;
19+ import org .elasticsearch .cluster .ClusterStateAckListener ;
20+ import org .elasticsearch .cluster .SimpleBatchedAckListenerTaskExecutor ;
2021import org .elasticsearch .cluster .block .ClusterBlockException ;
2122import org .elasticsearch .cluster .block .ClusterBlockLevel ;
2223import org .elasticsearch .cluster .metadata .Metadata ;
2324import org .elasticsearch .cluster .metadata .RepositoriesMetadata ;
2425import org .elasticsearch .cluster .service .ClusterService ;
26+ import org .elasticsearch .cluster .service .MasterServiceTaskQueue ;
27+ import org .elasticsearch .common .Priority ;
2528import org .elasticsearch .common .util .concurrent .EsExecutors ;
2629import org .elasticsearch .core .Nullable ;
27- import org .elasticsearch .core .SuppressForbidden ;
30+ import org .elasticsearch .core .Tuple ;
2831import org .elasticsearch .injection .guice .Inject ;
2932import org .elasticsearch .license .XPackLicenseState ;
3033import org .elasticsearch .reservedstate .ReservedClusterStateHandler ;
@@ -66,6 +69,7 @@ public class TransportPutLifecycleAction extends TransportMasterNodeAction<PutLi
6669 private final NamedXContentRegistry xContentRegistry ;
6770 private final Client client ;
6871 private final XPackLicenseState licenseState ;
72+ private final MasterServiceTaskQueue <UpdateLifecyclePolicyTask > taskQueue ;
6973
7074 @ Inject
7175 public TransportPutLifecycleAction (
@@ -90,6 +94,7 @@ public TransportPutLifecycleAction(
9094 this .xContentRegistry = namedXContentRegistry ;
9195 this .licenseState = licenseState ;
9296 this .client = client ;
97+ this .taskQueue = clusterService .createTaskQueue ("ilm-put-lifecycle-queue" , Priority .NORMAL , new IlmLifecycleExecutor ());
9398 }
9499
95100 @ Override
@@ -117,10 +122,15 @@ protected void masterOperation(
117122 }
118123 }
119124
120- submitUnbatchedTask (
121- "put-lifecycle-" + request .getPolicy ().getName (),
122- new UpdateLifecyclePolicyTask (request , listener , licenseState , filteredHeaders , xContentRegistry , client )
125+ UpdateLifecyclePolicyTask putTask = new UpdateLifecyclePolicyTask (
126+ request ,
127+ listener ,
128+ licenseState ,
129+ filteredHeaders ,
130+ xContentRegistry ,
131+ client
123132 );
133+ taskQueue .submitTask ("put-lifecycle-" + request .getPolicy ().getName (), putTask , putTask .timeout ());
124134 }
125135
126136 public static class UpdateLifecyclePolicyTask extends AckedClusterStateUpdateTask {
@@ -223,11 +233,6 @@ public ClusterState execute(ClusterState currentState) throws Exception {
223233 }
224234 }
225235
226- @ SuppressForbidden (reason = "legacy usage of unbatched task" ) // TODO add support for batching here
227- private void submitUnbatchedTask (@ SuppressWarnings ("SameParameterValue" ) String source , ClusterStateUpdateTask task ) {
228- clusterService .submitUnbatchedStateUpdateTask (source , task );
229- }
230-
231236 /**
232237 * Returns 'true' if the ILM policy is effectually the same (same policy and headers), and thus can be a no-op update.
233238 */
@@ -328,4 +333,15 @@ public Optional<String> reservedStateHandlerName() {
328333 public Set <String > modifiedKeys (PutLifecycleRequest request ) {
329334 return Set .of (request .getPolicy ().getName ());
330335 }
336+
337+ private static class IlmLifecycleExecutor extends SimpleBatchedAckListenerTaskExecutor <UpdateLifecyclePolicyTask > {
338+
339+ @ Override
340+ public Tuple <ClusterState , ClusterStateAckListener > executeTask (UpdateLifecyclePolicyTask task , ClusterState clusterState )
341+ throws Exception {
342+ return Tuple .tuple (task .execute (clusterState ), task );
343+ }
344+
345+ }
346+
331347}
0 commit comments