@@ -93,131 +93,121 @@ public ClusterState doExecute(final ClusterState currentState) throws IOExceptio
9393 return currentState ;
9494 }
9595 Step registeredCurrentStep = IndexLifecycleRunner .getCurrentStep (policyStepsRegistry , policy , indexMetadata );
96- if (currentStep .equals (registeredCurrentStep )) {
97- ClusterState state = currentState ;
98- // We can do cluster state steps all together until we
99- // either get to a step that isn't a cluster state step or a
100- // cluster state wait step returns not completed
101- while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep ) {
96+ if (currentStep .equals (registeredCurrentStep ) == false ) {
97+ // either we are no longer the master or the step is now
98+ // not the same as when we submitted the update task. In
99+ // either case we don't want to do anything now
100+ return currentState ;
101+ }
102+ ClusterState state = currentState ;
103+ // We can do cluster state steps all together until we
104+ // either get to a step that isn't a cluster state step or a
105+ // cluster state wait step returns not completed
106+ while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep ) {
107+ try {
102108 if (currentStep instanceof ClusterStateActionStep ) {
103- // cluster state action step so do the action and
104- // move the cluster state to the next step
105- logger .trace (
106- "[{}] performing cluster state action ({}) [{}]" ,
107- index .getName (),
108- currentStep .getClass ().getSimpleName (),
109- currentStep .getKey ()
110- );
111- try {
112- ClusterStateActionStep actionStep = (ClusterStateActionStep ) currentStep ;
113- state = actionStep .performAction (index , state );
114- // If this step (usually a CopyExecutionStateStep step) has brought the
115- // index to where it needs to have async actions invoked, then add that
116- // index to the list so that when the new cluster state has been
117- // processed, the new indices will have their async actions invoked.
118- Optional .ofNullable (actionStep .indexForAsyncInvocation ())
119- .ifPresent (tuple -> indexToStepKeysForAsyncActions .put (tuple .v1 (), tuple .v2 ()));
120- } catch (Exception exception ) {
121- return moveToErrorStep (state , currentStep .getKey (), exception );
122- }
123- // set here to make sure that the clusterProcessed knows to execute the
124- // correct step if it an async action
125- nextStepKey = currentStep .getNextStepKey ();
126- if (nextStepKey == null ) {
127- return state ;
128- } else {
129- logger .trace ("[{}] moving cluster state to next step [{}]" , index .getName (), nextStepKey );
130- state = IndexLifecycleTransition .moveClusterStateToStep (
131- index ,
132- state ,
133- nextStepKey ,
134- nowSupplier ,
135- policyStepsRegistry ,
136- false
137- );
138- }
109+ state = executeActionStep (state , currentStep );
139110 } else {
140- // cluster state wait step so evaluate the
141- // condition, if the condition is met move to the
142- // next step, if its not met return the current
143- // cluster state so it can be applied and we will
144- // wait for the next trigger to evaluate the
145- // condition again
146- logger .trace (
147- "[{}] waiting for cluster state step condition ({}) [{}]" ,
148- index .getName (),
149- currentStep .getClass ().getSimpleName (),
150- currentStep .getKey ()
151- );
152- ClusterStateWaitStep .Result result ;
153- try {
154- result = ((ClusterStateWaitStep ) currentStep ).isConditionMet (index , state );
155- } catch (Exception exception ) {
156- return moveToErrorStep (state , currentStep .getKey (), exception );
157- }
158- // some steps can decide to change the next step to execute after waiting for some time for the condition
159- // to be met (eg. {@link LifecycleSettings#LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING}, so it's important we
160- // re-evaluate what the next step is after we evaluate the condition
161- nextStepKey = currentStep .getNextStepKey ();
162- if (result .complete ()) {
163- logger .trace (
164- "[{}] cluster state step condition met successfully ({}) [{}], moving to next step {}" ,
165- index .getName (),
166- currentStep .getClass ().getSimpleName (),
167- currentStep .getKey (),
168- nextStepKey
169- );
170- if (nextStepKey == null ) {
171- return state ;
172- } else {
173- state = IndexLifecycleTransition .moveClusterStateToStep (
174- index ,
175- state ,
176- nextStepKey ,
177- nowSupplier ,
178- policyStepsRegistry ,
179- false
180- );
181- }
182- } else {
183- final ToXContentObject stepInfo = result .informationContext ();
184- if (logger .isTraceEnabled ()) {
185- logger .trace (
186- "[{}] condition not met ({}) [{}], returning existing state (info: {})" ,
187- index .getName (),
188- currentStep .getClass ().getSimpleName (),
189- currentStep .getKey (),
190- stepInfo == null ? "null" : Strings .toString (stepInfo )
191- );
192- }
193- // We may have executed a step and set "nextStepKey" to
194- // a value, but in this case, since the condition was
195- // not met, we can't advance any way, so don't attempt
196- // to run the current step
197- nextStepKey = null ;
198- if (stepInfo == null ) {
199- return state ;
200- } else {
201- return IndexLifecycleTransition .addStepInfoToClusterState (index , state , stepInfo );
202- }
203- }
111+ state = executeWaitStep (state , currentStep );
204112 }
205- // There are actions we need to take in the event a phase
206- // transition happens, so even if we would continue in the while
207- // loop, if we are about to go into a new phase, return so that
208- // other processing can occur
209- if (currentStep .getKey ().phase ().equals (currentStep .getNextStepKey ().phase ()) == false ) {
210- return state ;
211- }
212- currentStep = policyStepsRegistry .getStep (indexMetadata , currentStep .getNextStepKey ());
113+ } catch (Exception exception ) {
114+ return moveToErrorStep (state , currentStep .getKey (), exception );
213115 }
116+ if (nextStepKey == null ) {
117+ return state ;
118+ } else {
119+ state = moveToNextStep (state );
120+ }
121+ // There are actions we need to take in the event a phase
122+ // transition happens, so even if we would continue in the while
123+ // loop, if we are about to go into a new phase, return so that
124+ // other processing can occur
125+ if (currentStep .getKey ().phase ().equals (currentStep .getNextStepKey ().phase ()) == false ) {
126+ return state ;
127+ }
128+ currentStep = policyStepsRegistry .getStep (indexMetadata , currentStep .getNextStepKey ());
129+ }
130+ return state ;
131+ }
132+
133+ private ClusterState executeActionStep (ClusterState state , Step currentStep ) {
134+ // cluster state action step so do the action and
135+ // move the cluster state to the next step
136+ logger .trace (
137+ "[{}] performing cluster state action ({}) [{}]" ,
138+ index .getName (),
139+ currentStep .getClass ().getSimpleName (),
140+ currentStep .getKey ()
141+ );
142+ ClusterStateActionStep actionStep = (ClusterStateActionStep ) currentStep ;
143+ state = actionStep .performAction (index , state );
144+ // If this step (usually a CopyExecutionStateStep step) has brought the
145+ // index to where it needs to have async actions invoked, then add that
146+ // index to the list so that when the new cluster state has been
147+ // processed, the new indices will have their async actions invoked.
148+ Optional .ofNullable (actionStep .indexForAsyncInvocation ())
149+ .ifPresent (tuple -> indexToStepKeysForAsyncActions .put (tuple .v1 (), tuple .v2 ()));
150+ // set here to make sure that the clusterProcessed knows to execute the
151+ // correct step if it an async action
152+ nextStepKey = currentStep .getNextStepKey ();
153+ return state ;
154+ }
155+
156+ private ClusterState executeWaitStep (ClusterState state , Step currentStep ) {
157+ // cluster state wait step so evaluate the
158+ // condition, if the condition is met move to the
159+ // next step, if its not met return the current
160+ // cluster state so it can be applied and we will
161+ // wait for the next trigger to evaluate the
162+ // condition again
163+ logger .trace (
164+ "[{}] waiting for cluster state step condition ({}) [{}]" ,
165+ index .getName (),
166+ currentStep .getClass ().getSimpleName (),
167+ currentStep .getKey ()
168+ );
169+ ClusterStateWaitStep .Result result = ((ClusterStateWaitStep ) currentStep ).isConditionMet (index , state );
170+ // some steps can decide to change the next step to execute after waiting for some time for the condition
171+ // to be met (eg. {@link LifecycleSettings#LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING}, so it's important we
172+ // re-evaluate what the next step is after we evaluate the condition
173+ nextStepKey = currentStep .getNextStepKey ();
174+ if (result .complete ()) {
175+ logger .trace (
176+ "[{}] cluster state step condition met successfully ({}) [{}]" ,
177+ index .getName (),
178+ currentStep .getClass ().getSimpleName (),
179+ currentStep .getKey ()
180+ );
214181 return state ;
215182 } else {
216- // either we are no longer the master or the step is now
217- // not the same as when we submitted the update task. In
218- // either case we don't want to do anything now
219- return currentState ;
183+ final ToXContentObject stepInfo = result .informationContext ();
184+ if (logger .isTraceEnabled ()) {
185+ logger .trace (
186+ "[{}] condition not met ({}) [{}], returning existing state (info: {})" ,
187+ index .getName (),
188+ currentStep .getClass ().getSimpleName (),
189+ currentStep .getKey (),
190+ stepInfo == null ? "null" : Strings .toString (stepInfo )
191+ );
192+ }
193+ // We may have executed a step and set "nextStepKey" to
194+ // a value, but in this case, since the condition was
195+ // not met, we can't advance any way, so don't attempt
196+ // to run the current step
197+ nextStepKey = null ;
198+ if (stepInfo == null ) {
199+ return state ;
200+ }
201+ return IndexLifecycleTransition .addStepInfoToClusterState (index , state , stepInfo );
202+ }
203+ }
204+
205+ private ClusterState moveToNextStep (ClusterState state ) {
206+ if (nextStepKey == null ) {
207+ return state ;
220208 }
209+ logger .trace ("[{}] moving cluster state to next step [{}]" , index .getName (), nextStepKey );
210+ return IndexLifecycleTransition .moveClusterStateToStep (index , state , nextStepKey , nowSupplier , policyStepsRegistry , false );
221211 }
222212
223213 @ Override
0 commit comments