Skip to content

Commit c2a0a7c

Browse files
committed
Added Closeable
1 parent aeb2ad4 commit c2a0a7c

File tree

2 files changed

+26
-14
lines changed

2 files changed

+26
-14
lines changed

services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ public Mono<Void> start() {
129129
.config(options -> clusterConfig)
130130
.handler(
131131
cluster -> {
132+
//noinspection CodeBlock2Expr
132133
return new ClusterMessageHandler() {
133134
@Override
134135
public void onMembershipEvent(MembershipEvent event) {
@@ -176,10 +177,8 @@ private void onMembershipEvent(MembershipEvent membershipEvent) {
176177
return;
177178
}
178179

179-
if (discoveryEvent != null) {
180-
LOGGER.debug("Publish discoveryEvent: {}", discoveryEvent);
181-
sink.emitNext(discoveryEvent, RETRY_NON_SERIALIZED);
182-
}
180+
LOGGER.debug("Publish discoveryEvent: {}", discoveryEvent);
181+
sink.emitNext(discoveryEvent, RETRY_NON_SERIALIZED);
183182
}
184183

185184
private ServiceDiscoveryEvent toServiceDiscoveryEvent(MembershipEvent membershipEvent) {

services/src/main/java/io/scalecube/services/Microservices.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@
126126
*
127127
* }</pre>
128128
*/
129-
public final class Microservices {
129+
public final class Microservices implements AutoCloseable {
130130

131131
public static final Logger LOGGER = LoggerFactory.getLogger(Microservices.class);
132132

@@ -190,12 +190,8 @@ public String toString() {
190190
private Mono<Microservices> start() {
191191
LOGGER.info("[{}][start] Starting", id);
192192

193-
// Create bootstrap scheduler
194-
Scheduler scheduler = Schedulers.newSingle(toString(), true);
195-
196193
return transportBootstrap
197194
.start(this)
198-
.publishOn(scheduler)
199195
.flatMap(
200196
transportBootstrap -> {
201197
final ServiceCall call = call();
@@ -229,19 +225,15 @@ private Mono<Microservices> start() {
229225

230226
return createDiscovery(
231227
this, new ServiceDiscoveryOptions().serviceEndpoint(serviceEndpoint))
232-
.publishOn(scheduler)
233228
.then(startGateway(new GatewayOptions().call(call)))
234-
.publishOn(scheduler)
235229
.then(Mono.fromCallable(() -> Injector.inject(this, serviceInstances)))
236230
.then(Mono.fromCallable(() -> JmxMonitorMBean.start(this)))
237231
.then(compositeDiscovery.startListen())
238-
.publishOn(scheduler)
239232
.thenReturn(this);
240233
})
241234
.onErrorResume(
242235
ex -> Mono.defer(this::shutdown).then(Mono.error(ex)).cast(Microservices.class))
243-
.doOnSuccess(m -> LOGGER.info("[{}][start] Started", id))
244-
.doOnTerminate(scheduler::dispose);
236+
.doOnSuccess(m -> LOGGER.info("[{}][start] Started", id));
245237
}
246238

247239
private ServiceEndpoint newServiceEndpoint(ServiceEndpoint serviceEndpoint) {
@@ -312,6 +304,18 @@ public List<ServiceEndpoint> serviceEndpoints() {
312304
return serviceRegistry.listServiceEndpoints();
313305
}
314306

307+
public Map<String, String> tags() {
308+
return tags;
309+
}
310+
311+
public ServiceRegistry serviceRegistry() {
312+
return serviceRegistry;
313+
}
314+
315+
public ServiceMethodRegistry methodRegistry() {
316+
return methodRegistry;
317+
}
318+
315319
/**
316320
* Returns default service discovery context.
317321
*
@@ -393,6 +397,15 @@ private Mono<Void> processBeforeDestroy() {
393397
.collect(Collectors.toList()));
394398
}
395399

400+
@Override
401+
public void close() {
402+
try {
403+
shutdown().toFuture().get();
404+
} catch (Exception e) {
405+
throw new RuntimeException(e);
406+
}
407+
}
408+
396409
public static final class Builder {
397410

398411
private Map<String, String> tags = new HashMap<>();

0 commit comments

Comments
 (0)