Skip to content

Commit 45ff8dc

Browse files
committed
Set back directProcessor in CompositeDiscovery, ScaldecubeServiceDsicovery
1 parent 8a62554 commit 45ff8dc

File tree

1 file changed

+8
-9
lines changed

1 file changed

+8
-9
lines changed

services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,12 @@
3333
import org.slf4j.Logger;
3434
import org.slf4j.LoggerFactory;
3535
import reactor.core.Exceptions;
36+
import reactor.core.publisher.DirectProcessor;
3637
import reactor.core.publisher.Flux;
38+
import reactor.core.publisher.FluxSink;
3739
import reactor.core.publisher.Mono;
3840
import reactor.core.publisher.Operators;
3941
import reactor.core.publisher.SignalType;
40-
import reactor.core.publisher.Sinks;
4142
import reactor.core.publisher.Sinks.EmitFailureHandler;
4243
import reactor.core.publisher.Sinks.EmitResult;
4344

@@ -55,8 +56,8 @@ public final class ScalecubeServiceDiscovery implements ServiceDiscovery {
5556
private Cluster cluster;
5657

5758
// Sink
58-
private final Sinks.Many<ServiceDiscoveryEvent> sink =
59-
Sinks.many().multicast().directBestEffort();
59+
private final DirectProcessor<ServiceDiscoveryEvent> subject = DirectProcessor.create();
60+
private final FluxSink<ServiceDiscoveryEvent> sink = subject.sink();
6061

6162
/**
6263
* Constructor.
@@ -170,21 +171,19 @@ public void onMembershipEvent(MembershipEvent event) {
170171

171172
@Override
172173
public Flux<ServiceDiscoveryEvent> listen() {
173-
return sink.asFlux().onBackpressureBuffer();
174+
return subject.onBackpressureBuffer();
174175
}
175176

176177
@Override
177178
public Mono<Void> shutdown() {
178179
return Mono.defer(
179180
() -> {
180181
if (cluster == null) {
181-
sink.emitComplete(RetryEmitFailureHandler.INSTANCE);
182+
sink.complete();
182183
return Mono.empty();
183184
}
184185
cluster.shutdown();
185-
return cluster
186-
.onShutdown()
187-
.doFinally(s -> sink.emitComplete(RetryEmitFailureHandler.INSTANCE));
186+
return cluster.onShutdown().doFinally(s -> sink.complete());
188187
});
189188
}
190189

@@ -201,7 +200,7 @@ private void onMembershipEvent(MembershipEvent membershipEvent) {
201200

202201
if (discoveryEvent != null) {
203202
LOGGER.debug("Publish discoveryEvent: {}", discoveryEvent);
204-
sink.emitNext(discoveryEvent, RetryEmitFailureHandler.INSTANCE);
203+
sink.next(discoveryEvent);
205204
}
206205
}
207206

0 commit comments

Comments
 (0)