3131import org .elasticsearch .common .settings .Setting .Property ;
3232import org .elasticsearch .common .settings .Settings ;
3333import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
34+ import org .elasticsearch .core .Nullable ;
3435import org .elasticsearch .core .SuppressForbidden ;
3536import org .elasticsearch .core .TimeValue ;
3637import org .elasticsearch .rest .RestStatus ;
38+ import org .elasticsearch .threadpool .Scheduler ;
3739import org .elasticsearch .threadpool .ThreadPool ;
3840
3941import java .util .concurrent .atomic .AtomicBoolean ;
@@ -80,9 +82,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
8082 private final TimeValue recoverAfterTime ;
8183 private final int recoverAfterDataNodes ;
8284 private final int expectedDataNodes ;
83-
84- private final AtomicBoolean recoveryInProgress = new AtomicBoolean ();
85- private final AtomicBoolean scheduledRecovery = new AtomicBoolean ();
85+ volatile PendingStateRecovery currentPendingStateRecovery ;
8686
8787 @ Inject
8888 public GatewayService (
@@ -131,8 +131,9 @@ public void clusterChanged(final ClusterChangedEvent event) {
131131 }
132132
133133 final ClusterState state = event .state ();
134+ final DiscoveryNodes nodes = state .nodes ();
134135
135- if (state . nodes () .isLocalNodeElectedMaster () == false ) {
136+ if (nodes .isLocalNodeElectedMaster () == false ) {
136137 // not our job to recover
137138 return ;
138139 }
@@ -141,83 +142,153 @@ public void clusterChanged(final ClusterChangedEvent event) {
141142 return ;
142143 }
143144
144- final DiscoveryNodes nodes = state .nodes ();
145- if (state .nodes ().getMasterNodeId () == null ) {
146- logger .debug ("not recovering from gateway, no master elected yet" );
147- } else if (recoverAfterDataNodes != -1 && nodes .getDataNodes ().size () < recoverAfterDataNodes ) {
148- logger .debug (
149- "not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]" ,
150- nodes .getDataNodes ().size (),
151- recoverAfterDataNodes
152- );
153- } else {
154- boolean enforceRecoverAfterTime ;
155- String reason ;
156- if (expectedDataNodes == -1 ) {
157- // no expected is set, honor recover_after_data_nodes
158- enforceRecoverAfterTime = true ;
159- reason = "recover_after_time was set to [" + recoverAfterTime + "]" ;
160- } else if (expectedDataNodes <= nodes .getDataNodes ().size ()) {
161- // expected is set and satisfied so recover immediately
162- enforceRecoverAfterTime = false ;
163- reason = "" ;
145+ // At this point, we know the state is not recovered and this node is qualified for state recovery
146+ // But we still need to check whether a previous one is running already
147+ final long currentTerm = state .term ();
148+ final PendingStateRecovery existingPendingStateRecovery = currentPendingStateRecovery ;
149+
150+ // Always start a new state recovery if the master term changes
151+ // If there is a previous one still waiting, both will probably run but at most one of them will
152+ // actually make changes to cluster state because either:
153+ // 1. The previous recovers the cluster state and the current one will be skipped
154+ // 2. The previous one sees a new cluster term and skips its own execution
155+ if (existingPendingStateRecovery == null || existingPendingStateRecovery .expectedTerm < currentTerm ) {
156+ currentPendingStateRecovery = new PendingStateRecovery (currentTerm );
157+ }
158+ currentPendingStateRecovery .onDataNodeSize (nodes .getDataNodes ().size ());
159+ }
160+
161+ /**
162+ * This class manages the cluster state recovery behaviours. It has two major scenarios depending
163+ * on whether {@code recoverAfterDataNodes} is configured.
164+ *
165+ * <p> <b>When</b> {@code recoverAfterDataNodes} is configured:
166+ * <ol>
167+ * <li>Nothing can happen until it is reached
168+ * <li>When {@code recoverAfterDataNodes} is reached, the cluster either:
169+ * <ul>
170+ * <li>Recover immediately when {@code expectedDataNodes} is reached or
171+ * both {@code expectedDataNodes} and {@code recoverAfterTime} are not configured
172+ * <li>Or schedule a recovery with a delay of {@code recoverAfterTime}
173+ * </ul>
174+ * <li>The scheduled recovery can be cancelled if {@code recoverAfterDataNodes} drops below required number
175+ * before the recovery can happen. When this happens, the process goes back to the beginning (step 1).
176+ * <li>The recovery is scheduled only once each time {@code recoverAfterDataNodes} crosses the required number
177+ * </ol>
178+ *
179+ * <p> <b>When</b> {@code recoverAfterDataNodes} is <b>Not</b> configured, the cluster either:
180+ * <ul>
181+ * <li>Recover immediately when {@code expectedDataNodes} is reached or
182+ * both {@code expectedDataNodes} and {@code recoverAfterTime} are not configured
183+ * <li>Or schedule a recovery with a delay of {@code recoverAfterTime}
184+ * </ul>
185+ */
186+ class PendingStateRecovery {
187+ private final long expectedTerm ;
188+ @ Nullable
189+ private Scheduler .ScheduledCancellable scheduledRecovery ;
190+ private final AtomicBoolean taskSubmitted = new AtomicBoolean ();
191+
192+ PendingStateRecovery (long expectedTerm ) {
193+ this .expectedTerm = expectedTerm ;
194+ }
195+
196+ void onDataNodeSize (int currentDataNodeSize ) {
197+ if (recoverAfterDataNodes != -1 && currentDataNodeSize < recoverAfterDataNodes ) {
198+ logger .debug (
199+ "not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]" ,
200+ currentDataNodeSize ,
201+ recoverAfterDataNodes
202+ );
203+ cancelScheduledRecovery ();
164204 } else {
165- // expected is set but not satisfied so wait until it is satisfied or times out
166- enforceRecoverAfterTime = true ;
167- reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes .getDataNodes ().size () + "]" ;
205+ maybePerformOrScheduleRecovery (currentDataNodeSize );
168206 }
169- performStateRecovery (enforceRecoverAfterTime , reason );
170207 }
171- }
172208
173- private void performStateRecovery (final boolean enforceRecoverAfterTime , final String reason ) {
174- if (enforceRecoverAfterTime && recoverAfterTime != null ) {
175- if (scheduledRecovery .compareAndSet (false , true )) {
176- logger .info ("delaying initial state recovery for [{}]. {}" , recoverAfterTime , reason );
177- threadPool .schedule (new AbstractRunnable () {
178- @ Override
179- public void onFailure (Exception e ) {
180- logger .warn ("delayed state recovery failed" , e );
181- resetRecoveredFlags ();
182- }
183-
184- @ Override
185- protected void doRun () {
186- if (recoveryInProgress .compareAndSet (false , true )) {
187- logger .info ("recover_after_time [{}] elapsed. performing state recovery..." , recoverAfterTime );
188- runRecovery ();
209+ void maybePerformOrScheduleRecovery (int currentDataNodeSize ) {
210+ if (expectedDataNodes != -1 && expectedDataNodes <= currentDataNodeSize ) {
211+ logger .debug (
212+ "performing state recovery of term [{}], expected data nodes [{}] is reached" ,
213+ expectedTerm ,
214+ expectedDataNodes
215+ );
216+ cancelScheduledRecovery ();
217+ runRecoveryImmediately ();
218+ } else if (recoverAfterTime == null ) {
219+ logger .debug ("performing state recovery of term [{}], no delay time is configured" , expectedTerm );
220+ cancelScheduledRecovery ();
221+ runRecoveryImmediately ();
222+ } else {
223+ if (scheduledRecovery == null ) {
224+ logger .info (
225+ "delaying initial state recovery for [{}] of term [{}]. expecting [{}] data nodes, but only have [{}]" ,
226+ recoverAfterTime ,
227+ expectedTerm ,
228+ expectedDataNodes ,
229+ currentDataNodeSize
230+ );
231+ scheduledRecovery = threadPool .schedule (new AbstractRunnable () {
232+ @ Override
233+ public void onFailure (Exception e ) {
234+ logger .warn ("delayed state recovery of term [" + expectedTerm + "] failed" , e );
189235 }
190- }
191- }, recoverAfterTime , threadPool .generic ());
192- }
193- } else {
194- if (recoveryInProgress .compareAndSet (false , true )) {
195- try {
196- logger .debug ("performing state recovery..." );
197- runRecovery ();
198- } catch (Exception e ) {
199- logger .warn ("state recovery failed" , e );
200- resetRecoveredFlags ();
236+
237+ @ Override
238+ protected void doRun () {
239+ final PendingStateRecovery existingPendingStateRecovery = currentPendingStateRecovery ;
240+ if (PendingStateRecovery .this == existingPendingStateRecovery ) {
241+ runRecoveryImmediately ();
242+ } else {
243+ logger .debug (
244+ "skip scheduled state recovery since a new one of term [{}] has started" ,
245+ existingPendingStateRecovery .expectedTerm
246+ );
247+ }
248+ }
249+ }, recoverAfterTime , threadPool .generic ());
250+ } else {
251+ logger .debug ("state recovery is in already scheduled for term [{}]" , expectedTerm );
201252 }
202253 }
203254 }
204- }
205255
206- private void resetRecoveredFlags () {
207- recoveryInProgress .set (false );
208- scheduledRecovery .set (false );
256+ void runRecoveryImmediately () {
257+ if (taskSubmitted .compareAndSet (false , true )) {
258+ submitUnbatchedTask (TASK_SOURCE , new RecoverStateUpdateTask (expectedTerm ));
259+ } else {
260+ logger .debug ("state recovery task is already submitted" );
261+ }
262+ }
263+
264+ void cancelScheduledRecovery () {
265+ if (scheduledRecovery != null ) {
266+ scheduledRecovery .cancel ();
267+ scheduledRecovery = null ;
268+ }
269+ }
209270 }
210271
211272 private static final String TASK_SOURCE = "local-gateway-elected-state" ;
212273
213274 class RecoverStateUpdateTask extends ClusterStateUpdateTask {
214275
276+ private final long expectedTerm ;
277+
278+ RecoverStateUpdateTask (long expectedTerm ) {
279+ this .expectedTerm = expectedTerm ;
280+ }
281+
215282 @ Override
216283 public ClusterState execute (final ClusterState currentState ) {
217284 if (currentState .blocks ().hasGlobalBlock (STATE_NOT_RECOVERED_BLOCK ) == false ) {
218285 logger .debug ("cluster is already recovered" );
219286 return currentState ;
220287 }
288+ if (expectedTerm != currentState .term ()) {
289+ logger .debug ("skip state recovery since current term [{}] != expected term [{}]" , currentState .term (), expectedTerm );
290+ return currentState ;
291+ }
221292 return ClusterStateUpdaters .removeStateNotRecoveredBlock (
222293 ClusterStateUpdaters .updateRoutingTable (currentState , shardRoutingRoleStrategy )
223294 );
@@ -228,7 +299,6 @@ public void clusterStateProcessed(final ClusterState oldState, final ClusterStat
228299 logger .info ("recovered [{}] indices into cluster_state" , newState .metadata ().indices ().size ());
229300 // reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
230301 // not-recovered state, that we again do another state recovery.
231- resetRecoveredFlags ();
232302 rerouteService .reroute ("state recovered" , Priority .NORMAL , ActionListener .noop ());
233303 }
234304
@@ -239,7 +309,6 @@ public void onFailure(final Exception e) {
239309 () -> "unexpected failure during [" + TASK_SOURCE + "]" ,
240310 e
241311 );
242- resetRecoveredFlags ();
243312 }
244313 }
245314
@@ -248,10 +317,6 @@ TimeValue recoverAfterTime() {
248317 return recoverAfterTime ;
249318 }
250319
251- private void runRecovery () {
252- submitUnbatchedTask (TASK_SOURCE , new RecoverStateUpdateTask ());
253- }
254-
255320 @ SuppressForbidden (reason = "legacy usage of unbatched task" ) // TODO add support for batching here
256321 private void submitUnbatchedTask (@ SuppressWarnings ("SameParameterValue" ) String source , ClusterStateUpdateTask task ) {
257322 clusterService .submitUnbatchedStateUpdateTask (source , task );
0 commit comments