1717package org .springframework .integration .channel ;
1818
1919import java .time .Duration ;
20+ import java .util .ArrayList ;
21+ import java .util .List ;
2022import java .util .concurrent .TimeUnit ;
2123import java .util .concurrent .atomic .AtomicReference ;
2224import java .util .concurrent .locks .LockSupport ;
3032import reactor .core .publisher .Sinks ;
3133import reactor .util .context .ContextView ;
3234
35+ import org .springframework .context .Lifecycle ;
3336import org .springframework .core .log .LogMessage ;
3437import org .springframework .integration .IntegrationMessageHeaderAccessor ;
3538import org .springframework .integration .StaticMessageHeaderAccessor ;
3841import org .springframework .messaging .Message ;
3942import org .springframework .messaging .MessageDeliveryException ;
4043import org .springframework .util .Assert ;
44+ import org .springframework .util .ReflectionUtils ;
4145
4246/**
4347 * The {@link AbstractMessageChannel} implementation for the
4448 * Reactive Streams {@link Publisher} based on the Project Reactor {@link Flux}.
49+ * <p>
50+ * This class implements {@link Lifecycle} to control subscriptions to publishers
51+ * attached via {@link #subscribeTo(Publisher)}, when this channel is restarted.
4552 *
4653 * @author Artem Bilan
4754 * @author Gary Russell
5057 * @since 5.0
5158 */
5259public class FluxMessageChannel extends AbstractMessageChannel
53- implements Publisher <Message <?>>, ReactiveStreamsSubscribableChannel {
60+ implements Publisher <Message <?>>, ReactiveStreamsSubscribableChannel , Lifecycle {
5461
5562 private final Sinks .Many <Message <?>> sink = Sinks .many ().multicast ().onBackpressureBuffer (1 , false );
5663
57- private final Disposable .Composite upstreamSubscriptions = Disposables .composite ();
64+ private final List <Publisher <? extends Message <?>>> sourcePublishers = new ArrayList <>();
65+
66+ private volatile Disposable .Composite upstreamSubscriptions = Disposables .composite ();
5867
5968 private volatile boolean active = true ;
6069
@@ -111,19 +120,22 @@ public void subscribe(Subscriber<? super Message<?>> subscriber) {
111120 .subscribe (subscriber );
112121 }
113122
114- private void addPublisherToSubscribe (Flux <?> publisher ) {
115- AtomicReference <Disposable > disposableReference = new AtomicReference <>();
123+ @ Override
124+ public void start () {
125+ this .active = true ;
126+ this .upstreamSubscriptions = Disposables .composite ();
127+ this .sourcePublishers .forEach (this ::doSubscribeTo );
128+ }
116129
117- Disposable disposable =
118- publisher
119- .doOnTerminate (() -> disposeUpstreamSubscription (disposableReference ))
120- .subscribe ();
130+ @ Override
131+ public void stop () {
132+ this .active = false ;
133+ this .upstreamSubscriptions .dispose ();
134+ }
121135
122- if (!disposable .isDisposed ()) {
123- if (this .upstreamSubscriptions .add (disposable )) {
124- disposableReference .set (disposable );
125- }
126- }
136+ @ Override
137+ public boolean isRunning () {
138+ return this .active ;
127139 }
128140
129141 private void disposeUpstreamSubscription (AtomicReference <Disposable > disposableReference ) {
@@ -136,8 +148,14 @@ private void disposeUpstreamSubscription(AtomicReference<Disposable> disposableR
136148
137149 @ Override
138150 public void subscribeTo (Publisher <? extends Message <?>> publisher ) {
151+ this .sourcePublishers .add (publisher );
152+ doSubscribeTo (publisher );
153+ }
154+
155+ private void doSubscribeTo (Publisher <? extends Message <?>> publisher ) {
139156 Flux <Object > upstreamPublisher =
140157 Flux .from (publisher )
158+ .doOnComplete (() -> this .sourcePublishers .remove (publisher ))
141159 .delaySubscription (
142160 Mono .fromCallable (this .sink ::currentSubscriberCount )
143161 .filter ((value ) -> value > 0 )
@@ -152,6 +170,21 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
152170 addPublisherToSubscribe (upstreamPublisher );
153171 }
154172
173+ private void addPublisherToSubscribe (Flux <?> publisher ) {
174+ AtomicReference <Disposable > disposableReference = new AtomicReference <>();
175+
176+ Disposable disposable =
177+ publisher
178+ .doOnTerminate (() -> disposeUpstreamSubscription (disposableReference ))
179+ .subscribe ();
180+
181+ if (!disposable .isDisposed ()) {
182+ if (this .upstreamSubscriptions .add (disposable )) {
183+ disposableReference .set (disposable );
184+ }
185+ }
186+ }
187+
155188 private void sendReactiveMessage (Message <?> message ) {
156189 Message <?> messageToSend = message ;
157190 // We have just restored Reactor context, so no need in a header anymore.
@@ -169,14 +202,20 @@ private void sendReactiveMessage(Message<?> message) {
169202 }
170203 }
171204 catch (Exception ex ) {
172- logger .warn (ex , LogMessage .format ("Error during processing event: %s" , messageToSend ));
205+ if (isApplicationRunning ()) {
206+ logger .error (ex , LogMessage .format ("Error during processing event: %s" , messageToSend ));
207+ }
208+ else {
209+ ReflectionUtils .rethrowRuntimeException (ex );
210+ }
173211 }
174212 }
175213
176214 @ Override
177215 public void destroy () {
178216 this .active = false ;
179217 this .upstreamSubscriptions .dispose ();
218+ this .sourcePublishers .clear ();
180219 this .sink .emitComplete (Sinks .EmitFailureHandler .busyLooping (Duration .ofSeconds (1 )));
181220 super .destroy ();
182221 }
0 commit comments