@@ -93,139 +93,132 @@ public ClusterState doExecute(final ClusterState currentState) throws IOExceptio
9393 return currentState ;
9494 }
9595 Step registeredCurrentStep = IndexLifecycleRunner .getCurrentStep (policyStepsRegistry , policy , indexMetadata );
96- if (currentStep .equals (registeredCurrentStep )) {
97- ClusterState state = currentState ;
98- // We can do cluster state steps all together until we
99- // either get to a step that isn't a cluster state step or a
100- // cluster state wait step returns not completed
101- while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep ) {
96+ if (currentStep .equals (registeredCurrentStep ) == false ) {
97+ // either we are no longer the master or the step is now
98+ // not the same as when we submitted the update task. In
99+ // either case we don't want to do anything now
100+ return currentState ;
101+ }
102+ ClusterState state = currentState ;
103+ // We can do cluster state steps all together until we
104+ // either get to a step that isn't a cluster state step or a
105+ // cluster state wait step returns not completed
106+ while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep ) {
107+ try {
102108 if (currentStep instanceof ClusterStateActionStep ) {
103- // cluster state action step so do the action and
104- // move the cluster state to the next step
105- logger .trace (
106- "[{}] performing cluster state action ({}) [{}]" ,
107- index .getName (),
108- currentStep .getClass ().getSimpleName (),
109- currentStep .getKey ()
110- );
111- try {
112- ClusterStateActionStep actionStep = (ClusterStateActionStep ) currentStep ;
113- state = actionStep .performAction (index , state );
114- // If this step (usually a CopyExecutionStateStep step) has brought the
115- // index to where it needs to have async actions invoked, then add that
116- // index to the list so that when the new cluster state has been
117- // processed, the new indices will have their async actions invoked.
118- Optional .ofNullable (actionStep .indexForAsyncInvocation ())
119- .ifPresent (tuple -> indexToStepKeysForAsyncActions .put (tuple .v1 (), tuple .v2 ()));
120- } catch (Exception exception ) {
121- return moveToErrorStep (state , currentStep .getKey (), exception );
122- }
123- // set here to make sure that the clusterProcessed knows to execute the
124- // correct step if it an async action
125- nextStepKey = currentStep .getNextStepKey ();
126- if (nextStepKey == null ) {
127- return state ;
128- } else {
129- logger .trace ("[{}] moving cluster state to next step [{}]" , index .getName (), nextStepKey );
130- state = ClusterState .builder (state )
131- .putProjectMetadata (
132- IndexLifecycleTransition .moveIndexToStep (
133- index ,
134- state .metadata ().getProject (),
135- nextStepKey ,
136- nowSupplier ,
137- policyStepsRegistry ,
138- false
139- )
140- )
141- .build ();
142- }
109+ state = executeActionStep (state , currentStep );
143110 } else {
144- // cluster state wait step so evaluate the
145- // condition, if the condition is met move to the
146- // next step, if its not met return the current
147- // cluster state so it can be applied and we will
148- // wait for the next trigger to evaluate the
149- // condition again
150- logger .trace (
151- "[{}] waiting for cluster state step condition ({}) [{}]" ,
152- index .getName (),
153- currentStep .getClass ().getSimpleName (),
154- currentStep .getKey ()
155- );
156- ClusterStateWaitStep .Result result ;
157- try {
158- result = ((ClusterStateWaitStep ) currentStep ).isConditionMet (index , state );
159- } catch (Exception exception ) {
160- return moveToErrorStep (state , currentStep .getKey (), exception );
161- }
162- // some steps can decide to change the next step to execute after waiting for some time for the condition
163- // to be met (eg. {@link LifecycleSettings#LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING}, so it's important we
164- // re-evaluate what the next step is after we evaluate the condition
165- nextStepKey = currentStep .getNextStepKey ();
166- if (result .complete ()) {
167- logger .trace (
168- "[{}] cluster state step condition met successfully ({}) [{}], moving to next step {}" ,
169- index .getName (),
170- currentStep .getClass ().getSimpleName (),
171- currentStep .getKey (),
172- nextStepKey
173- );
174- if (nextStepKey == null ) {
175- return state ;
176- } else {
177- state = ClusterState .builder (state )
178- .putProjectMetadata (
179- IndexLifecycleTransition .moveIndexToStep (
180- index ,
181- state .metadata ().getProject (),
182- nextStepKey ,
183- nowSupplier ,
184- policyStepsRegistry ,
185- false
186- )
187- )
188- .build ();
189- }
190- } else {
191- final ToXContentObject stepInfo = result .informationContext ();
192- if (logger .isTraceEnabled ()) {
193- logger .trace (
194- "[{}] condition not met ({}) [{}], returning existing state (info: {})" ,
195- index .getName (),
196- currentStep .getClass ().getSimpleName (),
197- currentStep .getKey (),
198- stepInfo == null ? "null" : Strings .toString (stepInfo )
199- );
200- }
201- // We may have executed a step and set "nextStepKey" to
202- // a value, but in this case, since the condition was
203- // not met, we can't advance any way, so don't attempt
204- // to run the current step
205- nextStepKey = null ;
206- if (stepInfo == null ) {
207- return state ;
208- } else {
209- return IndexLifecycleTransition .addStepInfoToClusterState (index , state , stepInfo );
210- }
211- }
212- }
213- // There are actions we need to take in the event a phase
214- // transition happens, so even if we would continue in the while
215- // loop, if we are about to go into a new phase, return so that
216- // other processing can occur
217- if (currentStep .getKey ().phase ().equals (currentStep .getNextStepKey ().phase ()) == false ) {
218- return state ;
111+ state = executeWaitStep (state , currentStep );
219112 }
220- currentStep = policyStepsRegistry .getStep (indexMetadata , currentStep .getNextStepKey ());
113+ } catch (Exception exception ) {
114+ return moveToErrorStep (state , currentStep .getKey (), exception );
115+ }
116+ if (nextStepKey == null ) {
117+ return state ;
118+ } else {
119+ state = moveToNextStep (state );
221120 }
121+ // There are actions we need to take in the event a phase
122+ // transition happens, so even if we would continue in the while
123+ // loop, if we are about to go into a new phase, return so that
124+ // other processing can occur
125+ if (currentStep .getKey ().phase ().equals (currentStep .getNextStepKey ().phase ()) == false ) {
126+ return state ;
127+ }
128+ currentStep = policyStepsRegistry .getStep (indexMetadata , currentStep .getNextStepKey ());
129+ }
130+ return state ;
131+ }
132+
133+ private ClusterState executeActionStep (ClusterState state , Step currentStep ) {
134+ // cluster state action step so do the action and
135+ // move the cluster state to the next step
136+ logger .trace (
137+ "[{}] performing cluster state action ({}) [{}]" ,
138+ index .getName (),
139+ currentStep .getClass ().getSimpleName (),
140+ currentStep .getKey ()
141+ );
142+ ClusterStateActionStep actionStep = (ClusterStateActionStep ) currentStep ;
143+ state = actionStep .performAction (index , state );
144+ // If this step (usually a CopyExecutionStateStep step) has brought the
145+ // index to where it needs to have async actions invoked, then add that
146+ // index to the list so that when the new cluster state has been
147+ // processed, the new indices will have their async actions invoked.
148+ Optional .ofNullable (actionStep .indexForAsyncInvocation ())
149+ .ifPresent (tuple -> indexToStepKeysForAsyncActions .put (tuple .v1 (), tuple .v2 ()));
150+ // set here to make sure that the clusterProcessed knows to execute the
151+ // correct step if it an async action
152+ nextStepKey = currentStep .getNextStepKey ();
153+ return state ;
154+ }
155+
156+ private ClusterState executeWaitStep (ClusterState state , Step currentStep ) {
157+ // cluster state wait step so evaluate the
158+ // condition, if the condition is met move to the
159+ // next step, if its not met return the current
160+ // cluster state so it can be applied and we will
161+ // wait for the next trigger to evaluate the
162+ // condition again
163+ logger .trace (
164+ "[{}] waiting for cluster state step condition ({}) [{}]" ,
165+ index .getName (),
166+ currentStep .getClass ().getSimpleName (),
167+ currentStep .getKey ()
168+ );
169+ ClusterStateWaitStep .Result result = ((ClusterStateWaitStep ) currentStep ).isConditionMet (index , state );
170+ // some steps can decide to change the next step to execute after waiting for some time for the condition
171+ // to be met (eg. {@link LifecycleSettings#LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING}, so it's important we
172+ // re-evaluate what the next step is after we evaluate the condition
173+ nextStepKey = currentStep .getNextStepKey ();
174+ if (result .complete ()) {
175+ logger .trace (
176+ "[{}] cluster state step condition met successfully ({}) [{}]" ,
177+ index .getName (),
178+ currentStep .getClass ().getSimpleName (),
179+ currentStep .getKey ()
180+ );
222181 return state ;
223182 } else {
224- // either we are no longer the master or the step is now
225- // not the same as when we submitted the update task. In
226- // either case we don't want to do anything now
227- return currentState ;
183+ final ToXContentObject stepInfo = result .informationContext ();
184+ if (logger .isTraceEnabled ()) {
185+ logger .trace (
186+ "[{}] condition not met ({}) [{}], returning existing state (info: {})" ,
187+ index .getName (),
188+ currentStep .getClass ().getSimpleName (),
189+ currentStep .getKey (),
190+ stepInfo == null ? "null" : Strings .toString (stepInfo )
191+ );
192+ }
193+ // We may have executed a step and set "nextStepKey" to
194+ // a value, but in this case, since the condition was
195+ // not met, we can't advance any way, so don't attempt
196+ // to run the current step
197+ nextStepKey = null ;
198+ if (stepInfo == null ) {
199+ return state ;
200+ }
201+ return IndexLifecycleTransition .addStepInfoToClusterState (index , state , stepInfo );
202+ }
203+ }
204+
205+ private ClusterState moveToNextStep (ClusterState state ) {
206+ if (nextStepKey == null ) {
207+ return state ;
228208 }
209+ logger .trace ("[{}] moving cluster state to next step [{}]" , index .getName (), nextStepKey );
210+ return ClusterState .builder (state )
211+ .putProjectMetadata (
212+ IndexLifecycleTransition .moveIndexToStep (
213+ index ,
214+ state .metadata ().getProject (),
215+ nextStepKey ,
216+ nowSupplier ,
217+ policyStepsRegistry ,
218+ false
219+ )
220+ )
221+ .build ();
229222 }
230223
231224 @ Override
0 commit comments