Skip to content

Commit 7da9967

Browse files
committed
PVA client: Send multiple name searches in one message
1 parent 87af88b commit 7da9967

File tree

6 files changed

+115
-82
lines changed

6 files changed

+115
-82
lines changed

core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111

1212
import java.net.InetSocketAddress;
1313
import java.nio.ByteBuffer;
14-
import java.time.Instant;
1514
import java.util.ArrayList;
15+
import java.util.Collection;
1616
import java.util.LinkedList;
1717
import java.util.List;
1818
import java.util.Random;
@@ -217,7 +217,7 @@ public void start()
217217
*/
218218
public void register(final PVAChannel channel, final boolean now)
219219
{
220-
logger.log(Level.FINE, () -> "Register search for " + channel.getName() + " " + channel.getCID());
220+
logger.log(Level.FINE, () -> "Register search for " + channel);
221221

222222
final ClientChannelState old = channel.setState(ClientChannelState.SEARCHING);
223223
if (old == ClientChannelState.SEARCHING)
@@ -288,6 +288,7 @@ public void boost()
288288
private final ArrayList<PVAChannel> to_search = new ArrayList<>();
289289

290290
/** Invoked by timer: Check searched channels for the next one to handle */
291+
@SuppressWarnings("unchecked")
291292
private void runSearches()
292293
{
293294
to_search.clear();
@@ -369,9 +370,10 @@ else if (count == 0)
369370
break;
370371

371372
final List<PVAChannel> batch = to_search.subList(start, start + count);
372-
System.out.println(Instant.now() + " Search bucket " + current_search_bucket.get() + " " + batch);
373-
for (PVAChannel channel : batch)
374-
search(channel);
373+
logger.log(Level.FINE, "Search bucket " + current_search_bucket.get() + " " + batch);
374+
375+
// PVAChannel extends SearchRequest.Channel, so use List<PVAChannel> as Collection<SR.Channel>
376+
search((Collection<SearchRequest.Channel>) (List<? extends SearchRequest.Channel>)batch);
375377
start += count;
376378
}
377379
}
@@ -385,22 +387,18 @@ public void list()
385387
synchronized (send_buffer)
386388
{
387389
logger.log(Level.FINE, "List Request");
388-
sendSearch(0, -1, null);
390+
sendSearch(0, null);
389391
}
390392
}
391393

392-
/** Issue search for channel
393-
* @param channel Channel to search
394+
/** Issue search for channels
395+
* @param channels Channels to search, <code>null</code> for 'list'
394396
*/
395-
private void search(final PVAChannel channel)
397+
private void search(final Collection<SearchRequest.Channel> channels)
396398
{
397399
// Search via TCP
398400
for (AddressInfo name_server : name_server_addresses)
399401
{
400-
final SearchedChannel searched = searched_channels.get(channel.getCID());
401-
if (searched == null)
402-
continue;
403-
404402
final ClientTCPHandler tcp = tcp_provider.apply(name_server.getAddress());
405403

406404
// In case of connection errors (TCP connection blocked by firewall),
@@ -409,7 +407,7 @@ private void search(final PVAChannel channel)
409407
{
410408
final RequestEncoder search_request = (version, buffer) ->
411409
{
412-
logger.log(Level.FINE, () -> "Searching for " + channel + " via TCP " + tcp.getRemoteAddress());
410+
logger.log(Level.FINE, () -> "Searching for " + channels + " via TCP " + tcp.getRemoteAddress());
413411

414412
// Search sequence identifies the potentially repeated UDP.
415413
// TCP search is once only, so PVXS always sends 0x66696E64 = "find".
@@ -419,7 +417,7 @@ private void search(final PVAChannel channel)
419417
// Use 'any' reply address since reply will be via this TCP socket
420418
final InetSocketAddress response_address = new InetSocketAddress(0);
421419

422-
SearchRequest.encode(true, seq, channel.getCID(), channel.getName(), response_address , buffer);
420+
SearchRequest.encode(true, seq, channels, response_address , buffer);
423421
};
424422
tcp.submit(search_request);
425423
}
@@ -437,20 +435,20 @@ private void search(final PVAChannel channel)
437435
// For UDP, use bucket index to get a changing number that helps
438436
// match up duplicate packets and allows debugging bucket usage
439437
final int seq = current_search_bucket.get();
440-
logger.log(Level.FINE, "UDP Search Request #" + seq + " for " + channel);
441-
sendSearch(seq, channel.getCID(), channel.getName());
438+
logger.log(Level.FINE, () -> "UDP Search Request #" + seq + " for " + channels);
439+
sendSearch(seq, channels);
442440
}
443441
}
444442

