Skip to content

Commit 845aae6

Browse files
artembilanspring-builds
authored andcommitted
GH-9430: Fix CachingClientCF for collaborating channel adapters (#9431)
Fixes: #9430 When `CachingClientConnectionFactory` is used in combination of `Tcp.outboundAdapter()` & `Tcp.inboundAdapter()`, the connection is not released back to the cache because `CachingClientConnectionFactory` does not store created connections into its `connections` property. So, when `TcpReceivingChannelAdapter` calls `this.clientConnectionFactory.closeConnection(connectionId);` it returned immediately because there is nothing to remove from the `this.connections` * Add `protected removeConnection()` in the `AbstractConnectionFactory` and override it in the `CachingClientConnectionFactory` with delegation to the `this.targetConnectionFactory` * Demonstrate the problem in the new `IpIntegrationTests.allRepliesAreReceivedViaLimitedCachingConnectionFactory()` test and ensure that all 27 letters from English alphabet are sent to the server and received in uppercase while size of the `CachingClientConnectionFactory` is only `10` (cherry picked from commit dc02dec)
1 parent 561e603 commit 845aae6

File tree

3 files changed

+101
-10
lines changed

3 files changed

+101
-10
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -977,7 +977,7 @@ public boolean closeConnection(String connectionId) {
977977
this.connectionsMonitor.lock();
978978
try {
979979
boolean closed = false;
980-
TcpConnectionSupport connection = this.connections.remove(connectionId);
980+
TcpConnectionSupport connection = removeConnection(connectionId);
981981
if (connection != null) {
982982
try {
983983
connection.close();
@@ -996,6 +996,11 @@ public boolean closeConnection(String connectionId) {
996996
}
997997
}
998998

999+
@Nullable
1000+
protected TcpConnectionSupport removeConnection(String connectionId) {
1001+
return this.connections.remove(connectionId);
1002+
}
1003+
9991004
@Override
10001005
public String toString() {
10011006
return super.toString()

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,11 @@ public void enableManualListenerRegistration() {
358358
this.targetConnectionFactory.enableManualListenerRegistration();
359359
}
360360

361+
@Override
362+
protected TcpConnectionSupport removeConnection(String connectionId) {
363+
return this.targetConnectionFactory.removeConnection(connectionId.replaceFirst("Cached:", ""));
364+
}
365+
361366
@Override
362367
public void start() {
363368
setActive(true);

spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java

Lines changed: 89 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616

1717
package org.springframework.integration.ip.dsl;
1818

19+
import java.util.ArrayList;
1920
import java.util.Collections;
21+
import java.util.List;
2022
import java.util.concurrent.CountDownLatch;
2123
import java.util.concurrent.TimeUnit;
2224
import java.util.concurrent.atomic.AtomicBoolean;
2325
import java.util.concurrent.atomic.AtomicInteger;
26+
import java.util.stream.IntStream;
2427

2528
import org.aopalliance.intercept.MethodInterceptor;
2629
import org.junit.jupiter.api.Test;
@@ -47,6 +50,7 @@
4750
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
4851
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
4952
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
53+
import org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory;
5054
import org.springframework.integration.ip.tcp.connection.TcpConnectionServerListeningEvent;
5155
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
5256
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
@@ -55,12 +59,12 @@
5559
import org.springframework.integration.ip.udp.MulticastSendingMessageHandler;
5660
import org.springframework.integration.ip.udp.UdpServerListeningEvent;
5761
import org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter;
58-
import org.springframework.integration.ip.udp.UnicastSendingMessageHandler;
5962
import org.springframework.integration.ip.util.TestingUtilities;
6063
import org.springframework.integration.support.MessageBuilder;
6164
import org.springframework.integration.test.util.TestUtils;
6265
import org.springframework.messaging.Message;
6366
import org.springframework.messaging.MessageChannel;
67+
import org.springframework.messaging.PollableChannel;
6468
import org.springframework.messaging.support.GenericMessage;
6569
import org.springframework.test.annotation.DirtiesContext;
6670
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
@@ -102,9 +106,6 @@ public class IpIntegrationTests {
102106
@Autowired
103107
private UnicastReceivingChannelAdapter udpInbound;
104108

105-
@Autowired
106-
private UnicastSendingMessageHandler udpOutbound;
107-
108109
@Autowired
109110
private QueueChannel udpIn;
110111

@@ -236,6 +237,45 @@ void async() {
236237
assertThat(TestUtils.getPropertyValue(this.tcpOutAsync, "async", Boolean.class)).isTrue();
237238
}
238239

240+
@Autowired
241+
private AbstractServerConnectionFactory server2;
242+
243+
@Autowired
244+
private TcpNetClientConnectionFactory client3;
245+
246+
@Autowired
247+
@Qualifier("outboundFlow.input")
248+
MessageChannel outboundFlowInput;
249+
250+
@Autowired
251+
PollableChannel cachingRepliesChannel;
252+
253+
@Test
254+
void allRepliesAreReceivedViaLimitedCachingConnectionFactory() {
255+
this.client3.stop();
256+
TestingUtilities.waitListening(this.server2, null);
257+
this.client3.setPort(this.server2.getPort());
258+
this.client3.start();
259+
260+
List<String> expected =
261+
IntStream.rangeClosed('a', 'z')
262+
.mapToObj((characterCode) -> (char) characterCode)
263+
.map((character) -> "" + character)
264+
.peek((character) -> this.outboundFlowInput.send(new GenericMessage<>(character)))
265+
.map(String::toUpperCase)
266+
.toList();
267+
268+
List<String> replies = new ArrayList<>();
269+
270+
for (int i = 0; i < expected.size(); i++) {
271+
Message<?> replyMessage = this.cachingRepliesChannel.receive(10_000);
272+
assertThat(replyMessage).isNotNull();
273+
replies.add(replyMessage.getPayload().toString());
274+
}
275+
276+
assertThat(replies).containsAll(expected);
277+
}
278+
239279
@Configuration
240280
@EnableIntegration
241281
public static class Config {
@@ -318,8 +358,9 @@ public ApplicationListener<UdpServerListeningEvent> events() {
318358
}
319359

320360
@Bean
321-
public TcpNetClientConnectionFactorySpec client1(TcpNetServerConnectionFactory server1) {
322-
return Tcp.netClient("localhost", server1.getPort())
361+
public TcpNetClientConnectionFactorySpec client1() {
362+
// The port from server is assigned
363+
return Tcp.netClient("localhost", 0)
323364
.serializer(TcpCodecs.crlf())
324365
.deserializer(TcpCodecs.lengthHeader1());
325366
}
@@ -337,8 +378,9 @@ public QueueChannel unsolicited() {
337378
}
338379

339380
@Bean
340-
public TcpNetClientConnectionFactorySpec client2(TcpNetServerConnectionFactory server1) {
341-
return Tcp.netClient("localhost", server1.getPort())
381+
public TcpNetClientConnectionFactorySpec client2() {
382+
// The port from server is assigned
383+
return Tcp.netClient("localhost", 0)
342384
.serializer(TcpCodecs.crlf())
343385
.deserializer(TcpCodecs.lengthHeader1());
344386
}
@@ -370,6 +412,45 @@ public IntegrationFlow clientTcpFlow(TcpOutboundGateway tcpOut) {
370412
.transform(Transformers.objectToString());
371413
}
372414

415+
@Bean
416+
public TcpNetServerConnectionFactorySpec server2() {
417+
return Tcp.netServer(0);
418+
}
419+
420+
@Bean
421+
public IntegrationFlow server2Flow(TcpNetServerConnectionFactory server2) {
422+
return IntegrationFlow.from(Tcp.inboundGateway(server2))
423+
.transform(Transformers.objectToString())
424+
.<String, String>transform(String::toUpperCase)
425+
.get();
426+
}
427+
428+
@Bean
429+
public TcpNetClientConnectionFactorySpec client3() {
430+
// The port from server is assigned
431+
return Tcp.netClient("localhost", 0);
432+
}
433+
434+
@Bean
435+
CachingClientConnectionFactory cachingClient(TcpNetClientConnectionFactory client3) {
436+
var cachingClientConnectionFactory = new CachingClientConnectionFactory(client3, 10);
437+
cachingClientConnectionFactory.setConnectionWaitTimeout(10_000);
438+
return cachingClientConnectionFactory;
439+
}
440+
441+
@Bean
442+
IntegrationFlow outboundFlow(CachingClientConnectionFactory cachingClient) {
443+
return (flow) -> flow.handle(Tcp.outboundAdapter(cachingClient));
444+
}
445+
446+
@Bean
447+
IntegrationFlow inboundFlow(CachingClientConnectionFactory cachingClient) {
448+
return IntegrationFlow.from(Tcp.inboundAdapter(cachingClient))
449+
.transform(Transformers.objectToString())
450+
.channel((channels) -> channels.queue("cachingRepliesChannel"))
451+
.get();
452+
}
453+
373454
}
374455

375456
}

0 commit comments

Comments
 (0)