1818import io .scalecube .services .sut .GreetingServiceImpl ;
1919import io .scalecube .services .transport .rsocket .RSocketServiceTransport ;
2020import java .time .Duration ;
21- import java .util .ArrayList ;
2221import java .util .List ;
2322import java .util .concurrent .CopyOnWriteArrayList ;
2423import java .util .function .Function ;
3231
3332public class ServiceRegistryTest extends BaseTest {
3433
35- public static final Duration TIMEOUT = Duration .ofSeconds (6 );
34+ public static final Duration TIMEOUT = Duration .ofSeconds (30 );
3635
3736 private static Stream <Arguments > metadataCodecSource () {
3837 return Stream .of (
@@ -44,16 +43,15 @@ private static Stream<Arguments> metadataCodecSource() {
4443 @ ParameterizedTest
4544 @ MethodSource ("metadataCodecSource" )
4645 public void test_added_removed_registration_events (MetadataCodec metadataCodec ) {
47-
48- List <ServiceDiscoveryEvent > events = new ArrayList <>();
46+ ReplayProcessor <ServiceDiscoveryEvent > events = ReplayProcessor .create ();
4947
5048 Microservices seed =
5149 Microservices .builder ()
5250 .discovery (defServiceDiscovery (metadataCodec ))
5351 .transport (RSocketServiceTransport ::new )
5452 .startAwait ();
5553
56- seed .discovery ().listenDiscovery ().subscribe (events :: add );
54+ seed .discovery ().listenDiscovery ().subscribe (events );
5755
5856 Address seedAddress = seed .discovery ().address ();
5957
@@ -71,17 +69,17 @@ public void test_added_removed_registration_events(MetadataCodec metadataCodec)
7169 .services (new GreetingServiceImpl ())
7270 .startAwait ();
7371
74- Mono . when ( ms1 . shutdown (), ms2 . shutdown ()). then ( Mono . delay ( TIMEOUT )). block ();
75-
76- assertEquals (6 , events . size ());
77- assertEquals ( ENDPOINT_ADDED , events . get ( 0 ). type ());
78- assertEquals (ENDPOINT_ADDED , events . get ( 1 ). type ());
79- assertEquals (ENDPOINT_LEAVING , events . get ( 2 ). type ());
80- assertEquals (ENDPOINT_LEAVING , events . get ( 3 ). type ());
81- assertEquals (ENDPOINT_REMOVED , events . get ( 4 ). type ());
82- assertEquals ( ENDPOINT_REMOVED , events . get ( 5 ). type ());
83-
84- seed . shutdown (). block ( );
72+ StepVerifier . create ( events )
73+ . assertNext ( event -> assertEquals ( ENDPOINT_ADDED , event . type ()))
74+ . assertNext ( event -> assertEquals (ENDPOINT_ADDED , event . type ()))
75+ . then (() -> Mono . whenDelayError ( ms1 . shutdown (), ms2 . shutdown ()). block ( TIMEOUT ))
76+ . assertNext ( event -> assertEquals (ENDPOINT_LEAVING , event . type ()))
77+ . assertNext ( event -> assertEquals (ENDPOINT_LEAVING , event . type ()))
78+ . assertNext ( event -> assertEquals (ENDPOINT_REMOVED , event . type ()))
79+ . assertNext ( event -> assertEquals (ENDPOINT_REMOVED , event . type ()))
80+ . then (() -> seed . shutdown (). block ( TIMEOUT ))
81+ . thenCancel ()
82+ . verify ( TIMEOUT );
8583 }
8684
8785 @ ParameterizedTest
@@ -126,13 +124,11 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) {
126124 cluster .add (ms2 );
127125 })
128126 .assertNext (event -> assertEquals (ENDPOINT_ADDED , event .type ()))
129- .then (() -> cluster .remove (2 ).shutdown ().block ())
127+ .then (() -> cluster .remove (2 ).shutdown ().block (TIMEOUT ))
130128 .assertNext (event -> assertEquals (ENDPOINT_LEAVING , event .type ()))
131- .thenAwait (TIMEOUT )
132129 .assertNext (event -> assertEquals (ENDPOINT_REMOVED , event .type ()))
133- .then (() -> cluster .remove (1 ).shutdown ().block ())
130+ .then (() -> cluster .remove (1 ).shutdown ().block (TIMEOUT ))
134131 .assertNext (event -> assertEquals (ENDPOINT_LEAVING , event .type ()))
135- .thenAwait (TIMEOUT )
136132 .assertNext (event -> assertEquals (ENDPOINT_REMOVED , event .type ()))
137133 .thenCancel ()
138134 .verify (TIMEOUT );
@@ -147,9 +143,8 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) {
147143 .thenCancel ()
148144 .verify (TIMEOUT );
149145
150- Mono .when (cluster .stream ().map (Microservices ::shutdown ).toArray (Mono []::new ))
151- .then (Mono .delay (TIMEOUT ))
152- .block ();
146+ Mono .whenDelayError (cluster .stream ().map (Microservices ::shutdown ).toArray (Mono []::new ))
147+ .block (TIMEOUT );
153148 }
154149
155150 @ ParameterizedTest
@@ -203,9 +198,8 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec)
203198 .thenCancel ()
204199 .verify (TIMEOUT );
205200
206- Mono .when (cluster .stream ().map (Microservices ::shutdown ).toArray (Mono []::new ))
207- .then (Mono .delay (TIMEOUT ))
208- .block ();
201+ Mono .whenDelayError (cluster .stream ().map (Microservices ::shutdown ).toArray (Mono []::new ))
202+ .block (TIMEOUT );
209203 }
210204
211205 private Function <ServiceEndpoint , ServiceDiscovery > defServiceDiscovery (
0 commit comments