11/*******************************************************************************
2- * Copyright (c) 2019-2023 Oak Ridge National Laboratory.
2+ * Copyright (c) 2019-2025 Oak Ridge National Laboratory.
33 * All rights reserved. This program and the accompanying materials
44 * are made available under the terms of the Eclipse Public License v1.0
55 * which accompanies this distribution, and is available at
1313import java .nio .ByteBuffer ;
1414import java .util .ArrayList ;
1515import java .util .Collection ;
16- import java .util .LinkedList ;
16+ import java .util .HashMap ;
17+ import java .util .HashSet ;
1718import java .util .List ;
1819import java .util .Random ;
19- import java .util .concurrent . ConcurrentHashMap ;
20+ import java .util .Set ;
2021import java .util .concurrent .Executors ;
2122import java .util .concurrent .ScheduledExecutorService ;
2223import java .util .concurrent .TimeUnit ;
@@ -99,27 +100,47 @@ private class SearchedChannel
99100 // Otherwise run risk of getting reply without being able
100101 // to handle it
101102 }
103+
104+ // Hash by channel name
105+ @ Override
106+ public int hashCode ()
107+ {
108+ return channel .getName ().hashCode ();
109+ }
110+
111+ // Compare by channel name
112+ @ Override
113+ public boolean equals (Object obj )
114+ {
115+ if (obj instanceof SearchedChannel other )
116+ return other .channel .getName ().equals (channel .getName ());
117+ return false ;
118+ }
102119 }
103120
104121 // SearchedChannels are tracked in two data structures
105122 //
106- // - searched_channels (concurrent)
123+ // - searched_channels
107124 // Fast lookup of channel by ID,
108- // efficient `computeIfAbsent(cid, ..` mechanism for creating
109- // at most one SearchedChannel per CID.
125+ // creating at most one SearchedChannel per CID.
110126 // Allows checking if a channel is indeed searched,
111127 // and locating the channel for a search reply.
112128 //
113- // - search_buckets (need to SYNC)
129+ // - search_buckets
114130 // Efficiently schedule the search messages for all channels
115131 // up to MAX_SEARCH_PERIOD.
132+ //
133+ // Access to either one needs to be synchronized
116134
117- /** Map of searched channels by channel ID */
118- private ConcurrentHashMap <Integer , SearchedChannel > searched_channels = new ConcurrentHashMap <>();
135+ /** Map of searched channels by channel ID
136+ *
137+ * Access only from synchronized method
138+ */
139+ private HashMap <Integer , SearchedChannel > searched_channels = new HashMap <>();
119140
120141 /** Search buckets
121142 *
122- * <p>The {@link #current_search_bucket} selects the list
143+ * <p>The {@link #current_search_bucket} selects the set
123144 * of channels to be searched by {@link #runSearches()},
124145 * which runs roughly once per second, each time moving to
125146 * the next search bucket in a ring buffer fashion.
@@ -136,13 +157,13 @@ private class SearchedChannel
136157 * which would result in an endless loop.
137158 *
138159 * <p>Access to either {@link #search_buckets} or {@link #current_search_bucket}
139- * must SYNC on {@link #search_buckets} .
160+ * must only occur in a 'synchronized' method .
140161 */
141- private final ArrayList <LinkedList <SearchedChannel >> search_buckets = new ArrayList <>();
162+ private final ArrayList <Set <SearchedChannel >> search_buckets = new ArrayList <>(MAX_SEARCH_PERIOD + 2 );
142163
143164 /** Index of current search bucket, i.e. the one about to be searched.
144165 *
145- * <p>Access must SYNC on {@link #search_buckets} .
166+ * <p>Access must only occur in a 'synchronized' method .
146167 */
147168 private final AtomicInteger current_search_bucket = new AtomicInteger ();
148169
@@ -185,11 +206,9 @@ public ChannelSearch(final ClientUDPHandler udp,
185206 this .udp = udp ;
186207 this .tcp_provider = tcp_provider ;
187208
188- synchronized (search_buckets )
189- {
190- for (int i =0 ; i <MAX_SEARCH_PERIOD +2 ; ++i )
191- search_buckets .add (new LinkedList <>());
192- }
209+ // Each bucket holds set of channels to search in that time slot
210+ for (int i =0 ; i <MAX_SEARCH_PERIOD +2 ; ++i )
211+ search_buckets .add (new HashSet <>());
193212
194213 // Searches sent to multicast (IPv4, IPv6) or broadcast addresses (IPv4) reach every PVA server
195214 // on that multicast group or bcast subnet.
@@ -227,7 +246,7 @@ else if (addr.isBroadcast())
227246
228247 public void start ()
229248 {
230- // +-jitter to prevent multiple clients from sending concurrent search requests
249+ // 1 second +-jitter to prevent multiple clients from sending concurrent search requests
231250 final long period = SEARCH_PERIOD_MS + (new Random ().nextInt (2 *SEARCH_JITTER_MS +1 ) - SEARCH_JITTER_MS );
232251
233252 logger .log (Level .FINER ,
@@ -246,19 +265,20 @@ public void register(final PVAChannel channel, final boolean now)
246265 {
247266 logger .log (Level .FINE , () -> "Register search for " + channel + (now ? " now" : " soon" ));
248267
249- final ClientChannelState old = channel .setState (ClientChannelState .SEARCHING );
250- if (old == ClientChannelState .SEARCHING )
251- logger .log (Level .WARNING , "Registering channel " + channel + " to be searched more than once " );
268+ synchronized (this )
269+ {
270+ final ClientChannelState old = channel .setState (ClientChannelState .SEARCHING );
271+ if (old == ClientChannelState .SEARCHING )
272+ logger .log (Level .WARNING , "Registering channel " + channel + " to be searched more than once " );
252273
253- final SearchedChannel sc = searched_channels .computeIfAbsent (channel .getCID (), id -> new SearchedChannel (channel ));
274+ final SearchedChannel sc = searched_channels .computeIfAbsent (channel .getCID (), id -> new SearchedChannel (channel ));
254275
255- synchronized (search_buckets )
256- {
257276 int bucket = current_search_bucket .get ();
258277 if (!now )
259- bucket = (bucket + SEARCH_SOON_DELAY ) % search_buckets .size ();
260- search_buckets .get (bucket ).add (sc );
278+ bucket = (bucket + SEARCH_SOON_DELAY ) % search_buckets .size ();
279+ search_buckets .get (bucket ).add (sc );
261280 }
281+
262282 // Jumpstart search instead of waiting up to ~1 second for current bucket to be handled
263283 if (now )
264284 timer .execute (this ::runSearches );
@@ -268,17 +288,15 @@ public void register(final PVAChannel channel, final boolean now)
268288 * @param channel_id
269289 * @return {@link PVAChannel}, <code>null</code> when channel wasn't searched any more
270290 */
271- public PVAChannel unregister (final int channel_id )
291+ public synchronized PVAChannel unregister (final int channel_id )
272292 {
273293 final SearchedChannel searched = searched_channels .remove (channel_id );
274294 if (searched != null )
275295 {
276296 logger .log (Level .FINE , () -> "Unregister search for " + searched .channel .getName () + " " + channel_id );
277- // NOT removing `searched` from all `search_buckets`.
278- // Removal would be a slow, linear operation.
279- // `runSearches()` will drop the channel from `search_buckets`
280- // because it's no longer listed in `searched_channels`
281-
297+ // Remove `searched` from all `search_buckets`.
298+ for (Set <SearchedChannel > bucket : search_buckets )
299+ bucket .remove (searched );
282300 return searched .channel ;
283301 }
284302 return null ;
@@ -288,7 +306,7 @@ public PVAChannel unregister(final int channel_id)
288306 *
289307 * <p>Resets their search counter so they're searched "real soon".
290308 */
291- public void boost ()
309+ public synchronized void boost ()
292310 {
293311 for (SearchedChannel searched : searched_channels .values ())
294312 {
@@ -299,12 +317,9 @@ public void boost()
299317 if (period == MIN_SEARCH_PERIOD )
300318 {
301319 logger .log (Level .FINE , () -> "Restart search for '" + searched .channel .getName () + "'" );
302- synchronized (search_buckets )
303- {
304- final LinkedList <SearchedChannel > bucket = search_buckets .get (current_search_bucket .get ());
305- if (! bucket .contains (searched ))
306- bucket .add (searched );
307- }
320+
321+ final Set <SearchedChannel > bucket = search_buckets .get (current_search_bucket .get ());
322+ bucket .add (searched );
308323 }
309324 // Not sending search right now:
310325 // search(channel);
@@ -322,17 +337,17 @@ public void boost()
322337 @ SuppressWarnings ("unchecked" )
323338 private void runSearches ()
324339 {
340+ // Determine current search bucket
341+ final int current = current_search_bucket .getAndUpdate (i -> (i + 1 ) % search_buckets .size ());
342+ // Collect channels to be searched while sync'ed
325343 to_search .clear ();
326- synchronized (search_buckets )
344+ synchronized (this )
327345 {
328- // Determine current search bucket
329- final int current = current_search_bucket .getAndUpdate (i -> (i + 1 ) % search_buckets .size ());
330- final LinkedList <SearchedChannel > bucket = search_buckets .get (current );
346+ final Set <SearchedChannel > bucket = search_buckets .get (current );
331347 logger .log (Level .FINEST , () -> "Search bucket " + current );
332348
333349 // Remove searched channels from the current bucket
334- SearchedChannel sc ;
335- while ((sc = bucket .poll ()) != null )
350+ for (SearchedChannel sc : bucket )
336351 {
337352 if (sc .channel .getState () == ClientChannelState .SEARCHING &&
338353 searched_channels .containsKey (sc .channel .getCID ()))
@@ -349,8 +364,8 @@ private void runSearches()
349364 // in case that search bucket is quite full
350365 final int i_n = (current + period ) % search_buckets .size ();
351366 final int i_n_n = (i_n + 1 ) % search_buckets .size ();
352- final LinkedList <SearchedChannel > next = search_buckets .get (i_n );
353- final LinkedList <SearchedChannel > next_next = search_buckets .get (i_n_n );
367+ final Set <SearchedChannel > next = search_buckets .get (i_n );
368+ final Set <SearchedChannel > next_next = search_buckets .get (i_n_n );
354369 if (i_n == current || i_n_n == current )
355370 throw new IllegalStateException ("Current, next and nextnext search indices for " + sc .channel + " are " +
356371 current + ", " + i_n + ", " + i_n_n );
@@ -362,9 +377,9 @@ private void runSearches()
362377 else
363378 logger .log (Level .FINE , "Dropping channel from search: " + sc .channel );
364379 }
380+ bucket .clear ();
365381 }
366382
367-
368383 // Search batch..
369384 // Size of a search request is close to 50 bytes
370385 // plus { int cid, string name } for each channel.
@@ -441,6 +456,7 @@ private void search(final Collection<SearchRequest.Channel> channels)
441456
442457 // In case of connection errors (TCP connection blocked by firewall),
443458 // tcp will be null
459+ // TODO CHECK THAT with updated ClientTCPHandler that connects on receive thread
444460 if (tcp != null )
445461 {
446462 final RequestEncoder search_request = (version , buffer ) ->
@@ -526,8 +542,10 @@ private void sendSearch(final int seq, final Collection<SearchRequest.Channel> c
526542 /** Stop searching channels */
527543 public void close ()
528544 {
529- searched_channels .clear ();
530-
545+ synchronized (this )
546+ {
547+ searched_channels .clear ();
548+ }
531549 timer .shutdown ();
532550 }
533551}
0 commit comments