Skip to content

Commit 5e0e620

Browse files
artembilangaryrussell
authored andcommitted
Cancel subscription for MPS.subscribeToPublisher
The `Flux.takeWhile()` only works if there is data in the `Publisher` to consume. We still need to be able to cancel subscription and stop producing even if there is no data at the moment. * Change `takeWhile()` to the `doOnSubscribe()` and store `subscription` in the `volatile` property of the `MessageProducerSupport` * Cancel such a subscription in the `doStop()` impl * Propagate `doStop()` to super in the `ZeroMqMessageProducer` which is only one reactive channel adapter overriding `doStop()` * Verify in the `ReactiveMessageProducerTests` that subscription is cancelled for delayed data in the `Publisher` **Cherry-pick to `5.5.x`**
1 parent 30a2f18 commit 5e0e620

File tree

3 files changed

+45
-7
lines changed

3 files changed

+45
-7
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.endpoint;
1818

1919
import org.reactivestreams.Publisher;
20+
import org.reactivestreams.Subscription;
2021

2122
import org.springframework.beans.factory.BeanFactory;
2223
import org.springframework.beans.factory.SmartInitializingSingleton;
@@ -66,6 +67,8 @@ public abstract class MessageProducerSupport extends AbstractEndpoint implements
6667

6768
private boolean shouldTrack = false;
6869

70+
private volatile Subscription subscription;
71+
6972
protected MessageProducerSupport() {
7073
this.setPhase(Integer.MAX_VALUE / 2);
7174
}
@@ -196,6 +199,11 @@ protected void doStart() {
196199
*/
197200
@Override
198201
protected void doStop() {
202+
Subscription subscriptionToCancel = this.subscription;
203+
if (subscriptionToCancel != null) {
204+
this.subscription = null;
205+
subscriptionToCancel.cancel();
206+
}
199207
}
200208

201209
protected void sendMessage(Message<?> messageArg) {
@@ -222,7 +230,7 @@ protected void subscribeToPublisher(Publisher<? extends Message<?>> publisher) {
222230
.map(this::trackMessageIfAny)
223231
.doOnComplete(this::stop)
224232
.doOnCancel(this::stop)
225-
.takeWhile((message) -> isActive());
233+
.doOnSubscribe((subscription) -> this.subscription = subscription);
226234

227235
if (channelForSubscription instanceof ReactiveStreamsSubscribableChannel) {
228236
((ReactiveStreamsSubscribableChannel) channelForSubscription).subscribeTo(messageFlux);

spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,14 +16,19 @@
1616

1717
package org.springframework.integration.endpoint;
1818

19+
import static org.assertj.core.api.Assertions.assertThat;
20+
1921
import java.time.Duration;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
2024

2125
import org.junit.jupiter.api.Test;
2226

2327
import org.springframework.beans.factory.annotation.Autowired;
2428
import org.springframework.context.annotation.Bean;
2529
import org.springframework.context.annotation.Configuration;
2630
import org.springframework.integration.channel.FluxMessageChannel;
31+
import org.springframework.integration.channel.NullChannel;
2732
import org.springframework.integration.config.EnableIntegration;
2833
import org.springframework.messaging.Message;
2934
import org.springframework.messaging.support.GenericMessage;
@@ -49,12 +54,36 @@ public class ReactiveMessageProducerTests {
4954
public MessageProducerSupport producer;
5055

5156
@Test
52-
public void test() {
57+
public void testEmptyPublisherUnsubscription() throws InterruptedException {
58+
CountDownLatch cancelLatch = new CountDownLatch(1);
59+
MessageProducerSupport producer =
60+
new MessageProducerSupport() {
61+
62+
@Override
63+
protected void doStart() {
64+
super.doStart();
65+
subscribeToPublisher(
66+
Flux.just("test1")
67+
.delayElements(Duration.ofSeconds(10))
68+
.map(GenericMessage::new)
69+
.doOnCancel(cancelLatch::countDown));
70+
}
71+
72+
};
73+
producer.setOutputChannel(new NullChannel());
74+
producer.start();
75+
producer.stop();
76+
77+
assertThat(cancelLatch.await(10, TimeUnit.SECONDS)).isTrue();
78+
}
79+
80+
@Test
81+
public void testReactiveMessageProducerFromContext() {
5382
StepVerifier stepVerifier =
5483
StepVerifier.create(
55-
Flux.from(this.fluxMessageChannel)
56-
.map(Message::getPayload)
57-
.cast(String.class))
84+
Flux.from(this.fluxMessageChannel)
85+
.map(Message::getPayload)
86+
.cast(String.class))
5887
.expectNext("test1", "test2")
5988
.thenCancel()
6089
.verifyLater();

spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -292,6 +292,7 @@ private Mono<Message<?>> convertMessage(Mono<ZMsg> msgMono) {
292292

293293
@Override
294294
protected void doStop() {
295+
super.doStop();
295296
this.socketMono.doOnNext(ZMQ.Socket::close).subscribe();
296297
}
297298

0 commit comments

Comments
 (0)