@@ -262,21 +262,44 @@ private boolean tryAcquireOrRenew() {
262
262
return false ;
263
263
}
264
264
265
+ if (log .isDebugEnabled ()) {
266
+ log .debug ("Lock not found, try to create it" );
267
+ }
268
+
269
+ // No Lock resource exists, try to get leadership by creating it
265
270
return createLock (lock , leaderElectionRecord );
266
271
}
267
272
273
+ // alright, we have an existing lock resource
274
+ // 1. Is Lock Empty? --> try to get leadership by updating it
275
+ // 2. Am I the Leader? --> update info and renew lease by updating it
276
+ // 3. I am not the Leader?
277
+ // 3.1 is Lock expired? --> try to get leadership by updating it
278
+ // 3.2 Lock not expired? --> update info, try later
279
+
268
280
if (oldLeaderElectionRecord == null
269
281
|| oldLeaderElectionRecord .getAcquireTime () == null
270
282
|| oldLeaderElectionRecord .getRenewTime () == null
271
283
|| oldLeaderElectionRecord .getHolderIdentity () == null ) {
272
- return createLock (lock , leaderElectionRecord );
284
+ // We found the lock resource with an empty LeaderElectionRecord, try to get leadership by updating it
285
+ if (log .isDebugEnabled ()) {
286
+ log .debug ("Update lock to get lease" );
287
+ }
288
+
289
+ if (oldLeaderElectionRecord != null ) {
290
+ // maintain the leaderTransitions
291
+ leaderElectionRecord .setLeaderTransitions (oldLeaderElectionRecord .getLeaderTransitions () + 1 );
292
+ }
293
+
294
+ return updateLock (lock , leaderElectionRecord );
273
295
}
274
296
275
- // 2. Record obtained, check the Identity & Time
297
+ // 2. Record obtained with LeaderElectionRecord , check the Identity & Time
276
298
if (!oldLeaderElectionRecord .equals (this .observedRecord )) {
277
299
this .observedRecord = oldLeaderElectionRecord ;
278
300
this .observedTimeMilliSeconds = System .currentTimeMillis ();
279
301
}
302
+
280
303
if (observedTimeMilliSeconds + config .getLeaseDuration ().toMillis () > now .getTime ()
281
304
&& !isLeader ()) {
282
305
if (log .isDebugEnabled ()) {
@@ -296,26 +319,20 @@ private boolean tryAcquireOrRenew() {
296
319
leaderElectionRecord .setLeaderTransitions (oldLeaderElectionRecord .getLeaderTransitions () + 1 );
297
320
}
298
321
299
- // update the lock itself
300
322
if (log .isDebugEnabled ()) {
301
- log .debug ("Update lock acquire time to keep lease" );
323
+ log .debug ("Update lock to renew lease" );
302
324
}
303
- boolean updateSuccess = config .getLock ().update (leaderElectionRecord );
304
- if (!updateSuccess ) {
305
- return false ;
306
- }
307
- this .observedRecord = leaderElectionRecord ;
308
- this .observedTimeMilliSeconds = System .currentTimeMillis ();
309
- if (log .isDebugEnabled ()) {
325
+
326
+ boolean renewalStatus = updateLock (lock , leaderElectionRecord );
327
+
328
+ if (renewalStatus && log .isDebugEnabled ()) {
310
329
log .debug ("TryAcquireOrRenew return success" );
311
330
}
312
- return true ;
331
+
332
+ return renewalStatus ;
313
333
}
314
334
315
335
private boolean createLock (Lock lock , LeaderElectionRecord leaderElectionRecord ) {
316
- if (log .isDebugEnabled ()) {
317
- log .debug ("Lock not found, try to create it" );
318
- }
319
336
boolean createSuccess = lock .create (leaderElectionRecord );
320
337
if (!createSuccess ) {
321
338
return false ;
@@ -325,6 +342,16 @@ private boolean createLock(Lock lock, LeaderElectionRecord leaderElectionRecord)
325
342
return true ;
326
343
}
327
344
345
+ private boolean updateLock (Lock lock , LeaderElectionRecord leaderElectionRecord ) {
346
+ boolean updateSuccess = lock .update (leaderElectionRecord );
347
+ if (!updateSuccess ) {
348
+ return false ;
349
+ }
350
+ this .observedRecord = leaderElectionRecord ;
351
+ this .observedTimeMilliSeconds = System .currentTimeMillis ();
352
+ return true ;
353
+ }
354
+
328
355
private boolean isLeader () {
329
356
return this .config .getLock ().identity ().equals (this .observedRecord .getHolderIdentity ());
330
357
}
@@ -345,8 +372,49 @@ private void maybeReportTransition() {
345
372
346
373
@ Override
347
374
public void close () {
375
+ log .info ("Closing..." );
348
376
scheduledWorkers .shutdownNow ();
349
377
leaseWorkers .shutdownNow ();
350
378
hookWorkers .shutdownNow ();
379
+
380
+ // If I am the leader, free the lock so that other candidates can take it immediately
381
+ if (observedRecord != null && isLeader ()) {
382
+
383
+ // First ensure that all executors have stopped
384
+ try {
385
+ boolean isTerminated = scheduledWorkers .awaitTermination (config .getRetryPeriod ().getSeconds (), TimeUnit .SECONDS );
386
+ if (!isTerminated ) {
387
+ log .warn ("scheduledWorkers executor termination didn't finish." );
388
+ return ;
389
+ }
390
+
391
+ isTerminated = leaseWorkers .awaitTermination (config .getRetryPeriod ().getSeconds (), TimeUnit .SECONDS );
392
+ if (!isTerminated ) {
393
+ log .warn ("leaseWorkers executor termination didn't finish." );
394
+ return ;
395
+ }
396
+
397
+ isTerminated = hookWorkers .awaitTermination (config .getRetryPeriod ().getSeconds (), TimeUnit .SECONDS );
398
+ if (!isTerminated ) {
399
+ log .warn ("hookWorkers executor termination didn't finish." );
400
+ return ;
401
+ }
402
+ } catch (InterruptedException ex ) {
403
+ log .warn ("Failed to ensure executors termination." , ex );
404
+ return ;
405
+ }
406
+
407
+ log .info ("Giving up the lock...." );
408
+ LeaderElectionRecord emptyRecord = new LeaderElectionRecord ();
409
+ // maintain leaderTransitions count
410
+ emptyRecord .setLeaderTransitions (observedRecord .getLeaderTransitions ());
411
+ boolean status = this .config .getLock ().update (emptyRecord );
412
+
413
+ if (!status ) {
414
+ log .warn ("Failed to give up the lock." );
415
+ }
416
+ }
417
+
418
+ log .info ("Closed" );
351
419
}
352
420
}
0 commit comments