Skip to content

Commit dd30d65

Browse files
authored
Merge pull request #772 from scalecube/expose-service-registry
Fixed discovery contexts
2 parents e6e0e8c + b929b83 commit dd30d65

File tree

5 files changed

+180
-60
lines changed

5 files changed

+180
-60
lines changed

services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscovery.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
public interface ServiceDiscovery {
77

88
/**
9-
* Function to subscribe and listen on {@code ServiceDiscoveryEvent} events.
9+
* Function to subscribe and listen on stream of {@code ServiceDiscoveryEvent}\s.
1010
*
11-
* @return stream of {@code ServiceDiscoveryEvent} events
11+
* @return stream of {@code ServiceDiscoveryEvent}\s
1212
*/
1313
Flux<ServiceDiscoveryEvent> listen();
1414

services-api/src/main/java/io/scalecube/services/discovery/api/ServiceDiscoveryContext.java

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,84 @@
11
package io.scalecube.services.discovery.api;
22

33
import io.scalecube.net.Address;
4+
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent.Type;
5+
import io.scalecube.services.registry.api.ServiceRegistry;
46
import java.util.Objects;
57
import java.util.StringJoiner;
68
import reactor.core.publisher.Flux;
9+
import reactor.core.scheduler.Scheduler;
710

811
public final class ServiceDiscoveryContext {
912

1013
private final String id;
1114
private final Address address;
1215
private final ServiceDiscovery discovery;
16+
private final ServiceRegistry serviceRegistry;
17+
private final Scheduler scheduler;
1318

1419
private ServiceDiscoveryContext(Builder builder) {
15-
this.id = Objects.requireNonNull(builder.id, "discoveryContext.id");
16-
this.address = Objects.requireNonNull(builder.address, "discoveryContext.address");
17-
this.discovery = Objects.requireNonNull(builder.discovery, "discoveryContext.discovery");
20+
this.id = Objects.requireNonNull(builder.id, "id");
21+
this.address = Objects.requireNonNull(builder.address, "address");
22+
this.discovery = Objects.requireNonNull(builder.discovery, "discovery");
23+
this.serviceRegistry = Objects.requireNonNull(builder.serviceRegistry, "serviceRegistry");
24+
this.scheduler = Objects.requireNonNull(builder.scheduler, "scheduler");
1825
}
1926

2027
public static Builder builder() {
2128
return new Builder();
2229
}
2330

31+
/**
32+
* Returns copy builder.
33+
*
34+
* @param other instance to copy from
35+
* @return new builder instance
36+
*/
37+
public static Builder from(ServiceDiscoveryContext other) {
38+
return new Builder()
39+
.id(other.id)
40+
.address(other.address)
41+
.discovery(other.discovery)
42+
.serviceRegistry(other.serviceRegistry)
43+
.scheduler(other.scheduler);
44+
}
45+
46+
/**
47+
* Returns service discovery id.
48+
*
49+
* @return service discovery id
50+
*/
2451
public String id() {
2552
return id;
2653
}
2754

55+
/**
56+
* Returns service discovery address. If service discovery instance is not yet started, then this
57+
* method will return {@link Address#NULL_ADDRESS}.
58+
*
59+
* @see ServiceDiscovery#start()
60+
* @see ServiceDiscovery#listen()
61+
* @return service discovery address
62+
*/
2863
public Address address() {
2964
return address;
3065
}
3166

67+
/**
68+
* Returns stream of service discovery events. Can be called before or after {@link
69+
* ServiceDiscovery#start()}. If it's called before then new events will be streamed, if it's
70+
* called after then {@link ServiceRegistry#listServiceEndpoints()} will be turned to service
71+
* discovery events of type {@link Type#ENDPOINT_ADDED}, and concateneted with a stream of live
72+
* events.
73+
*
74+
* @return stream of service discovery events
75+
*/
3276
public Flux<ServiceDiscoveryEvent> listen() {
33-
return discovery.listen();
77+
return Flux.fromStream(serviceRegistry.listServiceEndpoints().stream())
78+
.map(ServiceDiscoveryEvent::newEndpointAdded)
79+
.concatWith(discovery.listen())
80+
.subscribeOn(scheduler)
81+
.publishOn(scheduler);
3482
}
3583

3684
@Override
@@ -39,6 +87,8 @@ public String toString() {
3987
.add("id='" + id + "'")
4088
.add("address=" + address)
4189
.add("discovery=" + discovery)
90+
.add("serviceRegistry=" + serviceRegistry)
91+
.add("scheduler=" + scheduler)
4292
.toString();
4393
}
4494

@@ -47,6 +97,8 @@ public static class Builder {
4797
private String id;
4898
private Address address;
4999
private ServiceDiscovery discovery;
100+
private ServiceRegistry serviceRegistry;
101+
private Scheduler scheduler;
50102

51103
private Builder() {}
52104

@@ -65,6 +117,16 @@ public Builder discovery(ServiceDiscovery discovery) {
65117
return this;
66118
}
67119

120+
public Builder serviceRegistry(ServiceRegistry serviceRegistry) {
121+
this.serviceRegistry = serviceRegistry;
122+
return this;
123+
}
124+
125+
public Builder scheduler(Scheduler scheduler) {
126+
this.scheduler = scheduler;
127+
return this;
128+
}
129+
68130
public ServiceDiscoveryContext build() {
69131
return new ServiceDiscoveryContext(this);
70132
}

services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ public ScalecubeServiceDiscovery(ServiceEndpoint serviceEndpoint) {
5959
this.clusterConfig =
6060
ClusterConfig.defaultLanConfig()
6161
.metadata(serviceEndpoint)
62-
.memberIdGenerator(serviceEndpoint::id)
6362
.transport(opts -> opts.transportFactory(new WebsocketTransportFactory()));
6463
}
6564

services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/discovery/CompositeDiscoveryExample.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
import io.scalecube.net.Address;
44
import io.scalecube.services.Microservices;
5+
import io.scalecube.services.annotations.AfterConstruct;
56
import io.scalecube.services.annotations.Service;
67
import io.scalecube.services.annotations.ServiceMethod;
78
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
9+
import io.scalecube.services.discovery.api.ServiceDiscoveryContext;
810
import io.scalecube.services.examples.helloworld.service.api.Greeting;
911
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;
1012
import reactor.core.publisher.Mono;
@@ -16,16 +18,24 @@ public class CompositeDiscoveryExample {
1618
*
1719
* @param args arguments
1820
*/
19-
public static void main(String[] args) throws InterruptedException {
21+
public static void main(String[] args) {
2022
Microservices seed1 =
2123
Microservices.builder()
22-
.discovery("seed1", ScalecubeServiceDiscovery::new)
24+
.discovery(
25+
"seed1",
26+
serviceEndpoint ->
27+
new ScalecubeServiceDiscovery(serviceEndpoint)
28+
.options(opts -> opts.memberAlias("seed1")))
2329
.transport(RSocketServiceTransport::new)
2430
.startAwait();
2531

2632
Microservices seed2 =
2733
Microservices.builder()
28-
.discovery("seed2", ScalecubeServiceDiscovery::new)
34+
.discovery(
35+
"seed2",
36+
serviceEndpoint ->
37+
new ScalecubeServiceDiscovery(serviceEndpoint)
38+
.options(opts -> opts.memberAlias("seed2")))
2939
.transport(RSocketServiceTransport::new)
3040
.startAwait();
3141

@@ -38,6 +48,7 @@ public static void main(String[] args) throws InterruptedException {
3848
"ms1",
3949
endpoint ->
4050
new ScalecubeServiceDiscovery(endpoint)
51+
.options(opts -> opts.memberAlias("ms1"))
4152
.membership(cfg -> cfg.seedMembers(seed1Address)))
4253
.transport(RSocketServiceTransport::new)
4354
.services(new GreetingServiceImpl1())
@@ -49,6 +60,7 @@ public static void main(String[] args) throws InterruptedException {
4960
"ms2",
5061
endpoint ->
5162
new ScalecubeServiceDiscovery(endpoint)
63+
.options(opts -> opts.memberAlias("ms2"))
5264
.membership(cfg -> cfg.seedMembers(seed2Address)))
5365
.transport(RSocketServiceTransport::new)
5466
.services(new GreetingServiceImpl2())
@@ -60,13 +72,13 @@ public static void main(String[] args) throws InterruptedException {
6072
"domain1",
6173
endpoint ->
6274
new ScalecubeServiceDiscovery(endpoint)
63-
.options(cfg -> cfg.memberIdGenerator(endpoint::id))
75+
.options(opts -> opts.memberAlias("domain1"))
6476
.membership(cfg -> cfg.seedMembers(seed1Address)))
6577
.discovery(
6678
"domain2",
6779
endpoint ->
6880
new ScalecubeServiceDiscovery(endpoint)
69-
.options(cfg -> cfg.memberIdGenerator(endpoint::id))
81+
.options(opts -> opts.memberAlias("domain2"))
7082
.membership(cfg -> cfg.seedMembers(seed2Address)))
7183
.transport(RSocketServiceTransport::new)
7284
.startAwait();
@@ -78,8 +90,6 @@ public static void main(String[] args) throws InterruptedException {
7890
Greeting greeting2 =
7991
compositeMs.call().api(GreetingsService2.class).sayHello("hello two").block();
8092
System.err.println("This is response from GreetingsService2: " + greeting2.message());
81-
82-
Thread.currentThread().join();
8393
}
8494

8595
@Service
@@ -98,6 +108,16 @@ public interface GreetingsService2 {
98108

99109
public static class GreetingServiceImpl1 implements GreetingsService1 {
100110

111+
@AfterConstruct
112+
void init(Microservices ms) {
113+
ServiceDiscoveryContext discoveryContext = ms.discovery("ms1");
114+
System.err.println("discovery(\"ms1\"): " + discoveryContext);
115+
discoveryContext
116+
.listen()
117+
.subscribe(
118+
discoveryEvent -> System.err.println("discovery(\"ms1\") event: " + discoveryEvent));
119+
}
120+
101121
@Override
102122
public Mono<Greeting> sayHello(String name) {
103123
return Mono.just(
@@ -110,6 +130,16 @@ public Mono<Greeting> sayHello(String name) {
110130

111131
public static class GreetingServiceImpl2 implements GreetingsService2 {
112132

133+
@AfterConstruct
134+
void init(Microservices ms) {
135+
ServiceDiscoveryContext discoveryContext = ms.discovery("ms2");
136+
System.err.println("discovery(\"ms2\"): " + discoveryContext);
137+
discoveryContext
138+
.listen()
139+
.subscribe(
140+
discoveryEvent -> System.err.println("discovery(\"ms2\") event: " + discoveryEvent));
141+
}
142+
113143
@Override
114144
public Mono<Greeting> sayHello(String name) {
115145
return Mono.just(

0 commit comments

Comments
 (0)