1111
1212import java .net .InetSocketAddress ;
1313import java .nio .ByteBuffer ;
14+ import java .time .Instant ;
1415import java .util .ArrayList ;
16+ import java .util .LinkedList ;
1517import java .util .List ;
1618import java .util .Random ;
1719import java .util .concurrent .ConcurrentHashMap ;
2729import org .epics .pva .common .RequestEncoder ;
2830import org .epics .pva .common .SearchRequest ;
2931import org .epics .pva .data .Hexdump ;
32+ import org .epics .pva .data .PVAString ;
3033
3134/** Handler for search requests
3235 *
@@ -83,16 +86,6 @@ private class SearchedChannel
8386 * Steps up from 0 to MAX_SEARCH_PERIOD and then stays at MAX_SEARCH_PERIOD
8487 */
8588 final AtomicInteger search_period = new AtomicInteger (1 );
86-
87- /** Seconds spent in the current state.
88- * Incremented for every run of the search thread.
89- * If it reaches the current search_period,
90- * a search is performed and search_period updated
91- * to the next one.
92- */
93- final AtomicInteger seconds_in_state = new AtomicInteger (0 );
94-
95- final AtomicInteger tcp_search_counter = new AtomicInteger (0 );
9689 final PVAChannel channel ;
9790
9891 SearchedChannel (final PVAChannel channel )
@@ -105,15 +98,38 @@ private class SearchedChannel
10598 }
10699 }
107100
108- /** Search request sequence number */
109- private static final AtomicInteger search_sequence = new AtomicInteger ();
110-
111- private final ClientUDPHandler udp ;
101+ /** Map of searched channels by channel ID */
102+ // TODO Remove, just use search_buckets?
103+ private ConcurrentHashMap <Integer , SearchedChannel > searched_channels = new ConcurrentHashMap <>();
112104
113- private final Function <InetSocketAddress , ClientTCPHandler > tcp_provider ;
105+ /** Search buckets
106+ *
107+ * <p>The {@link #current_search_bucket} selects the list
108+ * of channels to be searched by {@link #runSeaches()},
109+ * which runs roughly once per second, each time moving to
110+ * the next search bucket in a ring buffer fashion.
111+ *
112+ * <p>Each searched channel is removed from the current bucket.
113+ * To be searched again, it is inserted into the appropriate
114+ * upcoming bucket, allowing for a maximum search
115+ * period of <code>MAX_SEARCH_PERIOD == search_buckets.size() - 2</code>.
116+ * The search bucket size is <code>MAX_SEARCH_PERIOD + 2</code>
117+ * so that a channel in bucket N can be moved to either
118+ * <code>N + MAX_SEARCH_PERIOD</code> or
119+ * <code>N + MAX_SEARCH_PERIOD + 1</code> to distribute searches
120+ * without putting the channel immediately back into bucket N
121+ * which would result in an endless loop.
122+ *
123+ * <p>Access to either {@link #search_buckets} or {@link #current_search_bucket}
124+ * must SYNC on {@link #search_buckets}.
125+ */
126+ private final ArrayList <LinkedList <SearchedChannel >> search_buckets = new ArrayList <>();
114127
115- /** Map of searched channels by channel ID */
116- private ConcurrentHashMap <Integer , SearchedChannel > searched_channels = new ConcurrentHashMap <>();
128+ /** Index of current search bucket, i.e. the one about to be searched.
129+ *
130+ * <p>Access must SYNC on {@link #search_buckets}.
131+ */
132+ private final AtomicInteger current_search_bucket = new AtomicInteger ();
117133
118134 /** Timer used to periodically check channels and issue search requests */
119135 private final ScheduledExecutorService timer = Executors .newSingleThreadScheduledExecutor (run ->
@@ -123,6 +139,10 @@ private class SearchedChannel
123139 return thread ;
124140 });
125141
142+ private final ClientUDPHandler udp ;
143+
144+ private final Function <InetSocketAddress , ClientTCPHandler > tcp_provider ;
145+
126146 /** Buffer for assembling search messages */
127147 private final ByteBuffer send_buffer = ByteBuffer .allocate (PVASettings .MAX_UDP_UNFRAGMENTED_SEND );
128148
@@ -139,6 +159,12 @@ public ChannelSearch(final ClientUDPHandler udp,
139159 this .udp = udp ;
140160 this .tcp_provider = tcp_provider ;
141161
162+ synchronized (search_buckets )
163+ {
164+ for (int i =0 ; i <MAX_SEARCH_PERIOD +2 ; ++i )
165+ search_buckets .add (new LinkedList <>());
166+ }
167+
142168 // Searches sent to multicast (IPv4, IPv6) or broadcast addresses (IPv4) reach every PVA server
143169 // on that multicast group or bcast subnet.
144170 // Searches sent to unicast addresses reach only the PVA server started _last_ on each host.
@@ -189,11 +215,17 @@ public void start()
189215 public void register (final PVAChannel channel , final boolean now )
190216 {
191217 logger .log (Level .FINE , () -> "Register search for " + channel .getName () + " " + channel .getCID ());
192- channel .setState (ClientChannelState .SEARCHING );
193- searched_channels .computeIfAbsent (channel .getCID (), id -> new SearchedChannel (channel ));
194- // Issue immediate search request?
195- if (now )
196- search (channel );
218+
219+ final ClientChannelState old = channel .setState (ClientChannelState .SEARCHING );
220+ if (old == ClientChannelState .SEARCHING )
221+ logger .log (Level .WARNING , "Registering channel " + channel + " to be searched more than once " );
222+
223+ final SearchedChannel sc = searched_channels .computeIfAbsent (channel .getCID (), id -> new SearchedChannel (channel ));
224+
225+ synchronized (search_buckets )
226+ {
227+ search_buckets .get (current_search_bucket .get ()).add (sc );
228+ }
197229 }
198230
199231 /** Stop searching for channel
@@ -206,6 +238,13 @@ public PVAChannel unregister(final int channel_id)
206238 if (searched != null )
207239 {
208240 logger .log (Level .FINE , () -> "Unregister search for " + searched .channel .getName () + " " + channel_id );
241+
242+ synchronized (search_buckets )
243+ {
244+ for (LinkedList <SearchedChannel > bucket : search_buckets )
245+ bucket .remove (searched );
246+ }
247+
209248 return searched .channel ;
210249 }
211250 return null ;
@@ -225,8 +264,13 @@ public void boost()
225264 : val );
226265 if (period == MIN_SEARCH_PERIOD )
227266 {
228- searched .seconds_in_state .set (0 );
229267 logger .log (Level .FINE , () -> "Restart search for '" + searched .channel .getName () + "'" );
268+ synchronized (search_buckets )
269+ {
270+ final LinkedList <SearchedChannel > bucket = search_buckets .get (current_search_bucket .get ());
271+ if (! bucket .contains (searched ))
272+ bucket .add (searched );
273+ }
230274 }
231275 // Not sending search right now:
232276 // search(channel);
@@ -237,27 +281,93 @@ public void boost()
237281 }
238282 }
239283
284+ /** List of channels to search, re-used within runSearches */
285+ private final ArrayList <PVAChannel > to_search = new ArrayList <>();
286+
240287 /** Invoked by timer: Check searched channels for the next one to handle */
241288 private void runSearches ()
242289 {
243- // TODO Collect searched channels, then issue one search message for all of them
244- // (several for UDP as we reach max packet size)
245- for (SearchedChannel searched : searched_channels .values ())
290+ to_search .clear ();
291+ synchronized (search_buckets )
246292 {
247- // Stayed long enough in current search period?
248- final int secs = searched .seconds_in_state .incrementAndGet ();
249- if (secs >= searched .search_period .get ())
293+ // Determine current search bucket
294+ final int current = current_search_bucket .getAndUpdate (i -> (i + 1 ) % search_buckets .size ());
295+ final LinkedList <SearchedChannel > bucket = search_buckets .get (current );
296+
297+ // Remove searched channels from the current bucket
298+ SearchedChannel sc ;
299+ while ((sc = bucket .poll ()) != null )
250300 {
251- logger .log (Level .FINE , () -> "Searching... " + searched .channel );
252- search (searched .channel );
253-
254- // Move to next search period, plateau at MAX_SEARCH_PERIOD
255- searched .seconds_in_state .set (0 );
256- searched .search_period .updateAndGet (p -> p < MAX_SEARCH_PERIOD
257- ? p + 1
258- : MAX_SEARCH_PERIOD );
301+ if (sc .channel .getState () == ClientChannelState .SEARCHING &&
302+ searched_channels .containsKey (sc .channel .getCID ()))
303+ {
304+ // Collect channels in 'to_search' for handling outside of sync. section
305+ to_search .add (sc .channel );
306+
307+ // Determine next search period
308+ final int period = sc .search_period .updateAndGet (sec -> sec < MAX_SEARCH_PERIOD
309+ ? sec + 1
310+ : MAX_SEARCH_PERIOD );
311+
312+ // Add to corresponding search bucket, or delay by one second
313+ // in case that search bucket is quite full
314+ final int i_n = (current + period ) % search_buckets .size ();
315+ final int i_n_n = (i_n + 1 ) % search_buckets .size ();
316+ final LinkedList <SearchedChannel > next = search_buckets .get (i_n );
317+ final LinkedList <SearchedChannel > next_next = search_buckets .get (i_n_n );
318+ if (i_n == current || i_n_n == current )
319+ throw new IllegalStateException ("Current, next and nextnext search indices for " + sc .channel + " are " +
320+ current + ", " + i_n + ", " + i_n_n );
321+ if (next_next .size () < next .size ())
322+ next_next .add (sc );
323+ else
324+ next .add (sc );
325+ }
326+ else
327+ logger .log (Level .FINE , "Dropping channel from search: " + sc .channel );
259328 }
260329 }
330+
331+
332+ // Search batch..
333+ // Size of a search request is close to 50 bytes
334+ // plus { int cid, string name } for each channel.
335+ // Channel count is unsigned short, but we limit
336+ // is to a signed short.
337+ // Similar to PVXS, further limit payload to 1400 bytes
338+ // to stay well below the ~1500 byte ethernet frame
339+ int start = 0 ;
340+ while (start < to_search .size ())
341+ {
342+ int payload = 0 ;
343+ int count = 0 ;
344+ while (start + count < to_search .size () && count < Short .MAX_VALUE -1 )
345+ {
346+ final PVAChannel channel = to_search .get (start + count );
347+ int size = 4 + PVAString .getEncodedSize (channel .getName ());
348+ if (payload + size < 50 ) // TODO 1400
349+ {
350+ ++count ;
351+ payload += size ;
352+ }
353+ else if (count == 0 )
354+ {
355+ logger .log (Level .WARNING , "PV name exceeds search buffer size: " + channel );
356+ searched_channels .remove (channel .getCID ());
357+ to_search .remove (start + count );
358+ }
359+ else
360+ break ;
361+ }
362+ if (count == 0 )
363+ break ;
364+
365+ final List <PVAChannel > batch = to_search .subList (start , start + count );
366+ System .out .println (Instant .now () + " Search bucket " + current_search_bucket .get () + " " + batch );
367+ for (PVAChannel channel : batch )
368+ search (channel );
369+ start += count ;
370+ }
261371 }
262372
263373 /** Issue a PVA server list request */
@@ -284,10 +394,6 @@ private void search(final PVAChannel channel)
284394 final SearchedChannel searched = searched_channels .get (channel .getCID ());
285395 if (searched == null )
286396 continue ;
287- // Compared to UDP, search only every other cycle
288- int count = searched .tcp_search_counter .incrementAndGet ();
289- if (count % 2 != 0 )
290- continue ;
291397
292398 final ClientTCPHandler tcp = tcp_provider .apply (name_server .getAddress ());
293399
@@ -297,7 +403,7 @@ private void search(final PVAChannel channel)
297403 {
298404 final RequestEncoder search_request = (version , buffer ) ->
299405 {
300- logger .log (Level .FINE , () -> "Searching for " + channel + " via TCP " + tcp .getRemoteAddress () + ", take " + count / 2 );
406+ logger .log (Level .FINE , () -> "Searching for " + channel + " via TCP " + tcp .getRemoteAddress ());
301407
302408 // Search sequence identifies the potentially repeated UDP.
303409 // TCP search is once only, so PVXS always sends 0x66696E64 = "find".
@@ -322,7 +428,9 @@ private void search(final PVAChannel channel)
322428 // Lock the send buffer to avoid concurrent use.
323429 synchronized (send_buffer )
324430 {
325- final int seq = search_sequence .incrementAndGet ();
431+ // For UDP, use bucket index to get a changing number that helps
432+ // match up duplicate packets and allows debugging bucket usage
433+ final int seq = current_search_bucket .get ();
326434 logger .log (Level .FINE , "UDP Search Request #" + seq + " for " + channel );
327435 sendSearch (seq , channel .getCID (), channel .getName ());
328436 }
0 commit comments