Skip to content

Commit 49e516e

Browse files
authored
Merge pull request #3430 from ControlSystemStudio/pva_search_fix
PVA: Search cleanup
2 parents 8860639 + 6026a81 commit 49e516e

File tree

4 files changed

+93
-72
lines changed

4 files changed

+93
-72
lines changed

core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java

Lines changed: 69 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2019-2023 Oak Ridge National Laboratory.
2+
* Copyright (c) 2019-2025 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -13,10 +13,11 @@
1313
import java.nio.ByteBuffer;
1414
import java.util.ArrayList;
1515
import java.util.Collection;
16-
import java.util.LinkedList;
16+
import java.util.HashMap;
17+
import java.util.HashSet;
1718
import java.util.List;
1819
import java.util.Random;
19-
import java.util.concurrent.ConcurrentHashMap;
20+
import java.util.Set;
2021
import java.util.concurrent.Executors;
2122
import java.util.concurrent.ScheduledExecutorService;
2223
import java.util.concurrent.TimeUnit;
@@ -99,27 +100,47 @@ private class SearchedChannel
99100
// Otherwise run risk of getting reply without being able
100101
// to handle it
101102
}
103+
104+
// Hash by channel name
105+
@Override
106+
public int hashCode()
107+
{
108+
return channel.getName().hashCode();
109+
}
110+
111+
// Compare by channel name
112+
@Override
113+
public boolean equals(Object obj)
114+
{
115+
if (obj instanceof SearchedChannel other)
116+
return other.channel.getName().equals(channel.getName());
117+
return false;
118+
}
102119
}
103120

104121
// SearchedChannels are tracked in two data structures
105122
//
106-
// - searched_channels (concurrent)
123+
// - searched_channels
107124
// Fast lookup of channel by ID,
108-
// efficient `computeIfAbsent(cid, ..` mechanism for creating
109-
// at most one SearchedChannel per CID.
125+
// creating at most one SearchedChannel per CID.
110126
// Allows checking if a channel is indeed searched,
111127
// and locating the channel for a search reply.
112128
//
113-
// - search_buckets (need to SYNC)
129+
// - search_buckets
114130
// Efficiently schedule the search messages for all channels
115131
// up to MAX_SEARCH_PERIOD.
132+
//
133+
// Access to either one needs to be synchronized
116134

117-
/** Map of searched channels by channel ID */
118-
private ConcurrentHashMap<Integer, SearchedChannel> searched_channels = new ConcurrentHashMap<>();
135+
/** Map of searched channels by channel ID
136+
*
137+
* Access only from synchronized method
138+
*/
139+
private HashMap<Integer, SearchedChannel> searched_channels = new HashMap<>();
119140

120141
/** Search buckets
121142
*
122-
* <p>The {@link #current_search_bucket} selects the list
143+
* <p>The {@link #current_search_bucket} selects the set
123144
* of channels to be searched by {@link #runSearches()},
124145
* which runs roughly once per second, each time moving to
125146
* the next search bucket in a ring buffer fashion.
@@ -136,13 +157,13 @@ private class SearchedChannel
136157
* which would result in an endless loop.
137158
*
138159
* <p>Access to either {@link #search_buckets} or {@link #current_search_bucket}
139-
* must SYNC on {@link #search_buckets}.
160+
* must only occur in a 'synchronized' method.
140161
*/
141-
private final ArrayList<LinkedList<SearchedChannel>> search_buckets = new ArrayList<>();
162+
private final ArrayList<Set<SearchedChannel>> search_buckets = new ArrayList<>(MAX_SEARCH_PERIOD+2);
142163

143164
/** Index of current search bucket, i.e. the one about to be searched.
144165
*
145-
* <p>Access must SYNC on {@link #search_buckets}.
166+
* <p>Access must only occur in a 'synchronized' method.
146167
*/
147168
private final AtomicInteger current_search_bucket = new AtomicInteger();
148169

