3333import org .slf4j .Logger ;
3434import org .slf4j .LoggerFactory ;
3535import reactor .core .Exceptions ;
36- import reactor .core .publisher .DirectProcessor ;
3736import reactor .core .publisher .Flux ;
38- import reactor .core .publisher .FluxSink ;
3937import reactor .core .publisher .Mono ;
40- import reactor .core .publisher .Operators ;
4138import reactor .core .publisher .SignalType ;
39+ import reactor .core .publisher .Sinks ;
4240import reactor .core .publisher .Sinks .EmitFailureHandler ;
4341import reactor .core .publisher .Sinks .EmitResult ;
4442
4543public final class ScalecubeServiceDiscovery implements ServiceDiscovery {
4644
4745 private static final Logger LOGGER = LoggerFactory .getLogger (ServiceDiscovery .class );
4846
49- static {
50- Operators .enableOnDiscard (null , o -> LOGGER .warn ("[onDiscard] element = {}" , o ));
51- }
52-
5347 private final ServiceEndpoint serviceEndpoint ;
5448
5549 private ClusterConfig clusterConfig ;
5650 private Cluster cluster ;
5751
5852 // Sink
59- private final DirectProcessor <ServiceDiscoveryEvent > subject = DirectProcessor . create ();
60- private final FluxSink < ServiceDiscoveryEvent > sink = subject . sink ();
53+ private final Sinks . Many <ServiceDiscoveryEvent > sink =
54+ Sinks . many (). multicast (). directBestEffort ();
6155
6256 /**
6357 * Constructor.
@@ -171,19 +165,21 @@ public void onMembershipEvent(MembershipEvent event) {
171165
172166 @ Override
173167 public Flux <ServiceDiscoveryEvent > listen () {
174- return subject .onBackpressureBuffer ();
168+ return sink . asFlux () .onBackpressureBuffer ();
175169 }
176170
177171 @ Override
178172 public Mono <Void > shutdown () {
179173 return Mono .defer (
180174 () -> {
181175 if (cluster == null ) {
182- sink .complete ( );
176+ sink .emitComplete ( RetryEmitFailureHandler . INSTANCE );
183177 return Mono .empty ();
184178 }
185179 cluster .shutdown ();
186- return cluster .onShutdown ().doFinally (s -> sink .complete ());
180+ return cluster
181+ .onShutdown ()
182+ .doFinally (s -> sink .emitComplete (RetryEmitFailureHandler .INSTANCE ));
187183 });
188184 }
189185
@@ -200,7 +196,7 @@ private void onMembershipEvent(MembershipEvent membershipEvent) {
200196
201197 if (discoveryEvent != null ) {
202198 LOGGER .debug ("Publish discoveryEvent: {}" , discoveryEvent );
203- sink .next (discoveryEvent );
199+ sink .emitNext (discoveryEvent , RetryEmitFailureHandler . INSTANCE );
204200 }
205201 }
206202
@@ -300,7 +296,6 @@ private static class RetryEmitFailureHandler implements EmitFailureHandler {
300296
301297 @ Override
302298 public boolean onEmitFailure (SignalType signalType , EmitResult emitResult ) {
303- LOGGER .warn ("[onEmitFailure] signalType={}, emitResult={}" , signalType , emitResult );
304299 return emitResult == FAIL_NON_SERIALIZED ;
305300 }
306301 }
0 commit comments