@@ -93,131 +93,121 @@ public ClusterState doExecute(final ClusterState currentState) throws IOExceptio
93
93
return currentState ;
94
94
}
95
95
Step registeredCurrentStep = IndexLifecycleRunner .getCurrentStep (policyStepsRegistry , policy , indexMetadata );
96
- if (currentStep .equals (registeredCurrentStep )) {
97
- ClusterState state = currentState ;
98
- // We can do cluster state steps all together until we
99
- // either get to a step that isn't a cluster state step or a
100
- // cluster state wait step returns not completed
101
- while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep ) {
96
+ if (currentStep .equals (registeredCurrentStep ) == false ) {
97
+ // either we are no longer the master or the step is now
98
+ // not the same as when we submitted the update task. In
99
+ // either case we don't want to do anything now
100
+ return currentState ;
101
+ }
102
+ ClusterState state = currentState ;
103
+ // We can do cluster state steps all together until we
104
+ // either get to a step that isn't a cluster state step or a
105
+ // cluster state wait step returns not completed
106
+ while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep ) {
107
+ try {
102
108
if (currentStep instanceof ClusterStateActionStep ) {
103
- // cluster state action step so do the action and
104
- // move the cluster state to the next step
105
- logger .trace (
106
- "[{}] performing cluster state action ({}) [{}]" ,
107
- index .getName (),
108
- currentStep .getClass ().getSimpleName (),
109
- currentStep .getKey ()
110
- );
111
- try {
112
- ClusterStateActionStep actionStep = (ClusterStateActionStep ) currentStep ;
113
- state = actionStep .performAction (index , state );
114
- // If this step (usually a CopyExecutionStateStep step) has brought the
115
- // index to where it needs to have async actions invoked, then add that
116
- // index to the list so that when the new cluster state has been
117
- // processed, the new indices will have their async actions invoked.
118
- Optional .ofNullable (actionStep .indexForAsyncInvocation ())
119
- .ifPresent (tuple -> indexToStepKeysForAsyncActions .put (tuple .v1 (), tuple .v2 ()));
120
- } catch (Exception exception ) {
121
- return moveToErrorStep (state , currentStep .getKey (), exception );
122
- }
123
- // set here to make sure that the clusterProcessed knows to execute the
124
- // correct step if it an async action
125
- nextStepKey = currentStep .getNextStepKey ();
126
- if (nextStepKey == null ) {
127
- return state ;
128
- } else {
129
- logger .trace ("[{}] moving cluster state to next step [{}]" , index .getName (), nextStepKey );
130
- state = IndexLifecycleTransition .moveClusterStateToStep (
131
- index ,
132
- state ,
133
- nextStepKey ,
134
- nowSupplier ,
135
- policyStepsRegistry ,
136
- false
137
- );
138
- }
109
+ state = executeActionStep (state , currentStep );
139
110
} else {
140
- // cluster state wait step so evaluate the
141
- // condition, if the condition is met move to the
142
- // next step, if its not met return the current
143
- // cluster state so it can be applied and we will
144
- // wait for the next trigger to evaluate the
145
- // condition again
146
- logger .trace (
147
- "[{}] waiting for cluster state step condition ({}) [{}]" ,
148
- index .getName (),
149
- currentStep .getClass ().getSimpleName (),
150
- currentStep .getKey ()
151
- );
152
- ClusterStateWaitStep .Result result ;
153
- try {
154
- result = ((ClusterStateWaitStep ) currentStep ).isConditionMet (index , state );
155
- } catch (Exception exception ) {
156
- return moveToErrorStep (state , currentStep .getKey (), exception );
157
- }
158
- // some steps can decide to change the next step to execute after waiting for some time for the condition
159
- // to be met (eg. {@link LifecycleSettings#LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING}, so it's important we
160
- // re-evaluate what the next step is after we evaluate the condition
161
- nextStepKey = currentStep .getNextStepKey ();
162
- if (result .complete ()) {
163
- logger .trace (
164
- "[{}] cluster state step condition met successfully ({}) [{}], moving to next step {}" ,
165
- index .getName (),
166
- currentStep .getClass ().getSimpleName (),
167
- currentStep .getKey (),
168
- nextStepKey
169
- );
170
- if (nextStepKey == null ) {
171
- return state ;
172
- } else {
173
- state = IndexLifecycleTransition .moveClusterStateToStep (
174
- index ,
175
- state ,
176
- nextStepKey ,
177
- nowSupplier ,
178
- policyStepsRegistry ,
179
- false
180
- );
181
- }
182
- } else {
183
- final ToXContentObject stepInfo = result .informationContext ();
184
- if (logger .isTraceEnabled ()) {
185
- logger .trace (
186
- "[{}] condition not met ({}) [{}], returning existing state (info: {})" ,
187
- index .getName (),
188
- currentStep .getClass ().getSimpleName (),
189
- currentStep .getKey (),
190
- stepInfo == null ? "null" : Strings .toString (stepInfo )
191
- );
192
- }
193
- // We may have executed a step and set "nextStepKey" to
194
- // a value, but in this case, since the condition was
195
- // not met, we can't advance any way, so don't attempt
196
- // to run the current step
197
- nextStepKey = null ;
198
- if (stepInfo == null ) {
199
- return state ;
200
- } else {
201
- return IndexLifecycleTransition .addStepInfoToClusterState (index , state , stepInfo );
202
- }
203
- }
111
+ state = executeWaitStep (state , currentStep );
204
112
}
205
- // There are actions we need to take in the event a phase
206
- // transition happens, so even if we would continue in the while
207
- // loop, if we are about to go into a new phase, return so that
208
- // other processing can occur
209
- if (currentStep .getKey ().phase ().equals (currentStep .getNextStepKey ().phase ()) == false ) {
210
- return state ;
211
- }
212
- currentStep = policyStepsRegistry .getStep (indexMetadata , currentStep .getNextStepKey ());
113
+ } catch (Exception exception ) {
114
+ return moveToErrorStep (state , currentStep .getKey (), exception );
213
115
}
116
+ if (nextStepKey == null ) {
117
+ return state ;
118
+ } else {
119
+ state = moveToNextStep (state );
120
+ }
121
+ // There are actions we need to take in the event a phase
122
+ // transition happens, so even if we would continue in the while
123
+ // loop, if we are about to go into a new phase, return so that
124
+ // other processing can occur
125
+ if (currentStep .getKey ().phase ().equals (currentStep .getNextStepKey ().phase ()) == false ) {
126
+ return state ;
127
+ }
128
+ currentStep = policyStepsRegistry .getStep (indexMetadata , currentStep .getNextStepKey ());
129
+ }
130
+ return state ;
131
+ }
132
+
133
+ private ClusterState executeActionStep (ClusterState state , Step currentStep ) {
134
+ // cluster state action step so do the action and
135
+ // move the cluster state to the next step
136
+ logger .trace (
137
+ "[{}] performing cluster state action ({}) [{}]" ,
138
+ index .getName (),
139
+ currentStep .getClass ().getSimpleName (),
140
+ currentStep .getKey ()
141
+ );
142
+ ClusterStateActionStep actionStep = (ClusterStateActionStep ) currentStep ;
143
+ state = actionStep .performAction (index , state );
144
+ // If this step (usually a CopyExecutionStateStep step) has brought the
145
+ // index to where it needs to have async actions invoked, then add that
146
+ // index to the list so that when the new cluster state has been
147
+ // processed, the new indices will have their async actions invoked.
148
+ Optional .ofNullable (actionStep .indexForAsyncInvocation ())
149
+ .ifPresent (tuple -> indexToStepKeysForAsyncActions .put (tuple .v1 (), tuple .v2 ()));
150
+ // set here to make sure that the clusterProcessed knows to execute the
151
+ // correct step if it an async action
152
+ nextStepKey = currentStep .getNextStepKey ();
153
+ return state ;
154
+ }
155
+
156
+ private ClusterState executeWaitStep (ClusterState state , Step currentStep ) {
157
+ // cluster state wait step so evaluate the
158
+ // condition, if the condition is met move to the
159
+ // next step, if its not met return the current
160
+ // cluster state so it can be applied and we will
161
+ // wait for the next trigger to evaluate the
162
+ // condition again
163
+ logger .trace (
164
+ "[{}] waiting for cluster state step condition ({}) [{}]" ,
165
+ index .getName (),
166
+ currentStep .getClass ().getSimpleName (),
167
+ currentStep .getKey ()
168
+ );
169
+ ClusterStateWaitStep .Result result = ((ClusterStateWaitStep ) currentStep ).isConditionMet (index , state );
170
+ // some steps can decide to change the next step to execute after waiting for some time for the condition
171
+ // to be met (eg. {@link LifecycleSettings#LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING}, so it's important we
172
+ // re-evaluate what the next step is after we evaluate the condition
173
+ nextStepKey = currentStep .getNextStepKey ();
174
+ if (result .complete ()) {
175
+ logger .trace (
176
+ "[{}] cluster state step condition met successfully ({}) [{}]" ,
177
+ index .getName (),
178
+ currentStep .getClass ().getSimpleName (),
179
+ currentStep .getKey ()
180
+ );
214
181
return state ;
215
182
} else {
216
- // either we are no longer the master or the step is now
217
- // not the same as when we submitted the update task. In
218
- // either case we don't want to do anything now
219
- return currentState ;
183
+ final ToXContentObject stepInfo = result .informationContext ();
184
+ if (logger .isTraceEnabled ()) {
185
+ logger .trace (
186
+ "[{}] condition not met ({}) [{}], returning existing state (info: {})" ,
187
+ index .getName (),
188
+ currentStep .getClass ().getSimpleName (),
189
+ currentStep .getKey (),
190
+ stepInfo == null ? "null" : Strings .toString (stepInfo )
191
+ );
192
+ }
193
+ // We may have executed a step and set "nextStepKey" to
194
+ // a value, but in this case, since the condition was
195
+ // not met, we can't advance any way, so don't attempt
196
+ // to run the current step
197
+ nextStepKey = null ;
198
+ if (stepInfo == null ) {
199
+ return state ;
200
+ }
201
+ return IndexLifecycleTransition .addStepInfoToClusterState (index , state , stepInfo );
202
+ }
203
+ }
204
+
205
+ private ClusterState moveToNextStep (ClusterState state ) {
206
+ if (nextStepKey == null ) {
207
+ return state ;
220
208
}
209
+ logger .trace ("[{}] moving cluster state to next step [{}]" , index .getName (), nextStepKey );
210
+ return IndexLifecycleTransition .moveClusterStateToStep (index , state , nextStepKey , nowSupplier , policyStepsRegistry , false );
221
211
}
222
212
223
213
@ Override
0 commit comments