@@ -185,11 +206,9 @@ public ChannelSearch(final ClientUDPHandler udp,
185206
this.udp = udp;
186207
this.tcp_provider = tcp_provider;
187208

188-
synchronized (search_buckets)
189-
{
190-
for (int i=0; i<MAX_SEARCH_PERIOD+2; ++i)
191-
search_buckets.add(new LinkedList<>());
192-
}
209+
// Each bucket holds set of channels to search in that time slot
210+
for (int i=0; i<MAX_SEARCH_PERIOD+2; ++i)
211+
search_buckets.add(new HashSet<>());
193212

194213
// Searches sent to multicast (IPv4, IPv6) or broadcast addresses (IPv4) reach every PVA server
195214
// on that multicast group or bcast subnet.
@@ -227,7 +246,7 @@ else if (addr.isBroadcast())
227246

228247
public void start()
229248
{
230-
// +-jitter to prevent multiple clients from sending concurrent search requests
249+
// 1 second +-jitter to prevent multiple clients from sending concurrent search requests
231250
final long period = SEARCH_PERIOD_MS + (new Random().nextInt(2*SEARCH_JITTER_MS+1) - SEARCH_JITTER_MS);
232251

233252
logger.log(Level.FINER,
@@ -246,19 +265,20 @@ public void register(final PVAChannel channel, final boolean now)
246265
{
247266
logger.log(Level.FINE, () -> "Register search for " + channel + (now ? " now" : " soon"));
248267

249-
final ClientChannelState old = channel.setState(ClientChannelState.SEARCHING);
250-
if (old == ClientChannelState.SEARCHING)
251-
logger.log(Level.WARNING, "Registering channel " + channel + " to be searched more than once ");
268+
synchronized (this)
269+
{
270+
final ClientChannelState old = channel.setState(ClientChannelState.SEARCHING);
271+
if (old == ClientChannelState.SEARCHING)
272+
logger.log(Level.WARNING, "Registering channel " + channel + " to be searched more than once ");
252273

253-
final SearchedChannel sc = searched_channels.computeIfAbsent(channel.getCID(), id -> new SearchedChannel(channel));
274+
final SearchedChannel sc = searched_channels.computeIfAbsent(channel.getCID(), id -> new SearchedChannel(channel));
254275

255-
synchronized (search_buckets)
256-
{
257276
int bucket = current_search_bucket.get();
258277
if (!now)
259-
bucket = (bucket + SEARCH_SOON_DELAY) % search_buckets.size();
278+
bucket = (bucket + SEARCH_SOON_DELAY) % search_buckets.size();
260279
search_buckets.get(bucket).add(sc);
261280
}
281+
262282
// Jumpstart search instead of waiting up to ~1 second for current bucket to be handled
263283
if (now)
264284
timer.execute(this::runSearches);
@@ -268,17 +288,15 @@ public void register(final PVAChannel channel, final boolean now)
268288
* @param channel_id
269289
* @return {@link PVAChannel}, <code>null</code> when channel wasn't searched any more
270290
*/
271-
public PVAChannel unregister(final int channel_id)
291+
public synchronized PVAChannel unregister(final int channel_id)
272292
{
273293
final SearchedChannel searched = searched_channels.remove(channel_id);
274294
if (searched != null)
275295
{
276296
logger.log(Level.FINE, () -> "Unregister search for " + searched.channel.getName() + " " + channel_id);
277-
// NOT removing `searched` from all `search_buckets`.
278-
// Removal would be a slow, linear operation.
279-
// `runSearches()` will drop the channel from `search_buckets`
280-
// because it's no longer listed in `searched_channels`
281-
297+
// Remove `searched` from all `search_buckets`.
298+
for (Set<SearchedChannel> bucket : search_buckets)
299+
bucket.remove(searched);
282300
return searched.channel;
283301
}
284302
return null;
@@ -288,7 +306,7 @@ public PVAChannel unregister(final int channel_id)
288306
*
289307
* <p>Resets their search counter so they're searched "real soon".
290308
*/
291-
public void boost()
309+
public synchronized void boost()
292310
{
293311
for (SearchedChannel searched : searched_channels.values())
294312
{
@@ -299,12 +317,9 @@ public void boost()
299317
if (period == MIN_SEARCH_PERIOD)
300318
{
301319
logger.log(Level.FINE, () -> "Restart search for '" + searched.channel.getName() + "'");
302-
synchronized (search_buckets)
303-
{
304-
final LinkedList<SearchedChannel> bucket = search_buckets.get(current_search_bucket.get());
305-
if (! bucket.contains(searched))
306-
bucket.add(searched);
307-
}
320+
321+
final Set<SearchedChannel> bucket = search_buckets.get(current_search_bucket.get());
322+
bucket.add(searched);
308323
}
309324
// Not sending search right now:
310325
// search(channel);
@@ -322,17 +337,17 @@ public void boost()
322337
@SuppressWarnings("unchecked")
323338
private void runSearches()
324339
{
340+
// Determine current search bucket
341+
final int current = current_search_bucket.getAndUpdate(i -> (i + 1) % search_buckets.size());
342+
// Collect channels to be searched while sync'ed
325343
to_search.clear();
326-
synchronized (search_buckets)
344+
synchronized (this)
327345
{
328-
// Determine current search bucket
329-
final int current = current_search_bucket.getAndUpdate(i -> (i + 1) % search_buckets.size());
330-
final LinkedList<SearchedChannel> bucket = search_buckets.get(current);
346+
final Set<SearchedChannel> bucket = search_buckets.get(current);
331347
logger.log(Level.FINEST, () -> "Search bucket " + current);
332348

333349
// Remove searched channels from the current bucket
334-
SearchedChannel sc;
335-
while ((sc = bucket.poll()) != null)
350+
for (SearchedChannel sc : bucket)
336351
{
337352
if (sc.channel.getState() == ClientChannelState.SEARCHING &&
338353
searched_channels.containsKey(sc.channel.getCID()))
@@ -349,8 +364,8 @@ private void runSearches()
349364
// in case that search bucket is quite full
350365
final int i_n = (current + period) % search_buckets.size();
351366
final int i_n_n = (i_n + 1) % search_buckets.size();
352-
final LinkedList<SearchedChannel> next = search_buckets.get(i_n);
353-
final LinkedList<SearchedChannel> next_next = search_buckets.get(i_n_n);
367+
final Set<SearchedChannel> next = search_buckets.get(i_n);
368+
final Set<SearchedChannel> next_next = search_buckets.get(i_n_n);
354369
if (i_n == current || i_n_n == current)
355370
throw new IllegalStateException("Current, next and nextnext search indices for " + sc.channel + " are " +
356371
current + ", " + i_n + ", " + i_n_n);
@@ -362,9 +377,9 @@ private void runSearches()
362377
else
363378
logger.log(Level.FINE, "Dropping channel from search: " + sc.channel);
364379
}
380+
bucket.clear();
365381
}
366382

367-
368383
// Search batch..
369384
// Size of a search request is close to 50 bytes
370385
// plus { int cid, string name } for each channel.
@@ -439,8 +454,8 @@ private void search(final Collection<SearchRequest.Channel> channels)
439454
// This is configured in EPICS_PVA_NAME_SERVERS via prefix pvas://
440455
final ClientTCPHandler tcp = tcp_provider.apply(name_server.getAddress(), name_server.isTLS());
441456

442-
// In case of connection errors (TCP connection blocked by firewall),
443-
// tcp will be null
457+
// In older implementation, tcp was null in case of connection errors (TCP connection blocked by firewall).
458+
// No longer expected to happen but check anyway
444459
if (tcp != null)
445460
{
446461
final RequestEncoder search_request = (version, buffer) ->
@@ -526,8 +541,10 @@ private void sendSearch(final int seq, final Collection<SearchRequest.Channel> c
526541
/** Stop searching channels */
527542
public void close()
528543
{
529-
searched_channels.clear();
530-
544+
synchronized (this)
545+
{
546+
searched_channels.clear();
547+
}
531548
timer.shutdown();
532549
}
533550
}

core/pva/src/main/java/org/epics/pva/client/ClientTCPHandler.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ protected boolean initializeSocket()
140140
socket.setKeepAlive(true);
141141
}
142142
catch (Exception ex)
143-
{
143+
{
144144
logger.log(Level.WARNING, "PVA client cannot connect to " + server_address, ex);
145145
return false;
146146
}
@@ -157,6 +157,13 @@ protected boolean initializeSocket()
157157
return true;
158158
}
159159

160+
@Override
161+
public InetSocketAddress getRemoteAddress()
162+
{
163+
// socket may not be connected or null, return address to which we want to connect
164+
return new InetSocketAddress(server_address.getAddress(), server_address.getPort());
165+
}
166+
160167
/** @return Client context */
161168
PVAClient getClient()
162169
{

core/pva/src/main/java/org/epics/pva/common/TCPHandler.java

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ public TCPHandler(final boolean client_mode)
144144
*/
145145
abstract protected boolean initializeSocket();
146146

147+
/** @return Remote address of the TCP socket */
148+
abstract public InetSocketAddress getRemoteAddress();
149+
147150
/** Start receiving data
148151
* To be called by Client/ServerTCPHandler when fully constructed
149152
*/
@@ -167,12 +170,6 @@ protected void startSender() throws Exception
167170
throw new Exception("Send thread already running");
168171
}
169172

170-
/** @return Remote address of this end of the TCP socket */
171-
public InetSocketAddress getRemoteAddress()
172-
{
173-
return new InetSocketAddress(socket.getInetAddress(), socket.getPort());
174-
}
175-
176173
/** @return Is the send queue idle/empty? */
177174
protected boolean isSendQueueIdle()
178175
{
@@ -269,20 +266,14 @@ protected void send(final ByteBuffer buffer) throws Exception
269266
/** Receiver */
270267
private Void receiver()
271268
{
272-
// Establish connection
273-
Thread.currentThread().setName("TCP receiver");
274-
while (! initializeSocket())
275-
try
276-
{ // Delay for (another) connection timeout, at least 1 sec
277-
Thread.sleep(Math.max(1, PVASettings.EPICS_PVA_TCP_SOCKET_TMO) * 1000);
278-
}
279-
catch (Exception ignore)
280-
{
281-
// NOP
282-
}
283-
// Listen on the connection
284269
try
285270
{
271+
// Establish connection
272+
Thread.currentThread().setName("TCP receiver");
273+
if (! initializeSocket())
274+
return null;
275+
276+
// Listen on the connection
286277
Thread.currentThread().setName("TCP receiver " + socket.getLocalSocketAddress());
287278
logger.log(Level.FINER, () -> Thread.currentThread().getName() + " started for " + socket.getRemoteSocketAddress());
288279
logger.log(Level.FINER, "Native byte order " + receive_buffer.order());
@@ -348,8 +339,8 @@ private Void receiver()
348339
}
349340
finally
350341
{
351-
onReceiverExited(running);
352342
logger.log(Level.FINER, Thread.currentThread().getName() + " done.");
343+
onReceiverExited(running);
353344
}
354345
return null;
355346
}

core/pva/src/main/java/org/epics/pva/server/ServerTCPHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,12 @@ protected boolean initializeSocket()
122122
return true;
123123
}
124124

125+
@Override
126+
public InetSocketAddress getRemoteAddress()
127+
{
128+
return new InetSocketAddress(socket.getInetAddress(), socket.getPort());
129+
}
130+
125131
PVAServer getServer()
126132
{
127133
return server;

0 commit comments

Comments
 (0)