Skip to content

Commit c761b8f

Browse files
committed
Set back directProcessor in CompositeDiscovery, ScaldecubeServiceDsicovery
1 parent 45ff8dc commit c761b8f

File tree

1 file changed

+7
-5
lines changed

1 file changed

+7
-5
lines changed

services/src/main/java/io/scalecube/services/Microservices.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@
5757
import reactor.core.Disposable;
5858
import reactor.core.Disposables;
5959
import reactor.core.Exceptions;
60+
import reactor.core.publisher.DirectProcessor;
6061
import reactor.core.publisher.Flux;
62+
import reactor.core.publisher.FluxSink;
6163
import reactor.core.publisher.Mono;
6264
import reactor.core.publisher.SignalType;
6365
import reactor.core.publisher.Sinks;
@@ -555,9 +557,9 @@ private static class CompositeServiceDiscovery implements ServiceDiscovery {
555557
private final Map<String, ServiceDiscoveryContext> discoveryContexts =
556558
new ConcurrentHashMap<>();
557559

558-
// Sink
559-
private final Sinks.Many<ServiceDiscoveryEvent> sink =
560-
Sinks.many().multicast().directBestEffort();
560+
// Subject
561+
private final DirectProcessor<ServiceDiscoveryEvent> subject = DirectProcessor.create();
562+
private final FluxSink<ServiceDiscoveryEvent> sink = subject.sink();
561563

562564
private final Disposable.Composite disposables = Disposables.composite();
563565
private Scheduler scheduler;
@@ -614,7 +616,7 @@ private Mono<Void> startListen() {
614616
public Flux<ServiceDiscoveryEvent> listen() {
615617
return Flux.fromStream(microservices.serviceRegistry.listServiceEndpoints().stream())
616618
.map(ServiceDiscoveryEvent::newEndpointAdded)
617-
.concatWith(sink.asFlux().onBackpressureBuffer())
619+
.concatWith(subject)
618620
.subscribeOn(scheduler)
619621
.publishOn(scheduler);
620622
}
@@ -650,7 +652,7 @@ private Mono<? extends Void> start0(String id, ServiceDiscovery discovery) {
650652
.subscribeOn(scheduler)
651653
.publishOn(scheduler)
652654
.doOnNext(event -> onDiscoveryEvent(microservices, event))
653-
.doOnNext(event -> sink.emitNext(event, RetryEmitFailureHandler.INSTANCE))
655+
.doOnNext(sink::next)
654656
.subscribe());
655657

656658
return Mono.deferContextual(context -> discovery.start())

0 commit comments

Comments
 (0)