Skip to content

Commit 047756b

Browse files
committed
Enhance ServiceRegistryTest
1 parent 0120e79 commit 047756b

File tree

1 file changed

+20
-26
lines changed

1 file changed

+20
-26
lines changed

services/src/test/java/io/scalecube/services/ServiceRegistryTest.java

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import io.scalecube.services.sut.GreetingServiceImpl;
1919
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;
2020
import java.time.Duration;
21-
import java.util.ArrayList;
2221
import java.util.List;
2322
import java.util.concurrent.CopyOnWriteArrayList;
2423
import java.util.function.Function;
@@ -32,7 +31,7 @@
3231

3332
public class ServiceRegistryTest extends BaseTest {
3433

35-
public static final Duration TIMEOUT = Duration.ofSeconds(6);
34+
public static final Duration TIMEOUT = Duration.ofSeconds(30);
3635

3736
private static Stream<Arguments> metadataCodecSource() {
3837
return Stream.of(
@@ -44,16 +43,15 @@ private static Stream<Arguments> metadataCodecSource() {
4443
@ParameterizedTest
4544
@MethodSource("metadataCodecSource")
4645
public void test_added_removed_registration_events(MetadataCodec metadataCodec) {
47-
48-
List<ServiceDiscoveryEvent> events = new ArrayList<>();
46+
ReplayProcessor<ServiceDiscoveryEvent> events = ReplayProcessor.create();
4947

5048
Microservices seed =
5149
Microservices.builder()
5250
.discovery(defServiceDiscovery(metadataCodec))
5351
.transport(RSocketServiceTransport::new)
5452
.startAwait();
5553

56-
seed.discovery().listenDiscovery().subscribe(events::add);
54+
seed.discovery().listenDiscovery().subscribe(events);
5755

5856
Address seedAddress = seed.discovery().address();
5957

@@ -71,17 +69,17 @@ public void test_added_removed_registration_events(MetadataCodec metadataCodec)
7169
.services(new GreetingServiceImpl())
7270
.startAwait();
7371

74-
Mono.when(ms1.shutdown(), ms2.shutdown()).then(Mono.delay(TIMEOUT)).block();
75-
76-
assertEquals(6, events.size());
77-
assertEquals(ENDPOINT_ADDED, events.get(0).type());
78-
assertEquals(ENDPOINT_ADDED, events.get(1).type());
79-
assertEquals(ENDPOINT_LEAVING, events.get(2).type());
80-
assertEquals(ENDPOINT_LEAVING, events.get(3).type());
81-
assertEquals(ENDPOINT_REMOVED, events.get(4).type());
82-
assertEquals(ENDPOINT_REMOVED, events.get(5).type());
83-
84-
seed.shutdown().block();
72+
StepVerifier.create(events)
73+
.assertNext(event -> assertEquals(ENDPOINT_ADDED, event.type()))
74+
.assertNext(event -> assertEquals(ENDPOINT_ADDED, event.type()))
75+
.then(() -> Mono.whenDelayError(ms1.shutdown(), ms2.shutdown()).block(TIMEOUT))
76+
.assertNext(event -> assertEquals(ENDPOINT_LEAVING, event.type()))
77+
.assertNext(event -> assertEquals(ENDPOINT_LEAVING, event.type()))
78+
.assertNext(event -> assertEquals(ENDPOINT_REMOVED, event.type()))
79+
.assertNext(event -> assertEquals(ENDPOINT_REMOVED, event.type()))
80+
.then(() -> seed.shutdown().block(TIMEOUT))
81+
.thenCancel()
82+
.verify(TIMEOUT);
8583
}
8684

8785
@ParameterizedTest
@@ -126,13 +124,11 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) {
126124
cluster.add(ms2);
127125
})
128126
.assertNext(event -> assertEquals(ENDPOINT_ADDED, event.type()))
129-
.then(() -> cluster.remove(2).shutdown().block())
127+
.then(() -> cluster.remove(2).shutdown().block(TIMEOUT))
130128
.assertNext(event -> assertEquals(ENDPOINT_LEAVING, event.type()))
131-
.thenAwait(TIMEOUT)
132129
.assertNext(event -> assertEquals(ENDPOINT_REMOVED, event.type()))
133-
.then(() -> cluster.remove(1).shutdown().block())
130+
.then(() -> cluster.remove(1).shutdown().block(TIMEOUT))
134131
.assertNext(event -> assertEquals(ENDPOINT_LEAVING, event.type()))
135-
.thenAwait(TIMEOUT)
136132
.assertNext(event -> assertEquals(ENDPOINT_REMOVED, event.type()))
137133
.thenCancel()
138134
.verify(TIMEOUT);
@@ -147,9 +143,8 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) {
147143
.thenCancel()
148144
.verify(TIMEOUT);
149145

150-
Mono.when(cluster.stream().map(Microservices::shutdown).toArray(Mono[]::new))
151-
.then(Mono.delay(TIMEOUT))
152-
.block();
146+
Mono.whenDelayError(cluster.stream().map(Microservices::shutdown).toArray(Mono[]::new))
147+
.block(TIMEOUT);
153148
}
154149

155150
@ParameterizedTest
@@ -203,9 +198,8 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec)
203198
.thenCancel()
204199
.verify(TIMEOUT);
205200

206-
Mono.when(cluster.stream().map(Microservices::shutdown).toArray(Mono[]::new))
207-
.then(Mono.delay(TIMEOUT))
208-
.block();
201+
Mono.whenDelayError(cluster.stream().map(Microservices::shutdown).toArray(Mono[]::new))
202+
.block(TIMEOUT);
209203
}
210204

211205
private Function<ServiceEndpoint, ServiceDiscovery> defServiceDiscovery(

0 commit comments

Comments
 (0)