File tree Expand file tree Collapse file tree 4 files changed +11
-18
lines changed
main/java/io/scalecube/services
test/java/io/scalecube/services Expand file tree Collapse file tree 4 files changed +11
-18
lines changed Original file line number Diff line number Diff line change @@ -302,16 +302,12 @@ public ServiceDiscoveryContext discovery(String id) {
302302 }
303303
304304 /**
305- * Returns composite service discovery context .
305+ * Function to subscribe and listen on {@code ServiceDiscoveryEvent} events .
306306 *
307- * @return composite service discovery context
307+ * @return stream of {@code ServiceDiscoveryEvent} events
308308 */
309- public ServiceDiscoveryContext discovery () {
310- return ServiceDiscoveryContext .builder ()
311- .id ("composite-discovery" )
312- .address (Address .NULL_ADDRESS )
313- .discovery (compositeDiscovery )
314- .build ();
309+ public Flux <ServiceDiscoveryEvent > listenDiscovery () {
310+ return compositeDiscovery .listen ();
315311 }
316312
317313 /**
Original file line number Diff line number Diff line change @@ -50,7 +50,7 @@ public void test_added_removed_registration_events(MetadataCodec metadataCodec)
5050 .transport (RSocketServiceTransport ::new )
5151 .startAwait ();
5252
53- seed .discovery (). listen ().subscribe (events );
53+ seed .listenDiscovery ().subscribe (events );
5454
5555 Address seedAddress = seed .discovery ("seed" ).address ();
5656
@@ -96,7 +96,7 @@ public void test_listen_to_discovery_events(MetadataCodec metadataCodec) {
9696 .startAwait ();
9797 cluster .add (seed );
9898
99- seed .discovery (). listen ().subscribe (processor );
99+ seed .listenDiscovery ().subscribe (processor );
100100
101101 Address seedAddress = seed .discovery ("seed" ).address ();
102102
@@ -161,7 +161,7 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec)
161161 .startAwait ();
162162 cluster .add (seed );
163163
164- seed .discovery (). listen ().subscribe (processor );
164+ seed .listenDiscovery ().subscribe (processor );
165165
166166 Address seedAddress = seed .discovery ("seed" ).address ();
167167
Original file line number Diff line number Diff line change @@ -13,7 +13,7 @@ public class AnnotationServiceImpl implements AnnotationService {
1313 @ AfterConstruct
1414 void init (Microservices microservices ) {
1515 this .serviceDiscoveryEvents = ReplayProcessor .create ();
16- microservices .discovery (). listen ().subscribe (serviceDiscoveryEvents );
16+ microservices .listenDiscovery ().subscribe (serviceDiscoveryEvents );
1717 }
1818
1919 @ Override
Original file line number Diff line number Diff line change @@ -82,8 +82,7 @@ public void test_remote_node_died_mono_never() throws Exception {
8282 sub1 .set (serviceCall .requestOne (JUST_NEVER ).doOnError (exceptionHolder ::set ).subscribe ());
8383
8484 gateway
85- .discovery ()
86- .listen ()
85+ .listenDiscovery ()
8786 .filter (ServiceDiscoveryEvent ::isEndpointRemoved )
8887 .subscribe (onNext -> latch1 .countDown (), System .err ::println );
8988
@@ -111,8 +110,7 @@ public void test_remote_node_died_many_never() throws Exception {
111110 sub1 .set (serviceCall .requestMany (JUST_MANY_NEVER ).doOnError (exceptionHolder ::set ).subscribe ());
112111
113112 gateway
114- .discovery ()
115- .listen ()
113+ .listenDiscovery ()
116114 .filter (ServiceDiscoveryEvent ::isEndpointRemoved )
117115 .subscribe (onNext -> latch1 .countDown (), System .err ::println );
118116
@@ -144,8 +142,7 @@ public void test_remote_node_died_many_then_never() throws Exception {
144142 .subscribe ());
145143
146144 gateway
147- .discovery ()
148- .listen ()
145+ .listenDiscovery ()
149146 .filter (ServiceDiscoveryEvent ::isEndpointRemoved )
150147 .subscribe (onNext -> latch1 .countDown (), System .err ::println );
151148
You can’t perform that action at this time.
0 commit comments