Skip to content

Commit fb60019

Browse files
artembilanspring-builds
authored andcommitted
GH-9430: Fix leak for the TCP/IP caching
Related to: #9430 (cherry picked from commit 8242ca6)
1 parent 845aae6 commit fb60019

File tree

4 files changed

+45
-17
lines changed

4 files changed

+45
-17
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ public abstract class AbstractConnectionFactory extends IntegrationObjectSupport
7878

7979
protected final Lock lifecycleMonitor = new ReentrantLock(); // NOSONAR final
8080

81-
private final Map<String, TcpConnectionSupport> connections = new ConcurrentHashMap<>();
81+
protected final Map<String, TcpConnectionSupport> connections = new ConcurrentHashMap<>(); // NOSONAR final
8282

83-
private final Lock connectionsMonitor = new ReentrantLock();
83+
protected final Lock connectionsMonitor = new ReentrantLock(); // NOSONAR final
8484

8585
private final BlockingQueue<PendingIO> delayedReads = new LinkedBlockingQueue<>();
8686

@@ -977,7 +977,7 @@ public boolean closeConnection(String connectionId) {
977977
this.connectionsMonitor.lock();
978978
try {
979979
boolean closed = false;
980-
TcpConnectionSupport connection = removeConnection(connectionId);
980+
TcpConnectionSupport connection = this.connections.remove(connectionId);
981981
if (connection != null) {
982982
try {
983983
connection.close();
@@ -996,11 +996,6 @@ public boolean closeConnection(String connectionId) {
996996
}
997997
}
998998

999-
@Nullable
1000-
protected TcpConnectionSupport removeConnection(String connectionId) {
1001-
return this.connections.remove(connectionId);
1002-
}
1003-
1004999
@Override
10051000
public String toString() {
10061001
return super.toString()

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.messaging.Message;
3434
import org.springframework.messaging.MessagingException;
3535
import org.springframework.messaging.support.ErrorMessage;
36+
import org.springframework.util.Assert;
3637

3738
/**
3839
* Connection factory that caches connections from the underlying target factory. The underlying
@@ -359,8 +360,31 @@ public void enableManualListenerRegistration() {
359360
}
360361

361362
@Override
362-
protected TcpConnectionSupport removeConnection(String connectionId) {
363-
return this.targetConnectionFactory.removeConnection(connectionId.replaceFirst("Cached:", ""));
363+
public boolean closeConnection(String connectionId) {
364+
Assert.notNull(connectionId, "'connectionId' to close must not be null");
365+
String targetConnectionId = connectionId.replaceFirst("Cached:", "");
366+
this.connectionsMonitor.lock();
367+
try {
368+
TcpConnectionSupport targetConnection = this.targetConnectionFactory.connections.get(targetConnectionId);
369+
if (targetConnection != null) {
370+
/*
371+
* If the delegate is stopped, actually close the connection, but still release
372+
* it to the pool, it will be discarded/renewed the next time it is retrieved.
373+
*/
374+
if (!isRunning()) {
375+
logger.debug(() -> "Factory not running - closing " + connectionId);
376+
super.closeConnection(targetConnectionId);
377+
}
378+
CachingClientConnectionFactory.this.pool.releaseItem(targetConnection);
379+
return true;
380+
}
381+
else {
382+
return false;
383+
}
384+
}
385+
finally {
386+
this.connectionsMonitor.unlock();
387+
}
364388
}
365389

366390
@Override

spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.context.ConfigurableApplicationContext;
3636
import org.springframework.context.annotation.Bean;
3737
import org.springframework.context.annotation.Configuration;
38+
import org.springframework.context.event.EventListener;
3839
import org.springframework.integration.MessageTimeoutException;
3940
import org.springframework.integration.channel.QueueChannel;
4041
import org.springframework.integration.config.EnableIntegration;
@@ -51,6 +52,7 @@
5152
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
5253
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
5354
import org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory;
55+
import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent;
5456
import org.springframework.integration.ip.tcp.connection.TcpConnectionServerListeningEvent;
5557
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
5658
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
@@ -261,6 +263,7 @@ void allRepliesAreReceivedViaLimitedCachingConnectionFactory() {
261263
IntStream.rangeClosed('a', 'z')
262264
.mapToObj((characterCode) -> (char) characterCode)
263265
.map((character) -> "" + character)
266+
.parallel()
264267
.peek((character) -> this.outboundFlowInput.send(new GenericMessage<>(character)))
265268
.map(String::toUpperCase)
266269
.toList();
@@ -274,6 +277,8 @@ void allRepliesAreReceivedViaLimitedCachingConnectionFactory() {
274277
}
275278

276279
assertThat(replies).containsAll(expected);
280+
281+
assertThat(config.openEvents).hasSizeLessThanOrEqualTo(5);
277282
}
278283

279284
@Configuration
@@ -431,9 +436,18 @@ public TcpNetClientConnectionFactorySpec client3() {
431436
return Tcp.netClient("localhost", 0);
432437
}
433438

439+
final List<TcpConnectionOpenEvent> openEvents = new ArrayList<>();
440+
441+
@EventListener
442+
void connectionOpened(TcpConnectionOpenEvent tcpConnectionOpenEvent) {
443+
if ("client3".equals(tcpConnectionOpenEvent.getConnectionFactoryName())) {
444+
this.openEvents.add(tcpConnectionOpenEvent);
445+
}
446+
}
447+
434448
@Bean
435449
CachingClientConnectionFactory cachingClient(TcpNetClientConnectionFactory client3) {
436-
var cachingClientConnectionFactory = new CachingClientConnectionFactory(client3, 10);
450+
var cachingClientConnectionFactory = new CachingClientConnectionFactory(client3, 5);
437451
cachingClientConnectionFactory.setConnectionWaitTimeout(10_000);
438452
return cachingClientConnectionFactory;
439453
}

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactoryTests.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -389,12 +389,7 @@ private TcpConnectionSupport mockedTcpNetConnection() throws IOException {
389389

390390
private TcpConnectionSupport mockedTcpNioConnection() throws Exception {
391391
SocketChannel socketChannel = mock(SocketChannel.class);
392-
if (System.getProperty("java.version").startsWith("1.8")) {
393-
new DirectFieldAccessor(socketChannel).setPropertyValue("open", false);
394-
}
395-
else {
396-
new DirectFieldAccessor(socketChannel).setPropertyValue("closed", true);
397-
}
392+
new DirectFieldAccessor(socketChannel).setPropertyValue("closed", true);
398393
doThrow(new IOException("Foo")).when(socketChannel).write(Mockito.any(ByteBuffer.class));
399394
when(socketChannel.socket()).thenReturn(mock(Socket.class));
400395
TcpNioConnection conn = new TcpNioConnection(socketChannel, false, false, event -> {

0 commit comments

Comments
 (0)