@@ -104,6 +104,7 @@ public class DistributorTest {
104
104
private Capabilities stereotype ;
105
105
private Capabilities caps ;
106
106
private final Secret registrationSecret = new Secret ("hellim" );
107
+ private final Wait <Object > wait = new FluentWait <>(new Object ()).withTimeout (Duration .ofSeconds (5 ));
107
108
108
109
@ Before
109
110
public void setUp () {
@@ -309,6 +310,8 @@ public void testDrainedNodeShutsDownOnceEmpty() throws URISyntaxException, Inter
309
310
queuer ,
310
311
registrationSecret );
311
312
distributor .add (node );
313
+ wait .until (obj -> distributor .getStatus ().hasCapacity ());
314
+
312
315
distributor .drain (node .getId ());
313
316
314
317
latch .await (5 , TimeUnit .SECONDS );
@@ -352,6 +355,8 @@ public void drainedNodeDoesNotShutDownIfNotEmpty()
352
355
registrationSecret );
353
356
distributor .add (node );
354
357
358
+ wait .until (obj -> distributor .getStatus ().hasCapacity ());
359
+
355
360
NewSessionPayload payload = NewSessionPayload .create (caps );
356
361
distributor .newSession (createRequest (payload ));
357
362
@@ -394,6 +399,8 @@ public void drainedNodeShutsDownAfterSessionsFinish()
394
399
registrationSecret );
395
400
distributor .add (node );
396
401
402
+ wait .until (obj -> distributor .getStatus ().hasCapacity ());
403
+
397
404
NewSessionPayload payload = NewSessionPayload .create (caps );
398
405
CreateSessionResponse firstResponse = distributor .newSession (createRequest (payload ));
399
406
CreateSessionResponse secondResponse = distributor .newSession (createRequest (payload ));
@@ -504,6 +511,8 @@ public void theMostLightlyLoadedNodeIsSelectedFirst() {
504
511
.add (lightest )
505
512
.add (massive );
506
513
514
+ wait .until (obj -> distributor .getStatus ().hasCapacity ());
515
+
507
516
try (NewSessionPayload payload = NewSessionPayload .create (caps )) {
508
517
Session session = distributor .newSession (createRequest (payload )).getSession ();
509
518
@@ -633,7 +642,9 @@ public void shouldIncludeHostsThatAreUpInHostList() {
633
642
}
634
643
635
644
@ Test
636
- public void shouldNotScheduleAJobIfAllSlotsAreBeingUsed () {
645
+ public void shouldNotScheduleAJobIfAllSlotsAreBeingUsed () throws URISyntaxException {
646
+ URI nodeUri = new URI ("http://example:5678" );
647
+ URI routableUri = new URI ("http://localhost:1234" );
637
648
SessionMap sessions = new LocalSessionMap (tracer , bus );
638
649
LocalNewSessionQueue localNewSessionQueue = new LocalNewSessionQueue (
639
650
tracer ,
@@ -642,19 +653,20 @@ public void shouldNotScheduleAJobIfAllSlotsAreBeingUsed() {
642
653
Duration .ofSeconds (2 ));
643
654
LocalNewSessionQueuer queuer = new LocalNewSessionQueuer (tracer , bus , localNewSessionQueue );
644
655
645
- CombinedHandler handler = new CombinedHandler ();
656
+ LocalNode node = LocalNode .builder (tracer , bus , routableUri , routableUri , registrationSecret )
657
+ .add (caps , new TestSessionFactory ((id , c ) -> new Session (
658
+ id , nodeUri , stereotype , c , Instant .now ())))
659
+ .build ();
646
660
Distributor distributor = new LocalDistributor (
647
661
tracer ,
648
662
bus ,
649
- new PassthroughHttpClient .Factory (handler ),
663
+ new PassthroughHttpClient .Factory (node ),
650
664
sessions ,
651
665
queuer ,
652
666
registrationSecret );
653
- handler .addHandler (distributor );
654
667
655
- Node node = createNode (caps , 1 , 0 );
656
- handler .addHandler (node );
657
668
distributor .add (node );
669
+ wait .until (obj -> distributor .getStatus ().hasCapacity ());
658
670
659
671
// Use up the one slot available
660
672
try (NewSessionPayload payload = NewSessionPayload .create (caps )) {
@@ -669,7 +681,9 @@ public void shouldNotScheduleAJobIfAllSlotsAreBeingUsed() {
669
681
}
670
682
671
683
@ Test
672
- public void shouldReleaseSlotOnceSessionEnds () {
684
+ public void shouldReleaseSlotOnceSessionEnds () throws URISyntaxException {
685
+ URI nodeUri = new URI ("http://example:5678" );
686
+ URI routableUri = new URI ("http://localhost:1234" );
673
687
SessionMap sessions = new LocalSessionMap (tracer , bus );
674
688
LocalNewSessionQueue localNewSessionQueue = new LocalNewSessionQueue (
675
689
tracer ,
@@ -678,20 +692,22 @@ public void shouldReleaseSlotOnceSessionEnds() {
678
692
Duration .ofSeconds (2 ));
679
693
LocalNewSessionQueuer queuer = new LocalNewSessionQueuer (tracer , bus , localNewSessionQueue );
680
694
681
- CombinedHandler handler = new CombinedHandler ();
695
+ LocalNode node = LocalNode .builder (tracer , bus , routableUri , routableUri , registrationSecret )
696
+ .add (caps , new TestSessionFactory ((id , c ) -> new Session (
697
+ id , nodeUri , stereotype , c , Instant .now ())))
698
+ .build ();
699
+
682
700
Distributor distributor = new LocalDistributor (
683
701
tracer ,
684
702
bus ,
685
- new PassthroughHttpClient .Factory (handler ),
703
+ new PassthroughHttpClient .Factory (node ),
686
704
sessions ,
687
705
queuer ,
688
706
registrationSecret );
689
- handler .addHandler (distributor );
690
-
691
- Node node = createNode (caps , 1 , 0 );
692
- handler .addHandler (node );
693
707
distributor .add (node );
694
708
709
+ wait .until (obj -> distributor .getStatus ().hasCapacity ());
710
+
695
711
// Use up the one slot available
696
712
Session session ;
697
713
try (NewSessionPayload payload = NewSessionPayload .create (caps )) {
@@ -702,9 +718,7 @@ public void shouldReleaseSlotOnceSessionEnds() {
702
718
sessions .get (session .getId ());
703
719
704
720
node .stop (session .getId ());
705
-
706
721
// Now wait for the session map to say the session is gone.
707
- Wait <Object > wait = new FluentWait <>(new Object ()).withTimeout (Duration .ofSeconds (2 ));
708
722
wait .until (obj -> {
709
723
try {
710
724
sessions .get (session .getId ());
@@ -756,8 +770,9 @@ public void shouldNotStartASessionIfTheCapabilitiesAreNotSupported() {
756
770
}
757
771
758
772
@ Test
759
- public void attemptingToStartASessionWhichFailsMarksAsTheSlotAsAvailable () {
760
- CombinedHandler handler = new CombinedHandler ();
773
+ public void attemptingToStartASessionWhichFailsMarksAsTheSlotAsAvailable () throws URISyntaxException {
774
+ URI nodeUri = new URI ("http://example:5678" );
775
+ URI routableUri = new URI ("http://localhost:1234" );
761
776
762
777
SessionMap sessions = new LocalSessionMap (tracer , bus );
763
778
LocalNewSessionQueue localNewSessionQueue = new LocalNewSessionQueue (
@@ -766,26 +781,24 @@ public void attemptingToStartASessionWhichFailsMarksAsTheSlotAsAvailable() {
766
781
Duration .ofSeconds (2 ),
767
782
Duration .ofSeconds (2 ));
768
783
LocalNewSessionQueuer queuer = new LocalNewSessionQueuer (tracer , bus , localNewSessionQueue );
769
- handler .addHandler (sessions );
770
784
771
- URI uri = createUri ();
772
- Node node = LocalNode .builder (tracer , bus , uri , uri , registrationSecret )
773
- .add (caps , new TestSessionFactory ((id , caps ) -> {
774
- throw new SessionNotCreatedException ("OMG" );
775
- }))
776
- .build ();
777
- handler .addHandler (node );
785
+ LocalNode node = LocalNode .builder (tracer , bus , routableUri , routableUri , registrationSecret )
786
+ .add (caps , new TestSessionFactory ((id , caps ) -> {
787
+ throw new SessionNotCreatedException ("OMG" );
788
+ }))
789
+ .build ();
778
790
779
791
Distributor distributor = new LocalDistributor (
780
792
tracer ,
781
793
bus ,
782
- new PassthroughHttpClient .Factory (handler ),
794
+ new PassthroughHttpClient .Factory (node ),
783
795
sessions ,
784
796
queuer ,
785
797
registrationSecret );
786
- handler .addHandler (distributor );
787
798
distributor .add (node );
788
799
800
+ wait .until (obj -> distributor .getStatus ().hasCapacity ());
801
+
789
802
try (NewSessionPayload payload = NewSessionPayload .create (caps )) {
790
803
assertThatExceptionOfType (SessionNotCreatedException .class )
791
804
.isThrownBy (() -> distributor .newSession (createRequest (payload )));
0 commit comments