Skip to content

Commit 8860639

Browse files
authored
Merge pull request #3429 from ControlSystemStudio/pva_connect
PVA: Handle client's connection to server in receive thread
2 parents b754c30 + 263cd25 commit 8860639

File tree

3 files changed

+71
-22
lines changed

3 files changed

+71
-22
lines changed

core/pva/src/main/java/org/epics/pva/client/ClientTCPHandler.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import static org.epics.pva.PVASettings.logger;
1111

1212
import java.net.InetSocketAddress;
13-
import java.net.Socket;
1413
import java.nio.ByteBuffer;
1514
import java.util.Collection;
1615
import java.util.concurrent.ConcurrentHashMap;
@@ -56,6 +55,12 @@ class ClientTCPHandler extends TCPHandler
5655
new GetTypeHandler(),
5756
new RPCHandler());
5857

58+
/** Address of server to which this client will connect */
59+
private final InetSocketAddress server_address;
60+
61+
/** Is this a TLS connection or plain TCP? */
62+
private final boolean tls;
63+
5964
/** Client context */
6065
private final PVAClient client;
6166

@@ -112,11 +117,34 @@ class ClientTCPHandler extends TCPHandler
112117

113118
public ClientTCPHandler(final PVAClient client, final InetSocketAddress address, final Guid guid, final boolean tls) throws Exception
114119
{
115-
super(createSocket(address, tls), true);
120+
super(true);
116121
logger.log(Level.FINE, () -> "TCPHandler " + (tls ? "(TLS) " : "") + guid + " for " + address + " created ============================");
122+
this.server_address = address;
123+
this.tls = tls;
117124
this.client = client;
118125
this.guid = guid;
119126

127+
// Start receiver, but not the send thread, yet.
128+
// To prevent sending messages before the server is ready,
129+
// it's started when server confirms the connection.
130+
startReceiver();
131+
}
132+
133+
@Override
134+
protected boolean initializeSocket()
135+
{
136+
try
137+
{
138+
socket = SecureSockets.createClientSocket(server_address, tls);
139+
socket.setTcpNoDelay(true);
140+
socket.setKeepAlive(true);
141+
}
142+
catch (Exception ex)
143+
{
144+
logger.log(Level.WARNING, "PVA client cannot connect to " + server_address, ex);
145+
return false;
146+
}
147+
120148
// For TLS, check if the socket has a name that's used to authenticate
121149
x509_name = tls ? SecureSockets.getPrincipalCN(((SSLSocket) socket).getSession().getLocalPrincipal()) : null;
122150

@@ -126,18 +154,7 @@ public ClientTCPHandler(final PVAClient client, final InetSocketAddress address,
126154
final long period = Math.max(1, PVASettings.EPICS_PVA_CONN_TMO * 1000L / 30 * 3);
127155
alive_check = timer.scheduleWithFixedDelay(this::checkResponsiveness, period, period, TimeUnit.MILLISECONDS);
128156

129-
// Start receiver, but not the send thread, yet.
130-
// To prevent sending messages before the server is ready,
131-
// it's started when server confirms the connection.
132-
startReceiver();
133-
}
134-
135-
private static Socket createSocket(final InetSocketAddress address, final boolean tls) throws Exception
136-
{
137-
final Socket socket = SecureSockets.createClientSocket(address, tls);
138-
socket.setTcpNoDelay(true);
139-
socket.setKeepAlive(true);
140-
return socket;
157+
return true;
141158
}
142159

143160
/** @return Client context */

core/pva/src/main/java/org/epics/pva/common/TCPHandler.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2019-2023 Oak Ridge National Laboratory.
2+
* Copyright (c) 2019-2025 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -16,7 +16,6 @@
1616
import java.net.SocketAddress;
1717
import java.nio.ByteBuffer;
1818
import java.nio.ByteOrder;
19-
import java.util.Objects;
2019
import java.util.concurrent.BlockingQueue;
2120
import java.util.concurrent.ExecutorService;
2221
import java.util.concurrent.Executors;
@@ -62,11 +61,16 @@ abstract public class TCPHandler
6261
private final boolean client_mode;
6362

6463
/** TCP socket to PVA peer
64+
*
65+
* Server got this client socket from `accept`.
66+
* Client needs to create the socket and connect to server's address.
6567
*
6668
* Reading and writing is handled by receive and send threads,
6769
* but 'protected' so that derived classes may peek at socket properties.
70+
*
71+
* @see {@link #initializeSocket()}
6872
*/
69-
protected final Socket socket;
73+
protected Socket socket = null;
7074

7175
/** Flag to indicate that 'close' was called to close the 'socket' */
7276
protected volatile boolean running = true;
@@ -117,13 +121,11 @@ public void encodeRequest(final byte version, final ByteBuffer buffer) throws Ex
117121
* but will only start sending them when the
118122
* send thread is running
119123
*
120-
* @param socket Socket to read/write
121124
* @param client_mode Is this the client, expecting to receive messages from server?
122125
* @see #startSender()
123126
*/
124-
public TCPHandler(final Socket socket, final boolean client_mode)
127+
public TCPHandler(final boolean client_mode)
125128
{
126-
this.socket = Objects.requireNonNull(socket);
127129
this.client_mode = client_mode;
128130

129131
// Receive buffer byte order is set based on header flag of each received message.
@@ -133,6 +135,15 @@ public TCPHandler(final Socket socket, final boolean client_mode)
133135
send_buffer.order(ByteOrder.nativeOrder());
134136
}
135137

138+
/** Initialize the {@link #socket}. Called by receiver.
139+
*
140+
* Server received socket from `accept` during construction and this may be a NOP.
141+
* Client will have to create socket and connect to server's address in here.
142+
*
143+
* @return Success?
144+
*/
145+
abstract protected boolean initializeSocket();
146+
136147
/** Start receiving data
137148
* To be called by Client/ServerTCPHandler when fully constructed
138149
*/
@@ -258,6 +269,18 @@ protected void send(final ByteBuffer buffer) throws Exception
258269
/** Receiver */
259270
private Void receiver()
260271
{
272+
// Establish connection
273+
Thread.currentThread().setName("TCP receiver");
274+
while (! initializeSocket())
275+
try
276+
{ // Delay for (another) connection timeout, at least 1 sec
277+
Thread.sleep(Math.max(1, PVASettings.EPICS_PVA_TCP_SOCKET_TMO) * 1000);
278+
}
279+
catch (Exception ignore)
280+
{
281+
// NOP
282+
}
283+
// Listen on the connection
261284
try
262285
{
263286
Thread.currentThread().setName("TCP receiver " + socket.getLocalSocketAddress());

core/pva/src/main/java/org/epics/pva/server/ServerTCPHandler.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2019-2023 Oak Ridge National Laboratory.
2+
* Copyright (c) 2019-2025 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -62,7 +62,9 @@ class ServerTCPHandler extends TCPHandler
6262

6363
public ServerTCPHandler(final PVAServer server, final Socket client, final TLSHandshakeInfo tls_info) throws Exception
6464
{
65-
super(client, false);
65+
super(false);
66+
// Server received the client socket from `accept`
67+
this.socket = Objects.requireNonNull(client);
6668
this.server = Objects.requireNonNull(server);
6769
this.tls_info = tls_info;
6870

@@ -113,6 +115,13 @@ public ServerTCPHandler(final PVAServer server, final Socket client, final TLSHa
113115
});
114116
}
115117

118+
@Override
119+
protected boolean initializeSocket()
120+
{
121+
// Nothing to do, received client socket on construction
122+
return true;
123+
}
124+
116125
PVAServer getServer()
117126
{
118127
return server;

0 commit comments

Comments
 (0)