24
24
import java .util .concurrent .CountDownLatch ;
25
25
import java .util .concurrent .ExecutorService ;
26
26
import java .util .concurrent .Executors ;
27
+ import java .util .concurrent .Semaphore ;
27
28
import java .util .concurrent .atomic .AtomicReference ;
28
29
import java .util .concurrent .locks .ReentrantLock ;
29
30
import java .util .function .Consumer ;
@@ -291,6 +292,7 @@ public void testLeaderElectionCaptureException() throws ApiException, Interrupte
291
292
() -> {
292
293
leaderElector .run (() -> {}, () -> {});
293
294
});
295
+ // TODO: Remove this sleep
294
296
Thread .sleep (Duration .ofSeconds (2 ).toMillis ());
295
297
assertEquals (expectedException , actualException .get ().getCause ());
296
298
}
@@ -308,7 +310,18 @@ public void testLeaderElectionReportLeaderOnStart() throws ApiException, Interru
308
310
setLeaderTransitions (1 );
309
311
setLeaseDurationSeconds (60 );
310
312
}
313
+ })
314
+ .thenReturn (
315
+ new LeaderElectionRecord () {
316
+ {
317
+ setHolderIdentity ("foo3" );
318
+ setAcquireTime (new Date ());
319
+ setRenewTime (new Date ());
320
+ setLeaderTransitions (1 );
321
+ setLeaseDurationSeconds (60 );
322
+ }
311
323
});
324
+
312
325
List <String > notifications = new ArrayList <>();
313
326
LeaderElectionConfig leaderElectionConfig = new LeaderElectionConfig ();
314
327
leaderElectionConfig .setLock (lock );
@@ -317,25 +330,21 @@ public void testLeaderElectionReportLeaderOnStart() throws ApiException, Interru
317
330
leaderElectionConfig .setRenewDeadline (Duration .ofMillis (700 ));
318
331
LeaderElector leaderElector = new LeaderElector (leaderElectionConfig );
319
332
ExecutorService leaderElectionWorker = Executors .newFixedThreadPool (1 );
333
+ final Semaphore s = new Semaphore (2 );
334
+ s .acquire (2 );
320
335
leaderElectionWorker .submit (
321
336
() -> {
322
- leaderElector .run (() -> {}, () -> {}, (id ) -> notifications .add (id ));
337
+ leaderElector .run (
338
+ () -> {},
339
+ () -> {},
340
+ (id ) -> {
341
+ notifications .add (id );
342
+ s .release ();
343
+ });
323
344
});
324
345
325
- Thread .sleep (Duration .ofSeconds (2 ).toMillis ());
326
-
327
- when (lock .get ())
328
- .thenReturn (
329
- new LeaderElectionRecord () {
330
- {
331
- setHolderIdentity ("foo3" );
332
- setAcquireTime (new Date ());
333
- setRenewTime (new Date ());
334
- setLeaderTransitions (1 );
335
- setLeaseDurationSeconds (60 );
336
- }
337
- });
338
- Thread .sleep (Duration .ofSeconds (2 ).toMillis ());
346
+ // wait for two notifications to occur.
347
+ s .acquire (2 );
339
348
340
349
assertEquals (2 , notifications .size ());
341
350
assertEquals ("foo2" , notifications .get (0 ));
@@ -365,12 +374,20 @@ public void testLeaderElectionShouldReportLeaderItAcquiresOnStart()
365
374
leaderElectionConfig .setRenewDeadline (Duration .ofMillis (700 ));
366
375
LeaderElector leaderElector = new LeaderElector (leaderElectionConfig );
367
376
ExecutorService leaderElectionWorker = Executors .newFixedThreadPool (1 );
377
+ Semaphore s = new Semaphore (1 );
378
+ s .acquire ();
368
379
leaderElectionWorker .submit (
369
380
() -> {
370
- leaderElector .run (() -> {}, () -> {}, (id ) -> notifications .add (id ));
381
+ leaderElector .run (
382
+ () -> {},
383
+ () -> {},
384
+ (id ) -> {
385
+ notifications .add (id );
386
+ s .release ();
387
+ });
371
388
});
372
389
373
- Thread . sleep ( Duration . ofSeconds ( 2 ). toMillis () );
390
+ s . acquire ( );
374
391
assertEquals (1 , notifications .size ());
375
392
assertEquals ("foo1" , notifications .get (0 ));
376
393
}
0 commit comments