1010import org .apache .logging .log4j .LogManager ;
1111import org .apache .logging .log4j .Logger ;
1212import org .elasticsearch .action .ActionListener ;
13- import org .elasticsearch .action .ActionRequestValidationException ;
14- import org .elasticsearch .action .IndicesRequest ;
1513import org .elasticsearch .action .support .ActionFilters ;
16- import org .elasticsearch .action .support .IndicesOptions ;
17- import org .elasticsearch .action .support .master .AcknowledgedRequest ;
1814import org .elasticsearch .action .support .master .AcknowledgedResponse ;
1915import org .elasticsearch .action .support .master .TransportMasterNodeAction ;
2016import org .elasticsearch .cluster .AckedClusterStateUpdateTask ;
2521import org .elasticsearch .cluster .metadata .IndexMetadata ;
2622import org .elasticsearch .cluster .metadata .LifecycleExecutionState ;
2723import org .elasticsearch .cluster .service .ClusterService ;
28- import org .elasticsearch .common .io .stream .StreamInput ;
29- import org .elasticsearch .common .io .stream .StreamOutput ;
3024import org .elasticsearch .common .util .concurrent .EsExecutors ;
3125import org .elasticsearch .core .SuppressForbidden ;
32- import org .elasticsearch .core .TimeValue ;
3326import org .elasticsearch .injection .guice .Inject ;
3427import org .elasticsearch .tasks .Task ;
3528import org .elasticsearch .threadpool .ThreadPool ;
3629import org .elasticsearch .transport .TransportService ;
3730import org .elasticsearch .xpack .core .ilm .Step .StepKey ;
3831import org .elasticsearch .xpack .core .ilm .action .ILMActions ;
32+ import org .elasticsearch .xpack .core .ilm .action .RetryActionRequest ;
3933import org .elasticsearch .xpack .ilm .IndexLifecycleService ;
4034
41- import java .io .IOException ;
42- import java .util .Arrays ;
43- import java .util .Objects ;
44-
45- public class TransportRetryAction extends TransportMasterNodeAction <TransportRetryAction .Request , AcknowledgedResponse > {
35+ public class TransportRetryAction extends TransportMasterNodeAction <RetryActionRequest , AcknowledgedResponse > {
4636
4737 private static final Logger logger = LogManager .getLogger (TransportRetryAction .class );
4838
@@ -62,15 +52,25 @@ public TransportRetryAction(
6252 clusterService ,
6353 threadPool ,
6454 actionFilters ,
65- Request ::new ,
55+ RetryActionRequest ::new ,
6656 AcknowledgedResponse ::readFrom ,
6757 EsExecutors .DIRECT_EXECUTOR_SERVICE
6858 );
6959 this .indexLifecycleService = indexLifecycleService ;
7060 }
7161
7262 @ Override
73- protected void masterOperation (Task task , Request request , ClusterState state , ActionListener <AcknowledgedResponse > listener ) {
63+ protected void masterOperation (
64+ Task task ,
65+ RetryActionRequest request ,
66+ ClusterState state ,
67+ ActionListener <AcknowledgedResponse > listener
68+ ) {
69+ if (request .requireError () == false ) {
70+ maybeRunAsyncAction (state , request .indices ());
71+ listener .onResponse (AcknowledgedResponse .TRUE );
72+ return ;
73+ }
7474 submitUnbatchedTask ("ilm-re-run" , new AckedClusterStateUpdateTask (request , listener ) {
7575 @ Override
7676 public ClusterState execute (ClusterState currentState ) {
@@ -79,101 +79,33 @@ public ClusterState execute(ClusterState currentState) {
7979
8080 @ Override
8181 public void clusterStateProcessed (ClusterState oldState , ClusterState newState ) {
82- for (String index : request .indices ()) {
83- IndexMetadata idxMeta = newState .metadata ().getProject ().index (index );
84- LifecycleExecutionState lifecycleState = idxMeta .getLifecycleExecutionState ();
85- StepKey retryStep = new StepKey (lifecycleState .phase (), lifecycleState .action (), lifecycleState .step ());
86- if (idxMeta == null ) {
87- // The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
88- logger .debug (
89- "index ["
90- + index
91- + "] has been deleted after moving to step ["
92- + lifecycleState .step ()
93- + "], skipping async action check"
94- );
95- return ;
96- }
97- indexLifecycleService .maybeRunAsyncAction (newState , idxMeta , retryStep );
98- }
82+ maybeRunAsyncAction (newState , request .indices ());
9983 }
10084 });
10185 }
10286
87+ private void maybeRunAsyncAction (ClusterState state , String [] indices ) {
88+ for (String index : indices ) {
89+ IndexMetadata idxMeta = state .metadata ().getProject ().index (index );
90+ if (idxMeta == null ) {
91+ // The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
92+ logger .debug ("index [" + index + "] has been deleted, skipping async action check" );
93+ return ;
94+ }
95+ LifecycleExecutionState lifecycleState = idxMeta .getLifecycleExecutionState ();
96+ StepKey retryStep = new StepKey (lifecycleState .phase (), lifecycleState .action (), lifecycleState .step ());
97+ indexLifecycleService .maybeRunAsyncAction (state , idxMeta , retryStep );
98+ }
99+ }
100+
103101 @ SuppressForbidden (reason = "legacy usage of unbatched task" ) // TODO add support for batching here
104102 private void submitUnbatchedTask (@ SuppressWarnings ("SameParameterValue" ) String source , ClusterStateUpdateTask task ) {
105103 clusterService .submitUnbatchedStateUpdateTask (source , task );
106104 }
107105
108106 @ Override
109- protected ClusterBlockException checkBlock (Request request , ClusterState state ) {
107+ protected ClusterBlockException checkBlock (RetryActionRequest request , ClusterState state ) {
110108 return state .blocks ().globalBlockedException (ClusterBlockLevel .METADATA_WRITE );
111109 }
112110
113- public static class Request extends AcknowledgedRequest <Request > implements IndicesRequest .Replaceable {
114- private String [] indices ;
115- private IndicesOptions indicesOptions = IndicesOptions .strictExpandOpen ();
116-
117- public Request (TimeValue masterNodeTimeout , TimeValue ackTimeout , String ... indices ) {
118- super (masterNodeTimeout , ackTimeout );
119- this .indices = indices ;
120- }
121-
122- public Request (StreamInput in ) throws IOException {
123- super (in );
124- this .indices = in .readStringArray ();
125- this .indicesOptions = IndicesOptions .readIndicesOptions (in );
126- }
127-
128- @ Override
129- public Request indices (String ... indices ) {
130- this .indices = indices ;
131- return this ;
132- }
133-
134- @ Override
135- public String [] indices () {
136- return indices ;
137- }
138-
139- @ Override
140- public IndicesOptions indicesOptions () {
141- return indicesOptions ;
142- }
143-
144- public Request indicesOptions (IndicesOptions indicesOptions ) {
145- this .indicesOptions = indicesOptions ;
146- return this ;
147- }
148-
149- @ Override
150- public ActionRequestValidationException validate () {
151- return null ;
152- }
153-
154- @ Override
155- public void writeTo (StreamOutput out ) throws IOException {
156- super .writeTo (out );
157- out .writeStringArray (indices );
158- indicesOptions .writeIndicesOptions (out );
159- }
160-
161- @ Override
162- public int hashCode () {
163- return Objects .hash (Arrays .hashCode (indices ), indicesOptions );
164- }
165-
166- @ Override
167- public boolean equals (Object obj ) {
168- if (obj == null ) {
169- return false ;
170- }
171- if (obj .getClass () != getClass ()) {
172- return false ;
173- }
174- Request other = (Request ) obj ;
175- return Objects .deepEquals (indices , other .indices ) && Objects .equals (indicesOptions , other .indicesOptions );
176- }
177-
178- }
179111}
0 commit comments