1616package ch .rasc .sse .eventbus ;
1717
1818import static org .assertj .core .api .Assertions .assertThat ;
19+ import static org .awaitility .Awaitility .await ;
1920import static org .junit .jupiter .api .Assertions .fail ;
2021
2122import java .io .IOException ;
2829import java .util .concurrent .TimeoutException ;
2930
3031import org .junit .jupiter .api .BeforeEach ;
31- import org .junit .jupiter .api .Disabled ;
3232import org .junit .jupiter .api .Test ;
3333import org .springframework .beans .factory .annotation .Autowired ;
3434import org .springframework .boot .test .context .SpringBootTest ;
@@ -91,10 +91,11 @@ public void testRegisterAndSubscribe() {
9191 }
9292
9393 @ Test
94- @ Disabled
9594 public void testRegisterAndSubscribeOnly () {
9695 SubscribeResponse sseResponse1 = registerAndSubscribeOnly ("1" , "event1" , 2 );
9796 sleep (3 , TimeUnit .SECONDS );
97+ sseResponse1 .eventSource ().close ();
98+
9899 SubscribeResponse sseResponse2 = registerAndSubscribe ("1" , "event2" , 2 );
99100 sleep (3 , TimeUnit .SECONDS );
100101
@@ -111,14 +112,14 @@ public void testRegisterAndSubscribeOnly() {
111112 assertSseResponse (sseResponse2 , new ResponseData ("event1" , "payload1" ), new ResponseData ("event2" , "payload2" ));
112113
113114 SubscribeResponse sseResponse = registerAndSubscribeOnly ("1" , "event3" , 1 );
115+ sleep (1 , TimeUnit .SECONDS );
114116 this .eventPublisher .publishEvent (SseEvent .of ("event1" , "payload1" ));
115117 this .eventPublisher .publishEvent (SseEvent .of ("event2" , "payload2" ));
116118 this .eventPublisher .publishEvent (SseEvent .of ("event3" , "payload3" ));
117119
118120 assertSseResponse (sseResponse , new ResponseData ("event3" , "payload3" ));
119121
120- sseResponse1 .eventSource ().close ();
121- sseResponse2 .eventSource ().close ();
122+ sseResponse .eventSource ().close ();
122123 }
123124
124125 @ Test
@@ -369,12 +370,12 @@ public void testMultipleSubscriptions() throws IOException {
369370 }
370371
371372 @ Test
372- @ Disabled
373373 public void testReconnect () throws IOException {
374- SubscribeResponse sseResponse = registerSubscribe ("1" , "eventName" , true , 1 , true );
375- sleep (2 , TimeUnit .SECONDS );
376- // assertSseResponseWithException(sseResponse);
377- sleep (2 , TimeUnit .SECONDS );
374+ SubscribeResponse sseResponse = registerSubscribe ("1" , "eventName" );
375+ sleep (500 , TimeUnit .MILLISECONDS );
376+ sseResponse .eventSource ().close ();
377+ sleep (4 , TimeUnit .SECONDS );
378+
378379 assertThat (this .eventBus .getAllClientIds ()).hasSize (1 );
379380 assertThat (this .eventBus .getAllEvents ()).containsOnly ("eventName" );
380381 assertThat (this .eventBus .hasSubscribers ("eventName" )).isTrue ();
@@ -389,6 +390,8 @@ public void testReconnect() throws IOException {
389390 sseEvent = SseEvent .builder ().event ("eventName" ).data ("payload3" ).build ();
390391 this .eventPublisher .publishEvent (sseEvent );
391392
393+ sleep (500 , TimeUnit .MILLISECONDS );
394+
392395 sseResponse = registerSubscribe ("1" , "eventName" , 3 );
393396 assertSseResponse (sseResponse , new ResponseData ("eventName" , "payload1" ),
394397 new ResponseData ("eventName" , "payload2" ), new ResponseData ("eventName" , "payload3" ));
@@ -399,23 +402,6 @@ public void testReconnect() throws IOException {
399402 assertThat (this .eventBus .countSubscribers ("eventName" )).isEqualTo (1 );
400403 assertThat (this .eventBus .getAllSubscriptions ()).containsOnlyKeys ("eventName" );
401404
402- sseEvent = SseEvent .builder ().event ("eventName" ).data ("payload4" ).build ();
403- this .eventPublisher .publishEvent (sseEvent );
404- sseEvent = SseEvent .builder ().event ("eventName" ).data ("payload5" ).build ();
405- this .eventPublisher .publishEvent (sseEvent );
406- sseEvent = SseEvent .builder ().event ("eventName" ).data ("payload6" ).build ();
407- this .eventPublisher .publishEvent (sseEvent );
408-
409- sseResponse = registerSubscribe ("1" , "eventName" , 3 );
410- assertSseResponse (sseResponse , new ResponseData ("eventName" , "payload4" ),
411- new ResponseData ("eventName" , "payload5" ), new ResponseData ("eventName" , "payload6" ));
412- assertThat (this .eventBus .getAllClientIds ()).hasSize (1 );
413- assertThat (this .eventBus .getAllEvents ()).containsOnly ("eventName" );
414- assertThat (this .eventBus .hasSubscribers ("eventName" )).isTrue ();
415- assertThat (this .eventBus .getSubscribers ("eventName" )).containsOnly ("1" );
416- assertThat (this .eventBus .countSubscribers ("eventName" )).isEqualTo (1 );
417- assertThat (this .eventBus .getAllSubscriptions ()).containsOnlyKeys ("eventName" );
418-
419405 this .eventBus .unregisterClient ("1" );
420406 assertThat (this .eventBus .getAllClientIds ()).hasSize (0 );
421407 assertThat (this .eventBus .getAllEvents ()).isEmpty ();
@@ -438,14 +424,14 @@ public void testClientExpiration() throws IOException {
438424 assertThat (this .eventBus .getAllSubscriptions ()).containsOnlyKeys ("eventName" );
439425 response .eventSource ().close ();
440426
441- sleep ( 21 , TimeUnit . SECONDS );
442-
443- assertThat (this .eventBus .getAllClientIds ()).hasSize ( 0 );
444- assertThat (this .eventBus .getAllEvents ( )).isEmpty ();
445- assertThat (this .eventBus .hasSubscribers ("eventName" )).isFalse ();
446- assertThat (this .eventBus .getSubscribers ("eventName" )).isEmpty ( );
447- assertThat (this .eventBus .countSubscribers ( "eventName" )).isEqualTo ( 0 );
448- assertThat ( this . eventBus . getAllSubscriptions ()). isEmpty ( );
427+ await (). atMost ( Duration . ofSeconds ( 8 )). untilAsserted (() -> {
428+ assertThat ( this . eventBus . getAllClientIds ()). hasSize ( 0 );
429+ assertThat (this .eventBus .getAllEvents ()).isEmpty ( );
430+ assertThat (this .eventBus .hasSubscribers ( "eventName" )).isFalse ();
431+ assertThat (this .eventBus .getSubscribers ("eventName" )).isEmpty ();
432+ assertThat (this .eventBus .countSubscribers ("eventName" )).isEqualTo ( 0 );
433+ assertThat (this .eventBus .getAllSubscriptions ( )).isEmpty ( );
434+ } );
449435
450436 response .eventSource ().close ();
451437 }
@@ -477,13 +463,14 @@ public void testMany() throws IOException {
477463 response .eventSource ().close ();
478464 }
479465
480- sleep (21 , TimeUnit .SECONDS );
481- assertThat (this .eventBus .getAllClientIds ()).hasSize (0 );
482- assertThat (this .eventBus .getAllEvents ()).isEmpty ();
483- assertThat (this .eventBus .hasSubscribers ("eventName" )).isFalse ();
484- assertThat (this .eventBus .getSubscribers ("eventName" )).isEmpty ();
485- assertThat (this .eventBus .countSubscribers ("eventName" )).isEqualTo (0 );
486- assertThat (this .eventBus .getAllSubscriptions ()).isEmpty ();
466+ await ().atMost (Duration .ofSeconds (8 )).untilAsserted (() -> {
467+ assertThat (this .eventBus .getAllClientIds ()).hasSize (0 );
468+ assertThat (this .eventBus .getAllEvents ()).isEmpty ();
469+ assertThat (this .eventBus .hasSubscribers ("eventName" )).isFalse ();
470+ assertThat (this .eventBus .getSubscribers ("eventName" )).isEmpty ();
471+ assertThat (this .eventBus .countSubscribers ("eventName" )).isEqualTo (0 );
472+ assertThat (this .eventBus .getAllSubscriptions ()).isEmpty ();
473+ });
487474 }
488475
489476 @ Test
@@ -584,7 +571,7 @@ private static void assertSseResponse(SubscribeResponse response, ResponseData..
584571 try {
585572 List <ResponseData > rds ;
586573 try {
587- rds = response .dataFuture .get (5 , TimeUnit .SECONDS );
574+ rds = response .dataFuture .get (2 , TimeUnit .SECONDS );
588575 }
589576 catch (TimeoutException e ) {
590577 rds = List .of ();
0 commit comments