445443
/** Send a 'list' or channel search out via UDP */
446-
private void sendSearch(final int seq, final int cid, final String name)
444+
private void sendSearch(final int seq, final Collection<SearchRequest.Channel> channels)
447445
{
448446
// Buffer starts out with UNICAST bit set in the search message
449447
for (AddressInfo addr : unicast_search_addresses)
450448
{
451449
send_buffer.clear();
452450
final InetSocketAddress response = udp.getResponseAddress(addr);
453-
SearchRequest.encode(true, seq, cid, name, response, send_buffer);
451+
SearchRequest.encode(true, seq, channels, response, send_buffer);
454452
send_buffer.flip();
455453
try
456454
{
@@ -468,7 +466,7 @@ private void sendSearch(final int seq, final int cid, final String name)
468466
{
469467
send_buffer.clear();
470468
final InetSocketAddress response = udp.getResponseAddress(addr);
471-
SearchRequest.encode(false, seq, cid, name, response, send_buffer);
469+
SearchRequest.encode(false, seq, channels, response, send_buffer);
472470
send_buffer.flip();
473471
try
474472
{

core/pva/src/main/java/org/epics/pva/client/ClientUDPHandler.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -278,26 +278,25 @@ private boolean handleSearchRequest(final InetSocketAddress from, final byte ver
278278
{
279279
if (local_multicast != null && search != null && search.unicast)
280280
{
281-
if (search.name == null)
281+
if (search.channels == null)
282282
{
283283
if (search.reply_required)
284284
{
285285
forward_buffer.clear();
286-
SearchRequest.encode(false, 0, -1, null, search.client, forward_buffer);
286+
SearchRequest.encode(false, 0, null, search.client, forward_buffer);
287287
forward_buffer.flip();
288288
logger.log(Level.FINER, () -> "Forward search to list servers to " + local_multicast + "\n" + Hexdump.toHexdump(forward_buffer));
289289
send(forward_buffer, local_multicast);
290290
}
291291
}
292292
else
293-
for (int i=0; i<search.name.length; ++i)
294-
{
295-
forward_buffer.clear();
296-
SearchRequest.encode(false, search.seq, search.cid[i], search.name[i], search.client, forward_buffer);
297-
forward_buffer.flip();
298-
logger.log(Level.FINER, () -> "Forward search to " + local_multicast + "\n" + Hexdump.toHexdump(forward_buffer));
299-
send(forward_buffer, local_multicast);
300-
}
293+
{
294+
forward_buffer.clear();
295+
SearchRequest.encode(false, search.seq, search.channels, search.client, forward_buffer);
296+
forward_buffer.flip();
297+
logger.log(Level.FINER, () -> "Forward search to " + local_multicast + "\n" + Hexdump.toHexdump(forward_buffer));
298+
send(forward_buffer, local_multicast);
299+
}
301300
}
302301
}
303302
catch (Exception ex)

core/pva/src/main/java/org/epics/pva/client/PVAChannel.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2019-2021 Oak Ridge National Laboratory.
2+
* Copyright (c) 2019-2022 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.atomic.AtomicReference;
1818
import java.util.logging.Level;
1919

20+
import org.epics.pva.common.SearchRequest;
2021
import org.epics.pva.data.PVADouble;
2122
import org.epics.pva.data.PVAString;
2223
import org.epics.pva.data.PVAStructure;
@@ -41,7 +42,7 @@
4142
* @author Kay Kasemir
4243
*/
4344
@SuppressWarnings("nls")
44-
public class PVAChannel implements AutoCloseable
45+
public class PVAChannel extends SearchRequest.Channel implements AutoCloseable
4546
{
4647
/** Provider for the 'next' client channel ID
4748
*
@@ -53,9 +54,7 @@ public class PVAChannel implements AutoCloseable
5354
private static final AtomicInteger CID_Provider = new AtomicInteger(1);
5455

5556
private final PVAClient client;
56-
private final String name;
5757
private final ClientChannelListener listener;
58-
private final int cid = CID_Provider.incrementAndGet();
5958
private volatile int sid = -1;
6059

6160
/** State
@@ -74,8 +73,8 @@ public class PVAChannel implements AutoCloseable
7473

7574
PVAChannel(final PVAClient client, final String name, final ClientChannelListener listener)
7675
{
76+
super(CID_Provider.incrementAndGet(), name);
7777
this.client = client;
78-
this.name = name;
7978
this.listener = listener;
8079
}
8180

@@ -92,25 +91,12 @@ ClientTCPHandler getTCP() throws Exception
9291
return copy;
9392
}
9493

95-
/** @return Client channel ID */
96-
int getCID()
97-
{
98-
return cid;
99-
}
100-
10194
/** @return Server channel ID */
10295
int getSID()
10396
{
10497
return sid;
10598
}
10699

107-
108-
/** @return Channel name */
109-
public String getName()
110-
{
111-
return name;
112-
}
113-
114100
/** @return {@link ClientChannelState} */
115101
public ClientChannelState getState()
116102
{

core/pva/src/main/java/org/epics/pva/common/SearchRequest.java

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2019-2021 Oak Ridge National Laboratory.
2+
* Copyright (c) 2019-2022 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -12,6 +12,9 @@
1212
import java.net.InetAddress;
1313
import java.net.InetSocketAddress;
1414
import java.nio.ByteBuffer;
15+
import java.util.ArrayList;
16+
import java.util.Collection;
17+
import java.util.List;
1518
import java.util.logging.Level;
1619

1720
import org.epics.pva.data.PVAAddress;
@@ -23,6 +26,43 @@
2326
@SuppressWarnings("nls")
2427
public class SearchRequest
2528
{
29+
/** Channel with CID to be searched */
30+
public static class Channel
31+
{
32+
/** Client ID */
33+
protected final int cid;
34+
35+
/** Channel name */
36+
protected final String name;
37+
38+
/** @param cid Client ID
39+
* @param name Channel name
40+
*/
41+
public Channel(final int cid, final String name)
42+
{
43+
this.cid = cid;
44+
this.name = name;
45+
}
46+
47+
/** @return Client channel ID */
48+
public int getCID()
49+
{
50+
return cid;
51+
}
52+
53+
/** @return Channel name */
54+
public String getName()
55+
{
56+
return name;
57+
}
58+
59+
@Override
60+
public String toString()
61+
{
62+
return "'" + name + "' [CID " + cid + "]";
63+
}
64+
};
65+
2666
/** Sequence number */
2767
public int seq;
2868
/** Is it a unicast? */
@@ -31,10 +71,8 @@ public class SearchRequest
3171
public boolean reply_required;
3272
/** Address of client */
3373
public InetSocketAddress client;
34-
/** Names requested in search */
35-
public String[] name;
36-
/** Client IDs for names */
37-
public int[] cid;
74+
/** Names requested in search, <code>null</code> for 'list' */
75+
public List<Channel> channels;
3876

3977
/** Check search request
4078
*
@@ -105,7 +143,7 @@ public static SearchRequest decode(final InetSocketAddress from, final byte vers
105143

106144
if (count == 0)
107145
{ // pvlist request
108-
search.name = null;
146+
search.channels = null;
109147
logger.log(Level.FINER, () -> "PVA Client " + from + " sent search #" + search.seq + " to list servers");
110148
}
111149
else
@@ -115,15 +153,13 @@ public static SearchRequest decode(final InetSocketAddress from, final byte vers
115153
logger.log(Level.WARNING, "PVA Client " + from + " sent search #" + search.seq + " for protocol '" + protocol + "', need 'tcp'");
116154
return null;
117155
}
118-
search.name = new String[count];
119-
search.cid = new int[count];
156+
search.channels = new ArrayList<>(count);
120157
for (int i=0; i<count; ++i)
121158
{
122159
final int cid = buffer.getInt();
123160
final String name = PVAString.decodeString(buffer);
124161
logger.log(Level.FINER, () -> "PVA Client " + from + " sent search #" + search.seq + " for " + name + " [cid " + cid + "]");
125-
search.name[i] = name;
126-
search.cid[i] = cid;
162+
search.channels.add(new Channel(cid, name));
127163
}
128164
}
129165

@@ -132,12 +168,11 @@ public static SearchRequest decode(final InetSocketAddress from, final byte vers
132168

133169
/** @param unicast Unicast?
134170
* @param seq Sequence number
135-
* @param cid Client ID
136-
* @param name PV name
171+
* @param channels Channels to search, <code>null</code> for 'list'
137172
* @param address client's address
138173
* @param buffer Buffer into which to encode
139174
*/
140-
public static void encode(final boolean unicast, final int seq, final int cid, final String name, final InetSocketAddress address, final ByteBuffer buffer)
175+
public static void encode(final boolean unicast, final int seq, final Collection<Channel> channels, final InetSocketAddress address, final ByteBuffer buffer)
141176
{
142177
// Create with zero payload size, to be patched later
143178
PVAHeader.encodeMessageHeader(buffer, PVAHeader.FLAG_NONE, PVAHeader.CMD_SEARCH, 0);
@@ -152,7 +187,7 @@ public static void encode(final boolean unicast, final int seq, final int cid, f
152187
// Mark search message as unicast so that receiver will forward
153188
// it via local broadcast to other local listeners.
154189
// 0-bit for replyRequired, 7-th bit for "sent as unicast" (1)/"sent as broadcast/multicast" (0)
155-
buffer.put((byte) ((unicast ? 0x80 : 0x00) | (cid < 0 ? 0x01 : 0x00)));
190+
buffer.put((byte) ((unicast ? 0x80 : 0x00) | (channels == null ? 0x01 : 0x00)));
156191

157192
// reserved
158193
buffer.put((byte) 0);
@@ -166,8 +201,9 @@ public static void encode(final boolean unicast, final int seq, final int cid, f
166201
buffer.putShort((short)address.getPort());
167202

168203
// string[] protocols with count as byte since < 254
169-
// struct { int searchInstanceID, string channelName } channels[] with count as short?!
170-
if (cid < 0)
204+
// struct { int searchInstanceID, string channelName } channels[] with count as short
205+
// No protocol and empty channels[] for 'list' aka 'discover' request
206+
if (channels == null)
171207
{
172208
buffer.put((byte)0);
173209
buffer.putShort((short)0);
@@ -177,9 +213,12 @@ public static void encode(final boolean unicast, final int seq, final int cid, f
177213
buffer.put((byte)1);
178214
PVAString.encodeString("tcp", buffer);
179215

180-
buffer.putShort((short)1);
181-
buffer.putInt(cid);
182-
PVAString.encodeString(name, buffer);
216+
buffer.putShort((short)channels.size());
217+
for (Channel channel : channels)
218+
{
219+
buffer.putInt(channel.cid);
220+
PVAString.encodeString(channel.name, buffer);
221+
}
183222
}
184223

185224
// Update payload size

core/pva/src/main/java/org/epics/pva/server/SearchCommandHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2021 Oak Ridge National Laboratory.
2+
* Copyright (c) 2021-2022 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -32,9 +32,9 @@ public void handleCommand(final ServerTCPHandler tcp, final ByteBuffer buffer) t
3232

3333
final SearchRequest search = SearchRequest.decode(tcp.getRemoteAddress(), version, payload_size, buffer);
3434

35-
if (search.name != null)
36-
for (int i=0; i<search.name.length; ++i)
37-
tcp.getServer().handleSearchRequest(search.seq, search.cid[i], search.name[i],
35+
if (search.channels != null)
36+
for (SearchRequest.Channel channel : search.channels)
37+
tcp.getServer().handleSearchRequest(search.seq, channel.getCID(), channel.getName(),
3838
search.client, tcp);
3939
}
4040
}

0 commit comments

Comments
 (0)