@@ -93,138 +93,142 @@ public ClusterState doExecute(final ClusterState currentState) throws IOExceptio
9393 return currentState ;
9494 }
9595 Step registeredCurrentStep = IndexLifecycleRunner .getCurrentStep (policyStepsRegistry , policy , indexMetadata );
96- if (currentStep .equals (registeredCurrentStep )) {
97- ClusterState state = currentState ;
98- // We can do cluster state steps all together until we
99- // either get to a step that isn't a cluster state step or a
100- // cluster state wait step returns not completed
101- while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep ) {
96+ if (currentStep .equals (registeredCurrentStep ) == false ) {
97+ // either we are no longer the master or the step is now
98+ // not the same as when we submitted the update task. In
99+ // either case we don't want to do anything now
100+ return currentState ;
101+ }
102+ ClusterState state = currentState ;
103+ // We can do cluster state steps all together until we
104+ // either get to a step that isn't a cluster state step or a
105+ // cluster state wait step returns not completed
106+ while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep ) {
107+ try {
102108 if (currentStep instanceof ClusterStateActionStep ) {
103- // cluster state action step so do the action and
104- // move the cluster state to the next step
105- logger .trace (
106- "[{}] performing cluster state action ({}) [{}]" ,
107- index .getName (),
108- currentStep .getClass ().getSimpleName (),
109- currentStep .getKey ()
110- );
111- try {
112- ClusterStateActionStep actionStep = (ClusterStateActionStep ) currentStep ;
113- state = actionStep .performAction (index , state );
114- // If this step (usually a CopyExecutionStateStep step) has brought the
115- // index to where it needs to have async actions invoked, then add that
116- // index to the list so that when the new cluster state has been
117- // processed, the new indices will have their async actions invoked.
118- Optional .ofNullable (actionStep .indexForAsyncInvocation ())
119- .ifPresent (tuple -> indexToStepKeysForAsyncActions .put (tuple .v1 (), tuple .v2 ()));
120- } catch (Exception exception ) {
121- return moveToErrorStep (state , currentStep .getKey (), exception );
122- }
123- // set here to make sure that the clusterProcessed knows to execute the
124- // correct step if it an async action
125- nextStepKey = currentStep .getNextStepKey ();
126- if (nextStepKey == null ) {
127- return state ;
128- } else {
129- logger .trace ("[{}] moving cluster state to next step [{}]" , index .getName (), nextStepKey );
130- state = ClusterState .builder (state )
131- .putProjectMetadata (
132- IndexLifecycleTransition .moveIndexToStep (
133- index ,
134- state .metadata ().getProject (),
135- nextStepKey ,
136- nowSupplier ,
137- policyStepsRegistry ,
138- false
139- )
140- )
141- .build ();
142- }
109+ state = executeActionStep (state , currentStep );
143110 } else {
144- // cluster state wait step so evaluate the
145- // condition, if the condition is met move to the
146- // next step, if its not met return the current
147- // cluster state so it can be applied and we will
148- // wait for the next trigger to evaluate the
149- // condition again
150- logger .trace (
151- "[{}] waiting for cluster state step condition ({}) [{}]" ,
152- index .getName (),
153- currentStep .getClass ().getSimpleName (),
154- currentStep .getKey ()
155- );
156- ClusterStateWaitStep .Result result ;
157- try {
158- result = ((ClusterStateWaitStep ) currentStep ).isConditionMet (index , state );
159- } catch (Exception exception ) {
160- return moveToErrorStep (state , currentStep .getKey (), exception );
161- }
162- // some steps can decide to change the next step to execute after waiting for some time for the condition
163- // to be met (eg. {@link LifecycleSettings#LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING}, so it's important we
164- // re-evaluate what the next step is after we evaluate the condition
165- nextStepKey = currentStep .getNextStepKey ();
166- if (result .complete ()) {
167- logger .trace (
168- "[{}] cluster state step condition met successfully ({}) [{}], moving to next step {}" ,
169- index .getName (),
170- currentStep .getClass ().getSimpleName (),
171- currentStep .getKey (),
172- nextStepKey
173- );
174- if (nextStepKey == null ) {
175- return state ;
176- } else {
177- state = ClusterState .builder (state )
178- .putProjectMetadata (
179- IndexLifecycleTransition .moveIndexToStep (
180- index ,
181- state .metadata ().getProject (),
182- nextStepKey ,
183- nowSupplier ,
184- policyStepsRegistry ,
185- false
186- )
187- )
188- .build ();
189- }
190- } else {
191- final ToXContentObject stepInfo = result .informationContext ();
192- if (logger .isTraceEnabled ()) {
193- logger .trace (
194- "[{}] condition not met ({}) [{}], returning existing state (info: {})" ,
195- index .getName (),
196- currentStep .getClass ().getSimpleName (),
197- currentStep .getKey (),
198- stepInfo == null ? "null" : Strings .toString (stepInfo )
199- );
200- }
201- // We may have executed a step and set "nextStepKey" to
202- // a value, but in this case, since the condition was
203- // not met, we can't advance any way, so don't attempt
204- // to run the current step
205- nextStepKey = null ;
206- if (stepInfo == null ) {
207- return state ;
208- } else {
209- return IndexLifecycleTransition .addStepInfoToClusterState (index , state , stepInfo );
210- }
211- }
212- }
213- // There are actions we need to take in the event a phase
214- // transition happens, so even if we would continue in the while
215- // loop, if we are about to go into a new phase, return so that
216- // other processing can occur
217- if (currentStep .getKey ().phase ().equals (currentStep .getNextStepKey ().phase ()) == false ) {
218- return state ;
111+ state = executeWaitStep (state , currentStep );
219112 }
220- currentStep = policyStepsRegistry .getStep (indexMetadata , currentStep .getNextStepKey ());
113+ } catch (Exception exception ) {
114+ return moveToErrorStep (state , currentStep .getKey (), exception );
115+ }
116+ if (nextStepKey == null ) {
117+ return state ;
118+ }
119+ // There are actions we need to take in the event a phase
120+ // transition happens, so even if we would continue in the while
121+ // loop, if we are about to go into a new phase, return so that
122+ // other processing can occur
123+ if (currentStep .getKey ().phase ().equals (currentStep .getNextStepKey ().phase ()) == false ) {
124+ return state ;
221125 }
126+ currentStep = policyStepsRegistry .getStep (indexMetadata , currentStep .getNextStepKey ());
127+ }
128+ return state ;
129+ }
130+
131+ private ClusterState executeActionStep (ClusterState state , Step currentStep ) {
132+ // cluster state action step so do the action and
133+ // move the cluster state to the next step
134+ logger .trace (
135+ "[{}] performing cluster state action ({}) [{}]" ,
136+ index .getName (),
137+ currentStep .getClass ().getSimpleName (),
138+ currentStep .getKey ()
139+ );
140+ ClusterStateActionStep actionStep = (ClusterStateActionStep ) currentStep ;
141+ state = actionStep .performAction (index , state );
142+ // If this step (usually a CopyExecutionStateStep step) has brought the
143+ // index to where it needs to have async actions invoked, then add that
144+ // index to the list so that when the new cluster state has been
145+ // processed, the new indices will have their async actions invoked.
146+ Optional .ofNullable (actionStep .indexForAsyncInvocation ())
147+ .ifPresent (tuple -> indexToStepKeysForAsyncActions .put (tuple .v1 (), tuple .v2 ()));
148+ // set here to make sure that the clusterProcessed knows to execute the
149+ // correct step if it an async action
150+ nextStepKey = currentStep .getNextStepKey ();
151+ if (nextStepKey == null ) {
222152 return state ;
153+ }
154+ logger .trace ("[{}] moving cluster state to next step [{}]" , index .getName (), nextStepKey );
155+ return ClusterState .builder (state )
156+ .putProjectMetadata (
157+ IndexLifecycleTransition .moveIndexToStep (
158+ index ,
159+ state .metadata ().getProject (),
160+ nextStepKey ,
161+ nowSupplier ,
162+ policyStepsRegistry ,
163+ false
164+ )
165+ )
166+ .build ();
167+ }
168+
169+ private ClusterState executeWaitStep (ClusterState state , Step currentStep ) {
170+ // cluster state wait step so evaluate the
171+ // condition, if the condition is met move to the
172+ // next step, if its not met return the current
173+ // cluster state so it can be applied and we will
174+ // wait for the next trigger to evaluate the
175+ // condition again
176+ logger .trace (
177+ "[{}] waiting for cluster state step condition ({}) [{}]" ,
178+ index .getName (),
179+ currentStep .getClass ().getSimpleName (),
180+ currentStep .getKey ()
181+ );
182+ ClusterStateWaitStep .Result result = ((ClusterStateWaitStep ) currentStep ).isConditionMet (index , state );
183+ // some steps can decide to change the next step to execute after waiting for some time for the condition
184+ // to be met (eg. {@link LifecycleSettings#LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING}, so it's important we
185+ // re-evaluate what the next step is after we evaluate the condition
186+ nextStepKey = currentStep .getNextStepKey ();
187+ if (result .complete ()) {
188+ logger .trace (
189+ "[{}] cluster state step condition met successfully ({}) [{}], moving to next step {}" ,
190+ index .getName (),
191+ currentStep .getClass ().getSimpleName (),
192+ currentStep .getKey (),
193+ nextStepKey
194+ );
195+ if (nextStepKey == null ) {
196+ return state ;
197+ } else {
198+ return ClusterState .builder (state )
199+ .putProjectMetadata (
200+ IndexLifecycleTransition .moveIndexToStep (
201+ index ,
202+ state .metadata ().getProject (),
203+ nextStepKey ,
204+ nowSupplier ,
205+ policyStepsRegistry ,
206+ false
207+ )
208+ )
209+ .build ();
210+ }
223211 } else {
224- // either we are no longer the master or the step is now
225- // not the same as when we submitted the update task. In
226- // either case we don't want to do anything now
227- return currentState ;
212+ final ToXContentObject stepInfo = result .informationContext ();
213+ if (logger .isTraceEnabled ()) {
214+ logger .trace (
215+ "[{}] condition not met ({}) [{}], returning existing state (info: {})" ,
216+ index .getName (),
217+ currentStep .getClass ().getSimpleName (),
218+ currentStep .getKey (),
219+ stepInfo == null ? "null" : Strings .toString (stepInfo )
220+ );
221+ }
222+ // We may have executed a step and set "nextStepKey" to
223+ // a value, but in this case, since the condition was
224+ // not met, we can't advance any way, so don't attempt
225+ // to run the current step
226+ nextStepKey = null ;
227+ if (stepInfo == null ) {
228+ return state ;
229+ } else {
230+ return IndexLifecycleTransition .addStepInfoToClusterState (index , state , stepInfo );
231+ }
228232 }
229233 }
230234
0 commit comments