Skip to content

Commit c1d8b05

Browse files
authored
Merge pull request #2284 from ControlSystemStudio/pva_beacons
PVA client search update
2 parents 55ce465 + 6bf8047 commit c1d8b05

File tree

8 files changed

+269
-122
lines changed

8 files changed

+269
-122
lines changed

core/pva/src/main/java/org/epics/pva/client/BeaconTracker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ private void removeOldBeaconInfo(final Instant now)
162162
{
163163
final BeaconInfo info = infos.next().getValue();
164164
final long age = Duration.between(info.last, now).getSeconds();
165-
if (age > 180) // TODO beacon_cleanup_period
165+
if (age > PVASettings.EPICS_PVA_MAX_BEACON_AGE)
166166
{
167167
logger.log(Level.FINER,
168168
() -> "Removing beacon info " + info.guid + " (" + info.address + "), last seen " + age + " seconds ago");

core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java

Lines changed: 169 additions & 59 deletions
Large diffs are not rendered by default.

core/pva/src/main/java/org/epics/pva/client/ClientUDPHandler.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -278,26 +278,25 @@ private boolean handleSearchRequest(final InetSocketAddress from, final byte ver
278278
{
279279
if (local_multicast != null && search != null && search.unicast)
280280
{
281-
if (search.name == null)
281+
if (search.channels == null)
282282
{
283283
if (search.reply_required)
284284
{
285285
forward_buffer.clear();
286-
SearchRequest.encode(false, 0, -1, null, search.client, forward_buffer);
286+
SearchRequest.encode(false, 0, null, search.client, forward_buffer);
287287
forward_buffer.flip();
288288
logger.log(Level.FINER, () -> "Forward search to list servers to " + local_multicast + "\n" + Hexdump.toHexdump(forward_buffer));
289289
send(forward_buffer, local_multicast);
290290
}
291291
}
292292
else
293-
for (int i=0; i<search.name.length; ++i)
294-
{
295-
forward_buffer.clear();
296-
SearchRequest.encode(false, search.seq, search.cid[i], search.name[i], search.client, forward_buffer);
297-
forward_buffer.flip();
298-
logger.log(Level.FINER, () -> "Forward search to " + local_multicast + "\n" + Hexdump.toHexdump(forward_buffer));
299-
send(forward_buffer, local_multicast);
300-
}
293+
{
294+
forward_buffer.clear();
295+
SearchRequest.encode(false, search.seq, search.channels, search.client, forward_buffer);
296+
forward_buffer.flip();
297+
logger.log(Level.FINER, () -> "Forward search to " + local_multicast + "\n" + Hexdump.toHexdump(forward_buffer));
298+
send(forward_buffer, local_multicast);
299+
}
301300
}
302301
}
303302
catch (Exception ex)

core/pva/src/main/java/org/epics/pva/client/PVAChannel.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2019-2021 Oak Ridge National Laboratory.
2+
* Copyright (c) 2019-2022 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.atomic.AtomicReference;
1818
import java.util.logging.Level;
1919

20+
import org.epics.pva.common.SearchRequest;
2021
import org.epics.pva.data.PVADouble;
2122
import org.epics.pva.data.PVAString;
2223
import org.epics.pva.data.PVAStructure;
@@ -41,7 +42,7 @@
4142
* @author Kay Kasemir
4243
*/
4344
@SuppressWarnings("nls")
44-
public class PVAChannel implements AutoCloseable
45+
public class PVAChannel extends SearchRequest.Channel implements AutoCloseable
4546
{
4647
/** Provider for the 'next' client channel ID
4748
*
@@ -53,9 +54,7 @@ public class PVAChannel implements AutoCloseable
5354
private static final AtomicInteger CID_Provider = new AtomicInteger(1);
5455

5556
private final PVAClient client;
56-
private final String name;
5757
private final ClientChannelListener listener;
58-
private final int cid = CID_Provider.incrementAndGet();
5958
private volatile int sid = -1;
6059

6160
/** State
@@ -74,8 +73,8 @@ public class PVAChannel implements AutoCloseable
7473

7574
PVAChannel(final PVAClient client, final String name, final ClientChannelListener listener)
7675
{
76+
super(CID_Provider.incrementAndGet(), name);
7777
this.client = client;
78-
this.name = name;
7978
this.listener = listener;
8079
}
8180

@@ -92,25 +91,12 @@ ClientTCPHandler getTCP() throws Exception
9291
return copy;
9392
}
9493

95-
/** @return Client channel ID */
96-
int getCID()
97-
{
98-
return cid;
99-
}
100-
10194
/** @return Server channel ID */
10295
int getSID()
10396
{
10497
return sid;
10598
}
10699

107-
108-
/** @return Channel name */
109-
public String getName()
110-
{
111-
return name;
112-
}
113-
114100
/** @return {@link ClientChannelState} */
115101
public ClientChannelState getState()
116102
{

core/pva/src/main/java/org/epics/pva/common/SearchRequest.java

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2019-2021 Oak Ridge National Laboratory.
2+
* Copyright (c) 2019-2022 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -12,6 +12,9 @@
1212
import java.net.InetAddress;
1313
import java.net.InetSocketAddress;
1414
import java.nio.ByteBuffer;
15+
import java.util.ArrayList;
16+
import java.util.Collection;
17+
import java.util.List;
1518
import java.util.logging.Level;
1619

1720
import org.epics.pva.data.PVAAddress;
@@ -23,6 +26,43 @@
2326
@SuppressWarnings("nls")
2427
public class SearchRequest
2528
{
29+
/** Channel with CID to be searched */
30+
public static class Channel
31+
{
32+
/** Client ID */
33+
protected final int cid;
34+
35+
/** Channel name */
36+
protected final String name;
37+
38+
/** @param cid Client ID
39+
* @param name Channel name
40+
*/
41+
public Channel(final int cid, final String name)
42+
{
43+
this.cid = cid;
44+
this.name = name;
45+
}
46+
47+
/** @return Client channel ID */
48+
public int getCID()
49+
{
50+
return cid;
51+
}
52+
53+
/** @return Channel name */
54+
public String getName()
55+
{
56+
return name;
57+
}
58+
59+
@Override
60+
public String toString()
61+
{
62+
return "'" + name + "' [CID " + cid + "]";
63+
}
64+
};
65+
2666
/** Sequence number */
2767
public int seq;
2868
/** Is it a unicast? */
@@ -31,10 +71,8 @@ public class SearchRequest
3171
public boolean reply_required;
3272
/** Address of client */
3373
public InetSocketAddress client;
34-
/** Names requested in search */
35-
public String[] name;
36-
/** Client IDs for names */
37-
public int[] cid;
74+
/** Names requested in search, <code>null</code> for 'list' */
75+
public List<Channel> channels;
3876

3977
/** Check search request
4078
*
@@ -105,7 +143,7 @@ public static SearchRequest decode(final InetSocketAddress from, final byte vers
105143

106144
if (count == 0)
107145
{ // pvlist request
108-
search.name = null;
146+
search.channels = null;
109147
logger.log(Level.FINER, () -> "PVA Client " + from + " sent search #" + search.seq + " to list servers");
110148
}
111149
else
@@ -115,15 +153,13 @@ public static SearchRequest decode(final InetSocketAddress from, final byte vers
115153
logger.log(Level.WARNING, "PVA Client " + from + " sent search #" + search.seq + " for protocol '" + protocol + "', need 'tcp'");
116154
return null;
117155
}
118-
search.name = new String[count];
119-
search.cid = new int[count];
156+
search.channels = new ArrayList<>(count);
120157
for (int i=0; i<count; ++i)
121158
{
122159
final int cid = buffer.getInt();
123160
final String name = PVAString.decodeString(buffer);
124161
logger.log(Level.FINER, () -> "PVA Client " + from + " sent search #" + search.seq + " for " + name + " [cid " + cid + "]");
125-
search.name[i] = name;
126-
search.cid[i] = cid;
162+
search.channels.add(new Channel(cid, name));
127163
}
128164
}
129165

@@ -132,12 +168,11 @@ public static SearchRequest decode(final InetSocketAddress from, final byte vers
132168

133169
/** @param unicast Unicast?
134170
* @param seq Sequence number
135-
* @param cid Client ID
136-
* @param name PV name
171+
* @param channels Channels to search, <code>null</code> for 'list'
137172
* @param address client's address
138173
* @param buffer Buffer into which to encode
139174
*/
140-
public static void encode(final boolean unicast, final int seq, final int cid, final String name, final InetSocketAddress address, final ByteBuffer buffer)
175+
public static void encode(final boolean unicast, final int seq, final Collection<Channel> channels, final InetSocketAddress address, final ByteBuffer buffer)
141176
{
142177
// Create with zero payload size, to be patched later
143178
PVAHeader.encodeMessageHeader(buffer, PVAHeader.FLAG_NONE, PVAHeader.CMD_SEARCH, 0);
@@ -152,7 +187,7 @@ public static void encode(final boolean unicast, final int seq, final int cid, f
152187
// Mark search message as unicast so that receiver will forward
153188
// it via local broadcast to other local listeners.
154189
// 0-bit for replyRequired, 7-th bit for "sent as unicast" (1)/"sent as broadcast/multicast" (0)
155-
buffer.put((byte) ((unicast ? 0x80 : 0x00) | (cid < 0 ? 0x01 : 0x00)));
190+
buffer.put((byte) ((unicast ? 0x80 : 0x00) | (channels == null ? 0x01 : 0x00)));
156191

157192
// reserved
158193
buffer.put((byte) 0);
@@ -166,8 +201,9 @@ public static void encode(final boolean unicast, final int seq, final int cid, f
166201
buffer.putShort((short)address.getPort());
167202

168203
// string[] protocols with count as byte since < 254
169-
// struct { int searchInstanceID, string channelName } channels[] with count as short?!
170-
if (cid < 0)
204+
// struct { int searchInstanceID, string channelName } channels[] with count as short
205+
// No protocol and empty channels[] for 'list' aka 'discover' request
206+
if (channels == null)
171207
{
172208
buffer.put((byte)0);
173209
buffer.putShort((short)0);
@@ -177,9 +213,12 @@ public static void encode(final boolean unicast, final int seq, final int cid, f
177213
buffer.put((byte)1);
178214
PVAString.encodeString("tcp", buffer);
179215

180-
buffer.putShort((short)1);
181-
buffer.putInt(cid);
182-
PVAString.encodeString(name, buffer);
216+
buffer.putShort((short)channels.size());
217+
for (Channel channel : channels)
218+
{
219+
buffer.putInt(channel.cid);
220+
PVAString.encodeString(channel.name, buffer);
221+
}
183222
}
184223

185224
// Update payload size

core/pva/src/main/java/org/epics/pva/server/PVAServer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
@SuppressWarnings("nls")
3636
public class PVAServer implements AutoCloseable
3737
{
38+
// TODO Implement beacons?
39+
3840
/** Common thread pool */
3941
public static ForkJoinPool POOL = ForkJoinPool.commonPool();
4042

core/pva/src/main/java/org/epics/pva/server/SearchCommandHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2021 Oak Ridge National Laboratory.
2+
* Copyright (c) 2021-2022 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -32,9 +32,9 @@ public void handleCommand(final ServerTCPHandler tcp, final ByteBuffer buffer) t
3232

3333
final SearchRequest search = SearchRequest.decode(tcp.getRemoteAddress(), version, payload_size, buffer);
3434

35-
if (search.name != null)
36-
for (int i=0; i<search.name.length; ++i)
37-
tcp.getServer().handleSearchRequest(search.seq, search.cid[i], search.name[i],
35+
if (search.channels != null)
36+
for (SearchRequest.Channel channel : search.channels)
37+
tcp.getServer().handleSearchRequest(search.seq, channel.getCID(), channel.getName(),
3838
search.client, tcp);
3939
}
4040
}

core/pva/src/main/java/org/epics/pva/server/ServerUDPHandler.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
import java.net.StandardProtocolFamily;
1616
import java.nio.ByteBuffer;
1717
import java.nio.channels.DatagramChannel;
18+
import java.util.ArrayList;
19+
import java.util.Collection;
20+
import java.util.List;
1821
import java.util.logging.Level;
1922

2023
import org.epics.pva.PVASettings;
@@ -176,24 +179,33 @@ private boolean handleSearch(final InetSocketAddress from, final byte version,
176179
if (search == null)
177180
return false;
178181

179-
if (search.name == null)
182+
if (search.channels == null)
180183
{
181184
if (search.reply_required)
182185
{ // pvlist request
183186
final boolean handled = server.handleSearchRequest(0, -1, null, search.client, null);
184187
if (! handled && search.unicast)
185-
PVAServer.POOL.submit(() -> forwardSearchRequest(0, -1, null, search.client));
188+
PVAServer.POOL.submit(() -> forwardSearchRequest(0, null, search.client));
186189
}
187190
}
188191
else
189192
{ // Channel search request
190-
for (int i=0; i<search.name.length; ++i)
193+
List<SearchRequest.Channel> forward = null;
194+
for (SearchRequest.Channel channel : search.channels)
191195
{
192-
final int cid = search.cid[i];
193-
final String name = search.name[i];
194-
final boolean handled = server.handleSearchRequest(search.seq, cid, name, search.client, null);
196+
final boolean handled = server.handleSearchRequest(search.seq, channel.getCID(), channel.getName(), search.client, null);
195197
if (! handled && search.unicast)
196-
PVAServer.POOL.submit(() -> forwardSearchRequest(search.seq, cid, name, search.client));
198+
{
199+
if (forward == null)
200+
forward = new ArrayList<>();
201+
forward.add(channel);
202+
}
203+
}
204+
205+
if (forward != null)
206+
{
207+
final List<SearchRequest.Channel> to_forward = forward;
208+
PVAServer.POOL.submit(() -> forwardSearchRequest(search.seq, to_forward, search.client));
197209
}
198210
}
199211

@@ -209,19 +221,18 @@ private boolean handleSearch(final InetSocketAddress from, final byte version,
209221
* allowing all servers on this host to reply.
210222
*
211223
* @param seq Search sequence or 0
212-
* @param cid Channel ID or -1
213-
* @param name Name or <code>null</code>
224+
* @param channels Channel CIDs and names or <code>null</code> for 'list'
214225
* @param address Client's address and port
215226
*/
216-
private void forwardSearchRequest(final int seq, final int cid, final String name, final InetSocketAddress address)
227+
private void forwardSearchRequest(final int seq, final Collection<SearchRequest.Channel> channels, final InetSocketAddress address)
217228
{
218229
// TODO Remove the local IPv4 multicast re-send from the protocol, just use multicast from the start as with IPv6
219230
if (local_multicast == null)
220231
return;
221232
synchronized (send_buffer)
222233
{
223234
send_buffer.clear();
224-
SearchRequest.encode(false, seq, cid, name, address, send_buffer);
235+
SearchRequest.encode(false, seq, channels, address, send_buffer);
225236
send_buffer.flip();
226237
logger.log(Level.FINER, () -> "Forward search to " + local_multicast + "\n" + Hexdump.toHexdump(send_buffer));
227238
try

0 commit comments

Comments
 (0)