Skip to content

Commit 7e8991a

Browse files
artembilangaryrussell
authored andcommitted
INT-4510: Test no memory leak in FluxMessageCh
JIRA: https://jira.spring.io/browse/INT-4510 Related to reactor/reactor-core#1290 The `Flux.publish()` and subsequent `connect()` doesn't fuse a subscriber for the hooks flow is interrupted (complete, or error, or disconnect). In this case the `FluxMessageChannel.publishers` store is not cleared from the finished publishers * Add test-case to check the `FluxMessageChannel.publishers` store after finishing the stream * Add `hide()` operator with TODO to remove when an appropriate Reactor version is ready **Cherry-pick to 5.0.x**
1 parent 3f66e1e commit 7e8991a

File tree

2 files changed

+39
-2
lines changed

2 files changed

+39
-2
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public void subscribeTo(Publisher<Message<?>> publisher) {
8383
Flux.from(publisher)
8484
.doOnComplete(() -> this.publishers.remove(publisher))
8585
.doOnNext(this::send)
86+
.hide() // TODO remove after upgrade to Reactor 3.1.9.RELEASE or later
8687
.publish();
8788

8889
this.publishers.put(publisher, connectableFlux);

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,8 +26,10 @@
2626

2727
import java.util.ArrayList;
2828
import java.util.List;
29+
import java.util.Map;
2930
import java.util.concurrent.CountDownLatch;
3031
import java.util.concurrent.TimeUnit;
32+
import java.util.stream.IntStream;
3133

3234
import org.junit.Test;
3335
import org.junit.runner.RunWith;
@@ -40,6 +42,11 @@
4042
import org.springframework.integration.channel.MessageChannelReactiveUtils;
4143
import org.springframework.integration.channel.QueueChannel;
4244
import org.springframework.integration.config.EnableIntegration;
45+
import org.springframework.integration.dsl.IntegrationFlow;
46+
import org.springframework.integration.dsl.channel.MessageChannels;
47+
import org.springframework.integration.dsl.context.IntegrationFlowContext;
48+
import org.springframework.integration.dsl.context.IntegrationFlowRegistration;
49+
import org.springframework.integration.test.util.TestUtils;
4350
import org.springframework.messaging.Message;
4451
import org.springframework.messaging.MessageChannel;
4552
import org.springframework.messaging.MessagingException;
@@ -53,6 +60,7 @@
5360

5461
/**
5562
* @author Artem Bilan
63+
*
5664
* @since 5.0
5765
*/
5866
@RunWith(SpringRunner.class)
@@ -68,8 +76,11 @@ public class FluxMessageChannelTests {
6876
@Autowired
6977
private PollableChannel errorChannel;
7078

79+
@Autowired
80+
private IntegrationFlowContext integrationFlowContext;
81+
7182
@Test
72-
public void testFluxMessageChannel() throws InterruptedException {
83+
public void testFluxMessageChannel() {
7384
QueueChannel replyChannel = new QueueChannel();
7485

7586
for (int i = 0; i < 10; i++) {
@@ -106,6 +117,31 @@ public void testMessageChannelReactiveAdaptation() throws InterruptedException {
106117
assertThat(results, contains("FOO", "BAR"));
107118
}
108119

120+
@Test
121+
public void testFluxMessageChannelCleanUp() throws InterruptedException {
122+
FluxMessageChannel flux = MessageChannels.flux().get();
123+
124+
CountDownLatch finishLatch = new CountDownLatch(1);
125+
126+
IntegrationFlow testFlow = f -> f
127+
.<String>split(__ -> Flux.fromStream(IntStream.range(0, 100).boxed()), null)
128+
.channel(flux)
129+
.aggregate(a -> a.releaseStrategy(m -> m.size() == 100))
130+
.handle(__ -> finishLatch.countDown());
131+
132+
IntegrationFlowRegistration flowRegistration =
133+
this.integrationFlowContext.registration(testFlow)
134+
.register();
135+
136+
flowRegistration.getInputChannel().send(new GenericMessage<>("foo"));
137+
138+
assertTrue(finishLatch.await(10, TimeUnit.SECONDS));
139+
140+
assertTrue(TestUtils.getPropertyValue(flux, "publishers", Map.class).isEmpty());
141+
142+
flowRegistration.destroy();
143+
}
144+
109145
@Configuration
110146
@EnableIntegration
111147
public static class TestConfiguration {

0 commit comments

Comments
 (0)