Skip to content

Commit 3155e05

Browse files
committed
WIP
1 parent 4ec4dfc commit 3155e05

File tree

1 file changed

+20
-67
lines changed

1 file changed

+20
-67
lines changed

services/src/main/java/io/scalecube/services/Microservices.java

Lines changed: 20 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -209,12 +209,12 @@ private Mono<Microservices> start() {
209209
.collect(Collectors.toList());
210210

211211
return discoveryBootstrap
212-
.create(serviceEndpointBuilder.build(), serviceRegistry)
212+
.listenDiscovery(serviceEndpointBuilder.build(), serviceRegistry)
213213
.publishOn(scheduler)
214214
.then(Mono.defer(() -> startGateway(call)).publishOn(scheduler))
215215
.then(Mono.fromCallable(() -> Injector.inject(this, serviceInstances)))
216216
.then(Mono.fromCallable(() -> JmxMonitorMBean.start(this)))
217-
.then(Mono.defer(discoveryBootstrap::start).publishOn(scheduler))
217+
.then(Mono.defer(discoveryBootstrap::createAndStartListen).publishOn(scheduler))
218218
.thenReturn(this);
219219
})
220220
.onErrorResume(
@@ -436,79 +436,40 @@ private ServiceDiscoveryBootstrap(Function<ServiceEndpoint, ServiceDiscovery> fa
436436
this.discoveryFactory = factory;
437437
}
438438

439-
/**
440-
* Creates instance of {@code ServiceDiscovery}.
441-
*
442-
* @param serviceEndpoint local service endpoint
443-
* @param serviceRegistry service registry
444-
* @return new {@code ServiceDiscovery} instance
445-
*/
446-
private Mono<ServiceDiscovery> create(
439+
private Mono<ServiceDiscovery> createAndStartListen(
447440
ServiceEndpoint serviceEndpoint, ServiceRegistry serviceRegistry) {
448441
return Mono.defer(
449442
() -> {
450443
discovery = discoveryFactory.apply(serviceEndpoint);
444+
451445
disposable =
452446
discovery
453447
.listenDiscovery()
454448
.subscribe(
455-
discoveryEvent -> {
456-
if (discoveryEvent.isEndpointAdded()) {
457-
serviceRegistry.registerService(discoveryEvent.serviceEndpoint());
449+
event -> {
450+
if (event.isEndpointAdded()) {
451+
serviceRegistry.registerService(event.serviceEndpoint());
458452
}
459-
if (discoveryEvent.isEndpointLeaving()
460-
|| discoveryEvent.isEndpointRemoved()) {
461-
serviceRegistry.unregisterService(
462-
discoveryEvent.serviceEndpoint().id());
453+
if (event.isEndpointLeaving() || event.isEndpointRemoved()) {
454+
serviceRegistry.unregisterService(event.serviceEndpoint().id());
463455
}
464456
});
465-
return Mono.just(discovery);
457+
458+
459+
460+
return discovery.start().doOnSuccess(serviceDiscovery -> discovery = serviceDiscovery);
466461
});
467462
}
468463

469-
/**
470-
* Starts {@code ServiceDiscovery} instance.
471-
*
472-
* @return started {@code ServiceDiscovery} instance
473-
*/
474-
private Mono<ServiceDiscovery> start() {
464+
private Mono<Void> shutdown() {
475465
return Mono.defer(
476466
() -> {
477-
if (discovery == null) {
478-
throw new IllegalStateException(
479-
"Create ServiceDiscovery instance before starting it");
467+
if (disposable != null) {
468+
disposable.dispose();
480469
}
481-
LOGGER.debug("Starting ServiceDiscovery");
482-
return discovery
483-
.start()
484-
.doOnSuccess(
485-
serviceDiscovery -> {
486-
discovery = serviceDiscovery;
487-
LOGGER.debug("Successfully started ServiceDiscovery -- {}", discovery);
488-
})
489-
.doOnError(
490-
ex ->
491-
LOGGER.error(
492-
"Failed to start ServiceDiscovery -- {}, cause: ", discovery, ex));
470+
return discovery != null ? discovery.shutdown() : Mono.empty();
493471
});
494472
}
495-
496-
private Mono<Void> shutdown() {
497-
return Mono.defer(
498-
() ->
499-
Optional.ofNullable(discovery) //
500-
.map(ServiceDiscovery::shutdown)
501-
.orElse(Mono.empty())
502-
.doFinally(
503-
s -> {
504-
if (disposable != null) {
505-
disposable.dispose();
506-
}
507-
if (discovery != null) {
508-
LOGGER.debug("ServiceDiscovery -- {} has been stopped", discovery);
509-
}
510-
}));
511-
}
512473
}
513474

514475
private static class GatewayBootstrap {
@@ -547,7 +508,7 @@ private Mono<GatewayBootstrap> start(Microservices microservices, GatewayOptions
547508
.doOnError(
548509
ex ->
549510
LOGGER.error(
550-
"[{}][{}][{}] Failed to start: {}",
511+
"[{}][{}][{}] Failed to start, cause: {}",
551512
microservices.id(),
552513
gateway.getClass().getSimpleName(),
553514
gateway.id(),
@@ -557,15 +518,7 @@ private Mono<GatewayBootstrap> start(Microservices microservices, GatewayOptions
557518
}
558519

559520
private Mono<Void> shutdown() {
560-
return Mono.defer(
561-
() ->
562-
Mono.whenDelayError(gateways.stream().map(Gateway::stop).toArray(Mono[]::new))
563-
.doFinally(
564-
s -> {
565-
if (!gateways.isEmpty()) {
566-
LOGGER.debug("Gateways have been stopped");
567-
}
568-
}));
521+
return Mono.whenDelayError(gateways.stream().map(Gateway::stop).toArray(Mono[]::new));
569522
}
570523

571524
private List<Gateway> gateways() {
@@ -639,7 +592,7 @@ private Mono<ServiceTransportBootstrap> start(
639592
.doOnError(
640593
ex ->
641594
LOGGER.error(
642-
"[{}][{}] Failed to start: {}",
595+
"[{}][{}] Failed to start, cause: {}",
643596
microservices.id(),
644597
serviceTransport.getClass().getSimpleName(),
645598
ex.toString()));

0 commit comments

Comments
 (0)