Skip to content

Commit 1d4626f

Browse files
garyrussellartembilan
authored andcommitted
GH-3256: Support Testing TCP Connections
Resolves #3256
1 parent 02407f7 commit 1d4626f

File tree

6 files changed

+265
-37
lines changed

6 files changed

+265
-37
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616

1717
package org.springframework.integration.ip.tcp.connection;
1818

19+
import java.io.IOException;
20+
import java.io.UncheckedIOException;
1921
import java.net.Socket;
2022
import java.time.Duration;
2123
import java.util.concurrent.locks.ReadWriteLock;
2224
import java.util.concurrent.locks.ReentrantReadWriteLock;
25+
import java.util.function.Predicate;
2326

2427
import org.springframework.context.ApplicationEventPublisher;
2528
import org.springframework.lang.Nullable;
@@ -44,6 +47,9 @@ public abstract class AbstractClientConnectionFactory extends AbstractConnection
4447

4548
private Duration connectTimeout = Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT);
4649

50+
@Nullable
51+
private Predicate<TcpConnectionSupport> connectionTest;
52+
4753
private volatile TcpConnectionSupport theConnection;
4854

4955
/**
@@ -68,6 +74,7 @@ protected Duration getConnectTimeout() {
6874
return this.connectTimeout;
6975
}
7076

77+
7178
/**
7279
* Set whether to automatically (default) or manually add a {@link TcpListener} to the
7380
* connections created by this factory. By default, the factory automatically configures
@@ -79,6 +86,27 @@ public void enableManualListenerRegistration() {
7986
this.manualListenerRegistration = true;
8087
}
8188

89+
/**
90+
* Get a {@link Predicate} that will be invoked to test a new connection; return true
91+
* to accept the connection, false the reject.
92+
* @return the predicate.
93+
* @since 5.3
94+
*/
95+
@Nullable
96+
protected Predicate<TcpConnectionSupport> getConnectionTest() {
97+
return this.connectionTest;
98+
}
99+
100+
/**
101+
* Set a {@link Predicate} that will be invoked to test a new connection; return true
102+
* to accept the connection, false the reject.
103+
* @param connectionTest the predicate.
104+
* @since 5.3
105+
*/
106+
public void setConnectionTest(@Nullable Predicate<TcpConnectionSupport> connectionTest) {
107+
this.connectionTest = connectionTest;
108+
}
109+
82110
/**
83111
* Obtains a connection - if {@link #setSingleUse(boolean)} was called with
84112
* true, a new connection is returned; otherwise a single connection is
@@ -126,21 +154,12 @@ protected final TcpConnectionSupport obtainNewConnection() throws InterruptedExc
126154
if (!singleUse) {
127155
// Another write lock holder might have created a new one by now.
128156
connection = obtainSharedConnection();
129-
if (connection != null) {
157+
if (connection != null && connection.isOpen()) {
130158
return connection;
131159
}
132160
}
133161

134-
if (logger.isDebugEnabled()) {
135-
logger.debug("Opening new socket connection to " + getHost() + ":" + getPort());
136-
}
137-
138-
connection = buildNewConnection();
139-
if (!singleUse) {
140-
setTheConnection(connection);
141-
}
142-
connection.publishConnectionOpenEvent();
143-
return connection;
162+
return doObtain(singleUse);
144163
}
145164
catch (RuntimeException e) {
146165
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
@@ -156,6 +175,24 @@ protected final TcpConnectionSupport obtainNewConnection() throws InterruptedExc
156175
}
157176
}
158177

178+
private TcpConnectionSupport doObtain(boolean singleUse) {
179+
TcpConnectionSupport connection;
180+
if (logger.isDebugEnabled()) {
181+
logger.debug("Opening new socket connection to " + getHost() + ":" + getPort());
182+
}
183+
184+
connection = buildNewConnection();
185+
if (this.connectionTest != null && !this.connectionTest.test(connection)) {
186+
connection.close();
187+
throw new UncheckedIOException(new IOException("Connection test failed for " + connection));
188+
}
189+
if (!singleUse) {
190+
setTheConnection(connection);
191+
}
192+
connection.publishConnectionOpenEvent();
193+
return connection;
194+
}
195+
159196
protected TcpConnectionSupport buildNewConnection() {
160197
throw new UnsupportedOperationException(
161198
"Factories that don't override this class' obtainConnection() must implement this method");
@@ -187,6 +224,9 @@ protected void initializeConnection(TcpConnectionSupport connection, Socket sock
187224
connection.setMapper(getMapper());
188225
connection.setDeserializer(getDeserializer());
189226
connection.setSerializer(getSerializer());
227+
if (this.connectionTest != null) {
228+
connection.setNeedsTest(true);
229+
}
190230
}
191231

192232
/**

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ public abstract class TcpConnectionSupport implements TcpConnection {
7878

7979
private TcpListener listener;
8080

81+
private volatile TcpListener testListener;
82+
8183
private TcpSender sender;
8284

8385
private String connectionId;
@@ -92,6 +94,13 @@ public abstract class TcpConnectionSupport implements TcpConnection {
9294

9395
private boolean manualListenerRegistration;
9496

97+
/*
98+
* This boolean is to avoid looking for a temporary listener when not needed
99+
* to avoid a CPU cache flush. This does not have to be volatile because it
100+
* is reset by the thread that checks for the temporary listener.
101+
*/
102+
private boolean needsTest;
103+
95104
public TcpConnectionSupport() {
96105
this(null);
97106
}
@@ -238,6 +247,15 @@ public void setSerializer(Serializer<?> serializer) {
238247
}
239248
}
240249

250+
/**
251+
* Set to true to use a temporary listener for just the first incoming message.
252+
* @param needsTest true for a temporary listener.
253+
* @since 5.3
254+
*/
255+
public void setNeedsTest(boolean needsTest) {
256+
this.needsTest = needsTest;
257+
}
258+
241259
/**
242260
* Set the listener that will receive incoming Messages.
243261
* @param listener The listener.
@@ -247,6 +265,17 @@ public void registerListener(@Nullable TcpListener listener) {
247265
this.listenerRegisteredLatch.countDown();
248266
}
249267

268+
/**
269+
* Set a temporary listener to receive just the first incoming message.
270+
* Used in conjunction with a connectionTest in a client connection
271+
* factory.
272+
* @param tListener the test listener.
273+
* @since 5.3
274+
*/
275+
public void registerTestListener(TcpListener tListener) {
276+
this.testListener = tListener;
277+
}
278+
250279
/**
251280
* Set whether or not automatic or manual registration of the {@link TcpListener} is to be
252281
* used. (Default automatic). When manual registration is in place, incoming messages will
@@ -282,6 +311,10 @@ public TcpListener getListener() {
282311
}
283312
waitForListenerRegistration();
284313
}
314+
if (this.needsTest && this.testListener != null) {
315+
this.needsTest = false;
316+
return this.testListener;
317+
}
285318
return this.listener;
286319
}
287320

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpSender.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
* @since 2.0
2424
*
2525
*/
26+
@FunctionalInterface
2627
public interface TcpSender {
2728

2829
/**
@@ -37,6 +38,7 @@ public interface TcpSender {
3738
* method is called each time a connection is closed.
3839
* @param connection The connection.
3940
*/
40-
void removeDeadConnection(TcpConnection connection);
41+
default void removeDeadConnection(TcpConnection connection) {
42+
}
4143

4244
}

0 commit comments

Comments
 (0)