Skip to content

Commit 2656bf7

Browse files
committed
GH-3083 Fix lifecycle of reactive producer
Ensure that the corresponding FluxMessageChannel gets properly destroyed during application shutdown Resolves #3083
1 parent 4786fd4 commit 2656bf7

File tree

3 files changed

+29
-2
lines changed

3 files changed

+29
-2
lines changed

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinding.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.commons.logging.Log;
2222
import org.apache.commons.logging.LogFactory;
2323

24+
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
2425
import org.springframework.context.Lifecycle;
2526
import org.springframework.integration.context.IntegrationObjectSupport;
2627
import org.springframework.integration.core.Pausable;
@@ -152,6 +153,10 @@ public synchronized void stop() {
152153
if (this.isRunning()) {
153154
this.lifecycle.stop();
154155
}
156+
// See https://github.com/spring-cloud/spring-cloud-stream/issues/3083 for more details
157+
if (target instanceof DirectWithAttributesChannel attributeChannel) {
158+
attributeChannel.destroy();
159+
}
155160
}
156161

157162
@Override

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,9 @@ public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
265265
.get();
266266
IntegrationFlow postProcessedFlow = (IntegrationFlow) context.getAutowireCapableBeanFactory()
267267
.initializeBean(integrationFlow, integrationFlowName);
268-
context.registerBean(integrationFlowName, IntegrationFlow.class, () -> postProcessedFlow);
268+
context.registerBean(integrationFlowName, IntegrationFlow.class, () -> {
269+
return postProcessedFlow;
270+
});
269271
}
270272
else {
271273
IntegrationFlow integrationFlow = integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(supplier, context, producerProperties),
@@ -311,7 +313,12 @@ private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier<?> s
311313
? ((Mono) publisher).map(this::wrapToMessageIfNecessary)
312314
: ((Flux) publisher).map(this::wrapToMessageIfNecessary);
313315

314-
integrationFlowBuilder = IntegrationFlow.from(publisher);
316+
// See https://github.com/spring-cloud/spring-cloud-stream/issues/3083 for more details
317+
DirectWithAttributesChannel messageChannel = context.getBean(bindingName, DirectWithAttributesChannel.class);
318+
FluxMessageChannel reactiveChannel = new FluxMessageChannel();
319+
reactiveChannel.subscribeTo(publisher);
320+
messageChannel.setAttribute(DirectWithAttributesChannel.COMPANION_ATTR, reactiveChannel);
321+
integrationFlowBuilder = IntegrationFlow.from((MessageChannel) reactiveChannel);
315322

316323
// see https://github.com/spring-cloud/spring-cloud-stream/issues/1863 for details about the following code
317324
taskScheduler.schedule(() -> { }, Instant.now()); // will keep AC alive

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/messaging/DirectWithAttributesChannel.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.HashMap;
2020
import java.util.Map;
2121

22+
import org.springframework.integration.channel.AbstractMessageChannel;
2223
import org.springframework.integration.channel.DirectChannel;
2324
import org.springframework.messaging.MessageHandler;
2425

@@ -28,6 +29,11 @@
2829
*/
2930
public class DirectWithAttributesChannel extends DirectChannel {
3031

32+
/**
33+
* Name of the attribute that is considered to be a companion of this channel.
34+
*/
35+
public static String COMPANION_ATTR = "companion";
36+
3137
private final Map<String, Object> attributes = new HashMap<>();
3238

3339
public void setAttribute(String key, Object value) {
@@ -43,6 +49,15 @@ public String getBeanName() {
4349
return this.getComponentName();
4450
}
4551

52+
@Override
53+
public void destroy() {
54+
super.destroy();
55+
Object companion = this.attributes.get(COMPANION_ATTR);
56+
if (companion != null && companion instanceof AbstractMessageChannel companionChannel) {
57+
companionChannel.destroy();
58+
}
59+
}
60+
4661
@Override
4762
public boolean subscribe(MessageHandler handler) {
4863
return this.getDispatcher().getHandlerCount() == 1 ? false : super.subscribe(handler);

0 commit comments

Comments
 (0)