Skip to content

Commit d77b5be

Browse files
committed
WIP ..
1 parent 3155e05 commit d77b5be

File tree

1 file changed

+64
-50
lines changed

1 file changed

+64
-50
lines changed

services/src/main/java/io/scalecube/services/Microservices.java

Lines changed: 64 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.scalecube.net.Address;
55
import io.scalecube.services.auth.Authenticator;
66
import io.scalecube.services.discovery.api.ServiceDiscovery;
7+
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
78
import io.scalecube.services.exceptions.DefaultErrorMapper;
89
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
910
import io.scalecube.services.gateway.Gateway;
@@ -176,14 +177,14 @@ private Mono<Microservices> start() {
176177
Scheduler scheduler = Schedulers.newSingle(toString(), true);
177178

178179
return transportBootstrap
179-
.start(this, methodRegistry)
180+
.start(this)
180181
.publishOn(scheduler)
181182
.flatMap(
182183
input -> {
183184
final ServiceCall call = call();
184185
final Address serviceAddress;
185186

186-
if (input != ServiceTransportBootstrap.noOpInstance) {
187+
if (input != ServiceTransportBootstrap.NULL_INSTANCE) {
187188
serviceAddress = input.address;
188189
} else {
189190
serviceAddress = null;
@@ -208,13 +209,14 @@ private Mono<Microservices> start() {
208209
.map(ServiceInfo::serviceInstance)
209210
.collect(Collectors.toList());
210211

211-
return discoveryBootstrap
212-
.listenDiscovery(serviceEndpointBuilder.build(), serviceRegistry)
212+
final ServiceEndpoint serviceEndpoint = serviceEndpointBuilder.build();
213+
214+
return startGateway(call)
213215
.publishOn(scheduler)
214-
.then(Mono.defer(() -> startGateway(call)).publishOn(scheduler))
215216
.then(Mono.fromCallable(() -> Injector.inject(this, serviceInstances)))
216217
.then(Mono.fromCallable(() -> JmxMonitorMBean.start(this)))
217-
.then(Mono.defer(discoveryBootstrap::createAndStartListen).publishOn(scheduler))
218+
.then(discoveryBootstrap.startListen(this, serviceEndpoint))
219+
.publishOn(scheduler)
218220
.thenReturn(this);
219221
})
220222
.onErrorResume(
@@ -424,43 +426,63 @@ public Builder defaultDataDecoder(ServiceMessageDataDecoder dataDecoder) {
424426

425427
public static class ServiceDiscoveryBootstrap {
426428

427-
private Function<ServiceEndpoint, ServiceDiscovery> discoveryFactory =
428-
ignore -> NoOpServiceDiscovery.INSTANCE;
429+
private final Function<ServiceEndpoint, ServiceDiscovery> factory;
429430

430431
private ServiceDiscovery discovery;
431432
private Disposable disposable;
432433

433-
private ServiceDiscoveryBootstrap() {}
434+
private ServiceDiscoveryBootstrap() {
435+
this(i -> NoOpServiceDiscovery.INSTANCE);
436+
}
434437

435438
private ServiceDiscoveryBootstrap(Function<ServiceEndpoint, ServiceDiscovery> factory) {
436-
this.discoveryFactory = factory;
439+
this.factory = factory;
437440
}
438441

439-
private Mono<ServiceDiscovery> createAndStartListen(
440-
ServiceEndpoint serviceEndpoint, ServiceRegistry serviceRegistry) {
442+
private Mono<ServiceDiscovery> startListen(
443+
Microservices microservices, ServiceEndpoint serviceEndpoint) {
441444
return Mono.defer(
442445
() -> {
443-
discovery = discoveryFactory.apply(serviceEndpoint);
444-
446+
discovery = factory.apply(serviceEndpoint);
445447
disposable =
446448
discovery
447449
.listenDiscovery()
448-
.subscribe(
449-
event -> {
450-
if (event.isEndpointAdded()) {
451-
serviceRegistry.registerService(event.serviceEndpoint());
452-
}
453-
if (event.isEndpointLeaving() || event.isEndpointRemoved()) {
454-
serviceRegistry.unregisterService(event.serviceEndpoint().id());
455-
}
456-
});
457-
458-
459-
460-
return discovery.start().doOnSuccess(serviceDiscovery -> discovery = serviceDiscovery);
450+
.subscribe(event -> onDiscoveryEvent(microservices, event));
451+
return discovery
452+
.start()
453+
.doOnSuccess(discovery -> this.discovery = discovery)
454+
.doOnSubscribe(
455+
s ->
456+
LOGGER.info(
457+
"[{}][{}] Starting",
458+
microservices.id(),
459+
discovery.getClass().getSimpleName()))
460+
.doOnSuccess(
461+
discovery ->
462+
LOGGER.info(
463+
"[{}][{}][{}] Started",
464+
microservices.id(),
465+
discovery.getClass().getSimpleName(),
466+
discovery.address()))
467+
.doOnError(
468+
ex ->
469+
LOGGER.error(
470+
"[{}][{}] Failed to start, cause: {}",
471+
microservices.id(),
472+
discovery.getClass().getSimpleName(),
473+
ex.toString()));
461474
});
462475
}
463476

477+
private void onDiscoveryEvent(Microservices microservices, ServiceDiscoveryEvent event) {
478+
if (event.isEndpointAdded()) {
479+
microservices.serviceRegistry.registerService(event.serviceEndpoint());
480+
}
481+
if (event.isEndpointLeaving() || event.isEndpointRemoved()) {
482+
microservices.serviceRegistry.unregisterService(event.serviceEndpoint().id());
483+
}
484+
}
485+
464486
private Mono<Void> shutdown() {
465487
return Mono.defer(
466488
() -> {
@@ -498,13 +520,13 @@ private Mono<GatewayBootstrap> start(Microservices microservices, GatewayOptions
498520
gateway.getClass().getSimpleName(),
499521
gateway.id()))
500522
.doOnSuccess(
501-
result ->
523+
gateway1 ->
502524
LOGGER.info(
503525
"[{}][{}][{}][{}] Started",
504526
microservices.id(),
505-
result.getClass().getSimpleName(),
506-
result.id(),
507-
result.address()))
527+
gateway1.getClass().getSimpleName(),
528+
gateway1.id(),
529+
gateway1.address()))
508530
.doOnError(
509531
ex ->
510532
LOGGER.error(
@@ -527,16 +549,16 @@ private List<Gateway> gateways() {
527549

528550
private Gateway gateway(String id) {
529551
return gateways.stream()
530-
.filter(gw -> gw.id().equals(id))
552+
.filter(gateway -> gateway.id().equals(id))
531553
.findFirst()
532-
.orElseThrow(
533-
() -> new IllegalArgumentException("Didn't find gateway by id: '" + id + "'"));
554+
.orElseThrow(() -> new IllegalArgumentException("Didn't find gateway by id=" + id));
534555
}
535556
}
536557

537558
public static class ServiceTransportBootstrap {
538559

539-
public static final ServiceTransportBootstrap noOpInstance = new ServiceTransportBootstrap();
560+
public static final Supplier<ServiceTransport> NULL_SUPPLIER = () -> null;
561+
public static final ServiceTransportBootstrap NULL_INSTANCE = new ServiceTransportBootstrap();
540562

541563
private final Supplier<ServiceTransport> supplier;
542564

@@ -546,26 +568,23 @@ public static class ServiceTransportBootstrap {
546568
private Address address;
547569

548570
public ServiceTransportBootstrap() {
549-
this(null);
571+
this(NULL_SUPPLIER);
550572
}
551573

552574
public ServiceTransportBootstrap(Supplier<ServiceTransport> supplier) {
553575
this.supplier = supplier;
554576
}
555577

556-
private Mono<ServiceTransportBootstrap> start(
557-
Microservices microservices, ServiceMethodRegistry methodRegistry) {
558-
559-
if (supplier == null) {
560-
return Mono.just(ServiceTransportBootstrap.noOpInstance);
578+
private Mono<ServiceTransportBootstrap> start(Microservices microservices) {
579+
if (supplier == NULL_SUPPLIER) {
580+
return Mono.just(NULL_INSTANCE);
561581
}
562-
563582
serviceTransport = supplier.get();
564-
565583
return serviceTransport
566584
.start()
567585
.doOnSuccess(transport -> serviceTransport = transport)
568-
.flatMap(transport -> serviceTransport.serverTransport().bind(methodRegistry))
586+
.flatMap(
587+
transport -> serviceTransport.serverTransport().bind(microservices.methodRegistry))
569588
.doOnSuccess(transport -> serverTransport = transport)
570589
.map(
571590
transport -> {
@@ -576,12 +595,7 @@ private Mono<ServiceTransportBootstrap> start(
576595
this.clientTransport = serviceTransport.clientTransport();
577596
return this;
578597
})
579-
.doOnSubscribe(
580-
s ->
581-
LOGGER.info(
582-
"[{}][{}] Starting",
583-
microservices.id(),
584-
serviceTransport.getClass().getSimpleName()))
598+
.doOnSubscribe(s -> LOGGER.info("[{}][ServiceTransport] Starting", microservices.id()))
585599
.doOnSuccess(
586600
transport ->
587601
LOGGER.info(

0 commit comments

Comments
 (0)