Skip to content

Commit 06fa7e5

Browse files
authored
Merge pull request #813 from scalecube/update-dependencies-and-add-failurehandler
Fixed Sinks, added RetryEmitFailureHandler
2 parents 07888a9 + 233b6ce commit 06fa7e5

File tree

3 files changed

+41
-6
lines changed

3 files changed

+41
-6
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
</scm>
5858

5959
<properties>
60-
<scalecube-cluster.version>2.6.7</scalecube-cluster.version>
60+
<scalecube-cluster.version>2.6.8</scalecube-cluster.version>
6161
<scalecube-commons.version>1.0.13</scalecube-commons.version>
6262
<scalecube-security.version>1.0.19</scalecube-security.version>
6363

services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointAdded;
44
import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointLeaving;
55
import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointRemoved;
6+
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;
67

78
import io.scalecube.cluster.Cluster;
89
import io.scalecube.cluster.ClusterConfig;
@@ -34,7 +35,10 @@
3435
import reactor.core.Exceptions;
3536
import reactor.core.publisher.Flux;
3637
import reactor.core.publisher.Mono;
38+
import reactor.core.publisher.SignalType;
3739
import reactor.core.publisher.Sinks;
40+
import reactor.core.publisher.Sinks.EmitFailureHandler;
41+
import reactor.core.publisher.Sinks.EmitResult;
3842

3943
public final class ScalecubeServiceDiscovery implements ServiceDiscovery {
4044

@@ -169,11 +173,13 @@ public Mono<Void> shutdown() {
169173
return Mono.defer(
170174
() -> {
171175
if (cluster == null) {
172-
sink.tryEmitComplete();
176+
sink.emitComplete(RetryEmitFailureHandler.INSTANCE);
173177
return Mono.empty();
174178
}
175179
cluster.shutdown();
176-
return cluster.onShutdown().doFinally(s -> sink.tryEmitComplete());
180+
return cluster
181+
.onShutdown()
182+
.doFinally(s -> sink.emitComplete(RetryEmitFailureHandler.INSTANCE));
177183
});
178184
}
179185

@@ -190,7 +196,7 @@ private void onMembershipEvent(MembershipEvent membershipEvent) {
190196

191197
if (discoveryEvent != null) {
192198
LOGGER.debug("Publish discoveryEvent: {}", discoveryEvent);
193-
sink.tryEmitNext(discoveryEvent);
199+
sink.emitNext(discoveryEvent, RetryEmitFailureHandler.INSTANCE);
194200
}
195201
}
196202

@@ -283,4 +289,14 @@ private void onDiscoveryEvent(ServiceDiscoveryEvent event) {
283289
}
284290
}
285291
}
292+
293+
private static class RetryEmitFailureHandler implements EmitFailureHandler {
294+
295+
private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();
296+
297+
@Override
298+
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
299+
return emitResult == FAIL_NON_SERIALIZED;
300+
}
301+
}
286302
}

services/src/main/java/io/scalecube/services/Microservices.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.scalecube.services;
22

3+
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;
4+
35
import io.scalecube.net.Address;
46
import io.scalecube.services.api.ServiceMessage;
57
import io.scalecube.services.auth.Authenticator;
@@ -57,7 +59,10 @@
5759
import reactor.core.Exceptions;
5860
import reactor.core.publisher.Flux;
5961
import reactor.core.publisher.Mono;
62+
import reactor.core.publisher.SignalType;
6063
import reactor.core.publisher.Sinks;
64+
import reactor.core.publisher.Sinks.EmitFailureHandler;
65+
import reactor.core.publisher.Sinks.EmitResult;
6166
import reactor.core.scheduler.Scheduler;
6267
import reactor.core.scheduler.Schedulers;
6368

@@ -167,7 +172,7 @@ private Microservices(Builder builder) {
167172
shutdown
168173
.asMono()
169174
.then(doShutdown())
170-
.doFinally(s -> onShutdown.tryEmitEmpty())
175+
.doFinally(s -> onShutdown.emitEmpty(RetryEmitFailureHandler.INSTANCE))
171176
.subscribe(
172177
null, ex -> LOGGER.warn("[{}][doShutdown] Exception occurred: {}", id, ex.toString()));
173178
}
@@ -344,7 +349,11 @@ public Flux<ServiceDiscoveryEvent> listenDiscovery() {
344349
* @return result of shutdown
345350
*/
346351
public Mono<Void> shutdown() {
347-
return Mono.fromRunnable(shutdown::tryEmitEmpty).then(onShutdown.asMono());
352+
return Mono.defer(
353+
() -> {
354+
shutdown.emitEmpty(RetryEmitFailureHandler.INSTANCE);
355+
return onShutdown.asMono();
356+
});
348357
}
349358

350359
/**
@@ -900,4 +909,14 @@ private static String asString(ServiceInfo serviceInfo) {
900909
.toString();
901910
}
902911
}
912+
913+
private static class RetryEmitFailureHandler implements EmitFailureHandler {
914+
915+
private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();
916+
917+
@Override
918+
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
919+
return emitResult == FAIL_NON_SERIALIZED;
920+
}
921+
}
903922
}

0 commit comments

Comments
 (0)