Skip to content

Commit 74480bb

Browse files
author
kasemir
committed
PVA: Handle client's connection to server in receive thread
1 parent 68c455d commit 74480bb

File tree

3 files changed

+48
-22
lines changed

3 files changed

+48
-22
lines changed

core/pva/src/main/java/org/epics/pva/client/ClientTCPHandler.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import static org.epics.pva.PVASettings.logger;
1111

1212
import java.net.InetSocketAddress;
13-
import java.net.Socket;
1413
import java.nio.ByteBuffer;
1514
import java.util.Collection;
1615
import java.util.concurrent.ConcurrentHashMap;
@@ -56,6 +55,12 @@ class ClientTCPHandler extends TCPHandler
5655
new GetTypeHandler(),
5756
new RPCHandler());
5857

58+
/** Address of server to which this client will connect */
59+
private final InetSocketAddress server_address;
60+
61+
/** Is this a TLS connection or plain TCP? */
62+
private final boolean tls;
63+
5964
/** Client context */
6065
private final PVAClient client;
6166

@@ -112,32 +117,34 @@ class ClientTCPHandler extends TCPHandler
112117

113118
public ClientTCPHandler(final PVAClient client, final InetSocketAddress address, final Guid guid, final boolean tls) throws Exception
114119
{
115-
super(createSocket(address, tls), true);
120+
super(true);
116121
logger.log(Level.FINE, () -> "TCPHandler " + (tls ? "(TLS) " : "") + guid + " for " + address + " created ============================");
122+
this.server_address = address;
123+
this.tls = tls;
117124
this.client = client;
118125
this.guid = guid;
119126

120-
// For TLS, check if the socket has a name that's used to authenticate
121-
x509_name = tls ? SecureSockets.getPrincipalCN(((SSLSocket) socket).getSession().getLocalPrincipal()) : null;
122-
123-
// For default EPICS_CA_CONN_TMO: 30 sec, send echo at ~15 sec:
124-
// Check every ~3 seconds
125-
last_life_sign = last_message_sent = System.currentTimeMillis();
126-
final long period = Math.max(1, PVASettings.EPICS_PVA_CONN_TMO * 1000L / 30 * 3);
127-
alive_check = timer.scheduleWithFixedDelay(this::checkResponsiveness, period, period, TimeUnit.MILLISECONDS);
128-
129127
// Start receiver, but not the send thread, yet.
130128
// To prevent sending messages before the server is ready,
131129
// it's started when server confirms the connection.
132130
startReceiver();
133131
}
134132

135-
private static Socket createSocket(final InetSocketAddress address, final boolean tls) throws Exception
133+
@Override
134+
protected void initializeSocket() throws Exception
136135
{
137-
final Socket socket = SecureSockets.createClientSocket(address, tls);
136+
socket = SecureSockets.createClientSocket(server_address, tls);
138137
socket.setTcpNoDelay(true);
139138
socket.setKeepAlive(true);
140-
return socket;
139+
140+
// For TLS, check if the socket has a name that's used to authenticate
141+
x509_name = tls ? SecureSockets.getPrincipalCN(((SSLSocket) socket).getSession().getLocalPrincipal()) : null;
142+
143+
// For default EPICS_CA_CONN_TMO: 30 sec, send echo at ~15 sec:
144+
// Check every ~3 seconds
145+
last_life_sign = last_message_sent = System.currentTimeMillis();
146+
final long period = Math.max(1, PVASettings.EPICS_PVA_CONN_TMO * 1000L / 30 * 3);
147+
alive_check = timer.scheduleWithFixedDelay(this::checkResponsiveness, period, period, TimeUnit.MILLISECONDS);
141148
}
142149

143150
/** @return Client context */

core/pva/src/main/java/org/epics/pva/common/TCPHandler.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2019-2023 Oak Ridge National Laboratory.
2+
* Copyright (c) 2019-2025 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -16,7 +16,6 @@
1616
import java.net.SocketAddress;
1717
import java.nio.ByteBuffer;
1818
import java.nio.ByteOrder;
19-
import java.util.Objects;
2019
import java.util.concurrent.BlockingQueue;
2120
import java.util.concurrent.ExecutorService;
2221
import java.util.concurrent.Executors;
@@ -62,11 +61,16 @@ abstract public class TCPHandler
6261
private final boolean client_mode;
6362

6463
/** TCP socket to PVA peer
64+
*
65+
* Server got this client socket from `accept`.
66+
* Client needs to create the socket and connect to server's address.
6567
*
6668
* Reading and writing is handled by receive and send threads,
6769
* but 'protected' so that derived classes may peek at socket properties.
70+
*
71+
* @see {@link #initializeSocket()}
6872
*/
69-
protected final Socket socket;
73+
protected Socket socket = null;
7074

7175
/** Flag to indicate that 'close' was called to close the 'socket' */
7276
protected volatile boolean running = true;
@@ -117,13 +121,11 @@ public void encodeRequest(final byte version, final ByteBuffer buffer) throws Ex
117121
* but will only start sending them when the
118122
* send thread is running
119123
*
120-
* @param socket Socket to read/write
121124
* @param client_mode Is this the client, expecting to receive messages from server?
122125
* @see #startSender()
123126
*/
124-
public TCPHandler(final Socket socket, final boolean client_mode)
127+
public TCPHandler(final boolean client_mode)
125128
{
126-
this.socket = Objects.requireNonNull(socket);
127129
this.client_mode = client_mode;
128130

129131
// Receive buffer byte order is set based on header flag of each received message.
@@ -133,6 +135,13 @@ public TCPHandler(final Socket socket, final boolean client_mode)
133135
send_buffer.order(ByteOrder.nativeOrder());
134136
}
135137

138+
/** Initialize the {@link #socket}. Called by receiver.
139+
*
140+
* Server received socket from `accept` during construction and this may be a NOP.
141+
* Client will have to create socket and connect to server's address in here
142+
*/
143+
abstract protected void initializeSocket() throws Exception;
144+
136145
/** Start receiving data
137146
* To be called by Client/ServerTCPHandler when fully constructed
138147
*/
@@ -260,6 +269,8 @@ private Void receiver()
260269
{
261270
try
262271
{
272+
Thread.currentThread().setName("TCP receiver");
273+
initializeSocket();
263274
Thread.currentThread().setName("TCP receiver " + socket.getLocalSocketAddress());
264275
logger.log(Level.FINER, () -> Thread.currentThread().getName() + " started for " + socket.getRemoteSocketAddress());
265276
logger.log(Level.FINER, "Native byte order " + receive_buffer.order());

core/pva/src/main/java/org/epics/pva/server/ServerTCPHandler.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2019-2023 Oak Ridge National Laboratory.
2+
* Copyright (c) 2019-2025 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -62,7 +62,9 @@ class ServerTCPHandler extends TCPHandler
6262

6363
public ServerTCPHandler(final PVAServer server, final Socket client, final TLSHandshakeInfo tls_info) throws Exception
6464
{
65-
super(client, false);
65+
super(false);
66+
// Server received the client socket from `accept`
67+
this.socket = Objects.requireNonNull(client);
6668
this.server = Objects.requireNonNull(server);
6769
this.tls_info = tls_info;
6870

@@ -113,6 +115,12 @@ public ServerTCPHandler(final PVAServer server, final Socket client, final TLSHa
113115
});
114116
}
115117

118+
@Override
119+
protected void initializeSocket() throws Exception
120+
{
121+
// Nothing to do, received client socket on construction
122+
}
123+
116124
PVAServer getServer()
117125
{
118126
return server;

0 commit comments

Comments
 (0)