|
18 | 18 |
|
19 | 19 | import java.time.Duration; |
20 | 20 | import java.util.concurrent.TimeUnit; |
| 21 | +import java.util.concurrent.atomic.AtomicReference; |
21 | 22 | import java.util.concurrent.locks.LockSupport; |
22 | 23 |
|
23 | 24 | import org.reactivestreams.Publisher; |
@@ -100,27 +101,52 @@ public void subscribe(Subscriber<? super Message<?>> subscriber) { |
100 | 101 | .share() |
101 | 102 | .subscribe(subscriber); |
102 | 103 |
|
103 | | - this.upstreamSubscriptions.add( |
| 104 | + Mono<Boolean> subscribersBarrier = |
104 | 105 | Mono.fromCallable(() -> this.sink.currentSubscriberCount() > 0) |
105 | 106 | .filter(Boolean::booleanValue) |
106 | 107 | .doOnNext(this.subscribedSignal::tryEmitNext) |
107 | 108 | .repeatWhenEmpty((repeat) -> |
108 | | - this.active ? repeat.delayElements(Duration.ofMillis(100)) : repeat) // NOSONAR |
109 | | - .subscribe()); |
| 109 | + this.active ? repeat.delayElements(Duration.ofMillis(100)) : repeat); // NOSONAR |
| 110 | + |
| 111 | + addPublisherToSubscribe(Flux.from(subscribersBarrier)); |
| 112 | + } |
| 113 | + |
| 114 | + private void addPublisherToSubscribe(Flux<?> publisher) { |
| 115 | + AtomicReference<Disposable> disposableReference = new AtomicReference<>(); |
| 116 | + |
| 117 | + Disposable disposable = |
| 118 | + publisher |
| 119 | + .doOnTerminate(() -> disposeUpstreamSubscription(disposableReference)) |
| 120 | + .subscribe(); |
| 121 | + |
| 122 | + if (!disposable.isDisposed()) { |
| 123 | + if (this.upstreamSubscriptions.add(disposable)) { |
| 124 | + disposableReference.set(disposable); |
| 125 | + } |
| 126 | + } |
| 127 | + } |
| 128 | + |
| 129 | + private void disposeUpstreamSubscription(AtomicReference<Disposable> disposableReference) { |
| 130 | + Disposable disposable = disposableReference.get(); |
| 131 | + if (disposable != null) { |
| 132 | + this.upstreamSubscriptions.remove(disposable); |
| 133 | + disposable.dispose(); |
| 134 | + } |
110 | 135 | } |
111 | 136 |
|
112 | 137 | @Override |
113 | 138 | public void subscribeTo(Publisher<? extends Message<?>> publisher) { |
114 | | - this.upstreamSubscriptions.add( |
| 139 | + Flux<Object> upstreamPublisher = |
115 | 140 | Flux.from(publisher) |
116 | 141 | .delaySubscription(this.subscribedSignal.asFlux().filter(Boolean::booleanValue).next()) |
117 | 142 | .publishOn(this.scheduler) |
118 | 143 | .flatMap((message) -> |
119 | 144 | Mono.just(message) |
120 | 145 | .handle((messageToHandle, sink) -> sendReactiveMessage(messageToHandle)) |
121 | 146 | .contextWrite(StaticMessageHeaderAccessor.getReactorContext(message))) |
122 | | - .contextCapture() |
123 | | - .subscribe()); |
| 147 | + .contextCapture(); |
| 148 | + |
| 149 | + addPublisherToSubscribe(upstreamPublisher); |
124 | 150 | } |
125 | 151 |
|
126 | 152 | private void sendReactiveMessage(Message<?> message) { |
|
0 commit comments