11package io .scalecube .cluster ;
22
3+ import static reactor .core .publisher .Sinks .EmitResult .FAIL_NON_SERIALIZED ;
4+
35import io .scalecube .cluster .fdetector .FailureDetectorConfig ;
46import io .scalecube .cluster .fdetector .FailureDetectorImpl ;
57import io .scalecube .cluster .gossip .GossipConfig ;
4345import reactor .core .Disposable ;
4446import reactor .core .Disposables ;
4547import reactor .core .Exceptions ;
46- import reactor .core .publisher .DirectProcessor ;
4748import reactor .core .publisher .Flux ;
48- import reactor .core .publisher .FluxSink ;
4949import reactor .core .publisher .Mono ;
50- import reactor .core .publisher .MonoProcessor ;
50+ import reactor .core .publisher .SignalType ;
51+ import reactor .core .publisher .Sinks ;
52+ import reactor .core .publisher .Sinks .EmitFailureHandler ;
53+ import reactor .core .publisher .Sinks .EmitResult ;
5154import reactor .core .scheduler .Scheduler ;
5255import reactor .core .scheduler .Schedulers ;
5356
@@ -79,17 +82,17 @@ public final class ClusterImpl implements Cluster {
7982 cluster -> new ClusterMessageHandler () {};
8083
8184 // Subject
82- private final DirectProcessor <MembershipEvent > membershipEvents = DirectProcessor . create ();
83- private final FluxSink < MembershipEvent > membershipSink = membershipEvents . sink ();
85+ private final Sinks . Many <MembershipEvent > membershipSink =
86+ Sinks . many (). multicast (). directBestEffort ();
8487
8588 // Disposables
8689 private final Disposable .Composite actionsDisposables = Disposables .composite ();
8790
8891 // Lifecycle
89- private final MonoProcessor <Void > start = MonoProcessor . create ();
90- private final MonoProcessor <Void > onStart = MonoProcessor . create ();
91- private final MonoProcessor <Void > shutdown = MonoProcessor . create ();
92- private final MonoProcessor <Void > onShutdown = MonoProcessor . create ();
92+ private final Sinks . One <Void > start = Sinks . one ();
93+ private final Sinks . One <Void > onStart = Sinks . one ();
94+ private final Sinks . One <Void > shutdown = Sinks . one ();
95+ private final Sinks . One <Void > onShutdown = Sinks . one ();
9396
9497 // Cluster components
9598 private Transport transport ;
@@ -119,14 +122,16 @@ private ClusterImpl(ClusterImpl that) {
119122
120123 private void initLifecycle () {
121124 start
125+ .asMono ()
122126 .then (doStart ())
123- .doOnSuccess (avoid -> onStart .onComplete ( ))
124- .doOnError (onStart :: onError )
127+ .doOnSuccess (avoid -> onStart .emitEmpty ( RetryEmitFailureHandler . INSTANCE ))
128+ .doOnError (th -> onStart . emitError ( th , RetryEmitFailureHandler . INSTANCE ) )
125129 .subscribe (null , th -> LOGGER .error ("[{}][doStart] Exception occurred:" , localMember , th ));
126130
127131 shutdown
132+ .asMono ()
128133 .then (doShutdown ())
129- .doFinally (s -> onShutdown .onComplete ( ))
134+ .doFinally (s -> onShutdown .emitEmpty ( RetryEmitFailureHandler . INSTANCE ))
130135 .subscribe (
131136 null ,
132137 th ->
@@ -232,8 +237,8 @@ public ClusterImpl handler(Function<Cluster, ClusterMessageHandler> handler) {
232237 public Mono <Cluster > start () {
233238 return Mono .defer (
234239 () -> {
235- start .onComplete ( );
236- return onStart .thenReturn (this );
240+ start .emitEmpty ( RetryEmitFailureHandler . INSTANCE );
241+ return onStart .asMono (). thenReturn (this );
237242 });
238243 }
239244
@@ -248,9 +253,9 @@ private Mono<Cluster> doStart() {
248253 private Mono <Cluster > doStart0 () {
249254 return TransportImpl .bind (config .transportConfig ())
250255 .flatMap (
251- transport1 -> {
252- localMember = createLocalMember (transport1 .address ());
253- transport = new SenderAwareTransport (transport1 , localMember .address ());
256+ boundTransport -> {
257+ localMember = createLocalMember (boundTransport .address ());
258+ transport = new SenderAwareTransport (boundTransport , localMember .address ());
254259
255260 cidGenerator = new CorrelationIdGenerator (localMember .id ());
256261 scheduler = Schedulers .newSingle ("sc-cluster-" + localMember .address ().port (), true );
@@ -260,7 +265,7 @@ private Mono<Cluster> doStart0() {
260265 new FailureDetectorImpl (
261266 localMember ,
262267 transport ,
263- membershipEvents .onBackpressureBuffer (),
268+ membershipSink . asFlux () .onBackpressureBuffer (),
264269 config .failureDetectorConfig (),
265270 scheduler ,
266271 cidGenerator );
@@ -269,7 +274,7 @@ private Mono<Cluster> doStart0() {
269274 new GossipProtocolImpl (
270275 localMember ,
271276 transport ,
272- membershipEvents .onBackpressureBuffer (),
277+ membershipSink . asFlux () .onBackpressureBuffer (),
273278 config .gossipConfig (),
274279 scheduler );
275280
@@ -294,8 +299,11 @@ private Mono<Cluster> doStart0() {
294299 membership
295300 .listen ()
296301 /*.publishOn(scheduler)*/
297- // Dont uncomment, already beign executed inside sc-cluster thread
298- .subscribe (membershipSink ::next , this ::onError , membershipSink ::complete ));
302+ // Dont uncomment, already beign executed inside scalecube-cluster thread
303+ .subscribe (
304+ event -> membershipSink .emitNext (event , RetryEmitFailureHandler .INSTANCE ),
305+ ex -> LOGGER .error ("[{}][membership][error] cause:" , localMember , ex ),
306+ () -> membershipSink .emitComplete (RetryEmitFailureHandler .INSTANCE )));
299307
300308 return Mono .fromRunnable (() -> failureDetector .start ())
301309 .then (Mono .fromRunnable (() -> gossip .start ()))
@@ -317,30 +325,45 @@ private void validateConfiguration() {
317325 if (metadataCodec == null ) {
318326 Object metadata = config .metadata ();
319327 if (metadata != null && !(metadata instanceof Serializable )) {
320- throw new IllegalArgumentException (
321- "Invalid cluster configuration: metadata must be Serializable" );
328+ throw new IllegalArgumentException ("Invalid cluster config: metadata must be Serializable" );
322329 }
323330 }
324331
332+ Objects .requireNonNull (
333+ config .transportConfig ().transportFactory (),
334+ "Invalid cluster config: transportFactory must be specified" );
335+
325336 Objects .requireNonNull (
326337 config .transportConfig ().messageCodec (),
327- "Invalid cluster configuration: transport. messageCodec must be specified" );
338+ "Invalid cluster config: messageCodec must be specified" );
328339
329340 Objects .requireNonNull (
330341 config .membershipConfig ().namespace (),
331- "Invalid cluster configuration : membership. namespace must be specified" );
342+ "Invalid cluster config : membership namespace must be specified" );
332343
333344 if (!NAMESPACE_PATTERN .matcher (config .membershipConfig ().namespace ()).matches ()) {
334345 throw new IllegalArgumentException (
335- "Invalid cluster config: membership. namespace format is invalid" );
346+ "Invalid cluster config: membership namespace format is invalid" );
336347 }
337348 }
338349
339350 private void startHandler () {
340351 ClusterMessageHandler handler = this .handler .apply (this );
341- actionsDisposables .add (listenMessage ().subscribe (handler ::onMessage , this ::onError ));
342- actionsDisposables .add (listenMembership ().subscribe (handler ::onMembershipEvent , this ::onError ));
343- actionsDisposables .add (listenGossip ().subscribe (handler ::onGossip , this ::onError ));
352+ actionsDisposables .add (
353+ listenMessage ()
354+ .subscribe (
355+ handler ::onMessage ,
356+ ex -> LOGGER .error ("[{}][onMessage][error] cause:" , localMember , ex )));
357+ actionsDisposables .add (
358+ listenMembership ()
359+ .subscribe (
360+ handler ::onMembershipEvent ,
361+ ex -> LOGGER .error ("[{}][onMembershipEvent][error] cause:" , localMember , ex )));
362+ actionsDisposables .add (
363+ listenGossip ()
364+ .subscribe (
365+ handler ::onGossip ,
366+ ex -> LOGGER .error ("[{}][onGossip][error] cause:" , localMember , ex )));
344367 }
345368
346369 private void startJmxMonitor () {
@@ -357,10 +380,6 @@ private void startJmxMonitor() {
357380 }
358381 }
359382
360- private void onError (Throwable th ) {
361- LOGGER .error ("[{}] Received unexpected error:" , localMember , th );
362- }
363-
364383 private Flux <Message > listenMessage () {
365384 // filter out system messages
366385 return transport .listen ().filter (msg -> !SYSTEM_MESSAGES .contains (msg .qualifier ()));
@@ -373,7 +392,7 @@ private Flux<Message> listenGossip() {
373392
374393 private Flux <MembershipEvent > listenMembership () {
375394 // listen on live stream
376- return membershipEvents .onBackpressureBuffer ();
395+ return membershipSink . asFlux () .onBackpressureBuffer ();
377396 }
378397
379398 /**
@@ -481,7 +500,7 @@ public <T> Mono<Void> updateMetadata(T metadata) {
481500
482501 @ Override
483502 public void shutdown () {
484- shutdown .onComplete ( );
503+ shutdown .emitEmpty ( RetryEmitFailureHandler . INSTANCE );
485504 }
486505
487506 private Mono <Void > doShutdown () {
@@ -524,12 +543,12 @@ private Mono<Void> dispose() {
524543
525544 @ Override
526545 public Mono <Void > onShutdown () {
527- return onShutdown ;
546+ return onShutdown . asMono () ;
528547 }
529548
530549 @ Override
531550 public boolean isShutdown () {
532- return onShutdown .isDisposed ();
551+ return onShutdown .asMono (). toFuture (). isDone ();
533552 }
534553
535554 private static class SenderAwareTransport implements Transport {
@@ -581,4 +600,14 @@ private Message enhanceWithSender(Message message) {
581600 return Message .with (message ).sender (address ).build ();
582601 }
583602 }
603+
604+ private static class RetryEmitFailureHandler implements EmitFailureHandler {
605+
606+ private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler ();
607+
608+ @ Override
609+ public boolean onEmitFailure (SignalType signalType , EmitResult emitResult ) {
610+ return emitResult == FAIL_NON_SERIALIZED ;
611+ }
612+ }
584613}
0 commit comments