Skip to content

Commit c8bfbc5

Browse files
committed
PVA client: Search periods as used by PVXS
Search periods of 1, 2, 3, ..., 30 take about 7 minutes to plateau, which allows ignoring the initial ~3 min beacons from long running servers to avoid erroneous search boosts.
1 parent cb5b343 commit c8bfbc5

File tree

1 file changed

+57
-52
lines changed

1 file changed

+57
-52
lines changed

core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java

Lines changed: 57 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,18 @@
3232
*
3333
* <p>Maintains thread that periodically issues search requests
3434
* for registered channels.
35-
*
36-
* <p>Details of search timing are based on
37-
* https://github.com/epics-base/epicsCoreJava/blob/master/pvAccessJava/src/org/epics/pvaccess/client/impl/remote/search/SimpleChannelSearchManagerImpl.java
35+
* Search periods match the design of https://github.com/mdavidsaver/pvxs,
36+
* repeating searches for missing channels after 1, 2, 3, ..., 30 seconds
37+
* and then continuing every 30 seconds.
38+
* The exact period is not a multiple of 1000ms but 1000+-25ms to randomly
39+
* distribute searches from different clients.
40+
* Once the search plateaus at 30 seconds, which takes about 7.6 to 7.9 minutes,
41+
* the search can be "boosted" back to 1, 2, 3, ... seconds.
42+
* Since long running servers issue beacons every ~3 minutes,
43+
* every existing PVA server on the network will appear "new" when a client
44+
* receives its first beacon within ~3 minutes after startup.
45+
* Such beacons are ignored for the ~7 minutes where the search period settles
46+
* to avoid unnecessary network traffic.
3847
*
3948
* <p>Can send search requests to unicast (IPv4 and IPv6), multicast (4 & 6), broadcast (IPv4 only).
4049
* Since only StandardProtocolFamily.INET sockets support IPv4 multicast,
@@ -55,19 +64,39 @@
5564
@SuppressWarnings("nls")
5665
class ChannelSearch
5766
{
67+
/** Basic search period is one second */
68+
private static final int SEARCH_PERIOD_MS = 1000;
69+
70+
/** Search period jitter to avoid multiple clients all searching at the same period */
71+
private static final int SEARCH_JITTER_MS = 25;
72+
73+
/** Minimum search for a channel is ASAP, then incrementing by 1 */
74+
private static final int MIN_SEARCH_PERIOD = 0;
75+
76+
/** Maximum and eternal search period is every 30 sec */
77+
private static final int MAX_SEARCH_PERIOD = 30;
78+
5879
/** Channel that's being searched */
5980
private class SearchedChannel
6081
{
61-
final AtomicInteger search_counter;
82+
/** Search period in seconds.
83+
* Steps up from 0 to MAX_SEARCH_PERIOD and then stays at MAX_SEARCH_PERIOD
84+
*/
85+
final AtomicInteger search_period = new AtomicInteger(1);
86+
87+
/** Seconds spent in the current state.
88+
* Incremented for every run of the search thread.
89+
* If it reaches the current search_period,
90+
* a search is performed and search_period updated
91+
* to the next one.
92+
*/
93+
final AtomicInteger seconds_in_state = new AtomicInteger(0);
94+
6295
final AtomicInteger tcp_search_counter = new AtomicInteger(0);
6396
final PVAChannel channel;
6497

6598
SearchedChannel(final PVAChannel channel)
6699
{
67-
// Counter of 0 means the next regular search will increment
68-
// to 1 (no search), then 2 (power of two -> search).
69-
// So it'll "soon" perform a regular search.
70-
this.search_counter = new AtomicInteger(0);
71100
this.channel = channel;
72101
// Not starting an _immediate_ search in here because
73102
// this needs to be added to searched_channels first.
@@ -83,38 +112,6 @@ private class SearchedChannel
83112

84113
private final Function<InetSocketAddress, ClientTCPHandler> tcp_provider;
85114

86-
/** Basic search period */
87-
private static final int SEARCH_PERIOD_MS = 225;
88-
89-
/** Search period jitter to avoid multiple clients all searching at the same period */
90-
private static final int SEARCH_JITTER_MS = 25;
91-
92-
/** Exponential search intervals
93-
*
94-
* <p>Search counter for a channel is incremented each SEARCH_PERIOD_MS.
95-
* When counter is a power of 2, search request is sent.
96-
* Counter starts at 1, and first search period increments to 2:
97-
* 0 ms increments to 2 -> Search!
98-
* 225 ms increments to 3 -> No search
99-
* 450 ms increments to 4 -> Search (~0.5 sec after last)
100-
* 675 ms increments to 5 -> No search
101-
* 900 ms increments to 6 -> No search
102-
* 1125 ms increments to 7 -> No search
103-
* 1350 ms increments to 8 -> Search (~ 1 sec after last)
104-
* ...
105-
*
106-
* <p>So the time between searches is roughly 0.5 seconds,
107-
* 1 second, 2, 4, 8, 15, 30 seconds.
108-
*
109-
* <p>Once the search count reaches 256, it's reset to 129.
110-
* This means it then takes 128 periods to again reach 256
111-
* for the next search, so searches end up being issued
112-
* roughly every 128*0.225 = 30 seconds.
113-
*/
114-
private static final int BOOST_SEARCH_COUNT = 1,
115-
MAX_SEARCH_COUNT = 256,
116-
MAX_SEARCH_RESET = 129;
117-
118115
/** Map of searched channels by channel ID */
119116
private ConcurrentHashMap<Integer, SearchedChannel> searched_channels = new ConcurrentHashMap<>();
120117

@@ -223,10 +220,14 @@ public void boost()
223220
for (SearchedChannel searched : searched_channels.values())
224221
{
225222
// If search for channel has settled to the long period, restart
226-
final int count = searched.search_counter.updateAndGet(val -> val >= MAX_SEARCH_RESET ? BOOST_SEARCH_COUNT : val);
227-
if (count == BOOST_SEARCH_COUNT)
223+
final int period = searched.search_period.updateAndGet(val -> val >= MAX_SEARCH_PERIOD
224+
? MIN_SEARCH_PERIOD
225+
: val);
226+
if (period == MIN_SEARCH_PERIOD)
227+
{
228+
searched.seconds_in_state.set(0);
228229
logger.log(Level.FINE, () -> "Restart search for '" + searched.channel.getName() + "'");
229-
230+
}
230231
// Not sending search right now:
231232
// search(channel);
232233
// Instead, scheduling it to be searched again real soon for a few times.
@@ -236,21 +237,25 @@ public void boost()
236237
}
237238
}
238239

239-
private static boolean isPowerOfTwo(final int x)
240-
{
241-
return x > 0 && (x & (x - 1)) == 0;
242-
}
243-
244240
/** Invoked by timer: Check searched channels for the next one to handle */
245241
private void runSearches()
246242
{
243+
// TODO Collect searched channels, then issue one search message for all of them
244+
// (several for UDP as we reach max packet size)
247245
for (SearchedChannel searched : searched_channels.values())
248246
{
249-
final int counter = searched.search_counter.updateAndGet(val -> val >= MAX_SEARCH_COUNT ? MAX_SEARCH_RESET : val+1);
250-
if (isPowerOfTwo(counter))
247+
// Stayed long enough in current search period?
248+
final int secs = searched.seconds_in_state.incrementAndGet();
249+
if (secs >= searched.search_period.get())
251250
{
252-
logger.log(Level.FINER, () -> "Searching... " + searched.channel);
251+
logger.log(Level.FINE, () -> "Searching... " + searched.channel);
253252
search(searched.channel);
253+
254+
// Move to next search period, plateau at MAX_SEARCH_PERIOD
255+
searched.seconds_in_state.set(0);
256+
searched.search_period.updateAndGet(p -> p < MAX_SEARCH_PERIOD
257+
? p + 1
258+
: MAX_SEARCH_PERIOD);
254259
}
255260
}
256261
}
@@ -318,7 +323,7 @@ private void search(final PVAChannel channel)
318323
synchronized (send_buffer)
319324
{
320325
final int seq = search_sequence.incrementAndGet();
321-
logger.log(Level.FINE, "Search Request #" + seq + " for " + channel);
326+
logger.log(Level.FINE, "UDP Search Request #" + seq + " for " + channel);
322327
sendSearch(seq, channel.getCID(), channel.getName());
323328
}
324329
}

0 commit comments

Comments
 (0)