Skip to content

Commit 3b34cb6

Browse files
authored
Merge pull request #2437 from ControlSystemStudio/pva_put_complete
PVA write (put) vs. writeAsync (put-callback or put-completion)
2 parents f29d82c + 611aa92 commit 3b34cb6

File tree

9 files changed

+152
-36
lines changed

9 files changed

+152
-36
lines changed

core/pv/src/main/java/org/phoebus/pv/pva/PVA_PV.java

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2019-2020 Oak Ridge National Laboratory.
2+
* Copyright (c) 2019-2022 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -164,34 +164,39 @@ public VType get(final long timeout, final TimeUnit unit)
164164
@Override
165165
public void write(final Object new_value) throws Exception
166166
{
167-
// With Channel Access, there are different protocol options
168-
// for "put" vs. "put-callback".
169-
// With PVA, it is currently unclear how to distinguish between
170-
// these two. The PVA server might honor certain values in the
171-
// "request", but that is not documented as part of the protocol.
172-
// On one hand, we should 'get()' the result of the write to
173-
// receive exceptions for read-only PVs.
174-
// On the other hand, such a 'get()' could last a long time
175-
// in case some detail in the 'request' caused the PVA server
176-
// to perform a put-callback type of operation,
177-
// and a GUI calling write() expect an immediate return.
178-
179167
// Perform a disconnect check right now to alert caller
180168
// of clearly disconnected channel
181169
if (isDisconnected(read()))
182170
throw new IllegalStateException("Channel '" + getName() + "' is not connected");
183171

184172
// The channel could still disconnect in the middle of the write,
185-
// the channel may be read-only or experience other errors
186-
// that we'll only see as log messages since we don't want to
187-
// wait in 'get()' here...
188-
channel.write(name_helper.getWriteRequest(), new_value);
173+
// or experience other errors which we'll receive in the
174+
// response checked below.
175+
176+
// Perform a plain "put", not "put-callback"
177+
final Future<Void> response = channel.write(false, name_helper.getWriteRequest(), new_value);
178+
179+
// Compared to Channel Access, PVA currently offers no
180+
// information about writable vs. read-only channels.
181+
// A read-only channel will only inform us about the failed
182+
// write in the put response, for which we need to await
183+
// the return value from the PVA server.
184+
// Waiting for the response from the server, however,
185+
// can take a little time, enough to be noticeable in a GUI
186+
// that directly calls 'write' from the UI thread.
187+
// Still, there seems no alternative to waiting a little bit,
188+
// then potentially timing out.
189+
response.get(PVA_Preferences.epics_pva_write_reply_timeout_ms, TimeUnit.MILLISECONDS);
189190
}
190191

191192
@Override
192193
public Future<?> asyncWrite(final Object new_value) throws Exception
193194
{
194-
return channel.write(name_helper.getWriteRequest(), new_value);
195+
// Perform a put with completion,
196+
// i.e., process target and block until processing completes,
197+
// akin to a Channel Access put-callback.
198+
// Return the Future that can be used to await completion
199+
return channel.write(true, name_helper.getWriteRequest(), new_value);
195200
}
196201

197202
@Override

core/pv/src/main/java/org/phoebus/pv/pva/PVA_Preferences.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public class PVA_Preferences
4343
{
4444
private static final PVA_Preferences instance = new PVA_Preferences();
4545

46+
public static int epics_pva_write_reply_timeout_ms;
47+
4648
/** Prevent direct instantiation */
4749
private PVA_Preferences()
4850
{
@@ -76,6 +78,8 @@ public void installPreferences() throws Exception
7678
new Object[] { propname, PVA_PVFactory.class.getPackageName(), setting, value });
7779
}
7880
}
81+
82+
epics_pva_write_reply_timeout_ms = prefs.getInt("epics_pva_write_reply_timeout_ms");
7983
}
8084

8185
/** @return Singleton instance */

core/pv/src/main/resources/pv_pva_preferences.properties

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,11 @@ epics_pva_max_array_formatting
4242

4343
# TCP buffer size for sending data
4444
epics_pva_send_buffer_size
45+
46+
# Timeout used by plain "put" type of write
47+
# when checking success or failure.
48+
# Note this is not used with asyncWrite,
49+
# the "put-callback" which returns a Future
50+
# for awaiting the completion,
51+
# but only with the plain "put" that returns ASAP
52+
epics_pva_write_reply_timeout_ms=1000

core/pva/src/main/java/org/epics/pva/client/FieldRequest.java

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2019 Oak Ridge National Laboratory.
2+
* Copyright (c) 2019-2022 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -16,6 +16,7 @@
1616
import org.epics.pva.data.PVABool;
1717
import org.epics.pva.data.PVAData;
1818
import org.epics.pva.data.PVAInt;
19+
import org.epics.pva.data.PVAString;
1920
import org.epics.pva.data.PVAStructure;
2021

2122
/** Description of the 'field(..)' request used to get/monitor a channel
@@ -44,18 +45,18 @@ class FieldRequest
4445
{
4546
private final PVAStructure desc;
4647

47-
/** Parse field request
48+
/** Parse plain field request for "get"
4849
* @param request Examples:
4950
* "", "field()",
5051
* "value", "field(value)",
5152
* "field(value, timeStamp.userTag)"
5253
*/
5354
public FieldRequest(final String request)
5455
{
55-
this(0, request);
56+
this(0, false, request);
5657
}
5758

58-
/** Parse field request
59+
/** Parse field request for "monitor" with optional pipeline
5960
* @param pipeline Number of elements for 'pipeline' mode, 0 to disable
6061
* @param request Examples:
6162
* "", "field()",
@@ -64,19 +65,62 @@ public FieldRequest(final String request)
6465
*/
6566
public FieldRequest(final int pipeline, final String request)
6667
{
68+
this(pipeline, false, request);
69+
}
70+
71+
/** Parse field request for "put" with optional completion
72+
* @param completion Perform a write that triggers processing and only returns on completion?
73+
* @param request Examples:
74+
* "", "field()",
75+
* "value", "field(value)",
76+
* "field(value, timeStamp.userTag)"
77+
*/
78+
public FieldRequest(final boolean completion, final String request)
79+
{
80+
this(0, completion, request);
81+
}
82+
83+
84+
/** Parse field request
85+
* @param pipeline Number of elements for 'pipeline' mode, 0 to disable
86+
* @param completion Perform a write that triggers processing and only returns on completion?
87+
* @param request Examples:
88+
* "", "field()",
89+
* "value", "field(value)",
90+
* "field(value, timeStamp.userTag)"
91+
*/
92+
private FieldRequest(final int pipeline, final boolean completion, final String request)
93+
{
94+
if (pipeline > 0 && completion)
95+
throw new IllegalStateException("Cannot use both 'pipeline' (for get) " +
96+
"and 'completion' (for put) within same request");
6797
final List<PVAData> items = new ArrayList<>();
6898

6999
if (pipeline > 0)
70100
{
71101
// record._options.pipeline=true
72-
// 'camonitor' encodes as PVAString 'true', not PVABool
102+
// 'pvmonitor' encodes as PVAString 'true', not PVABool
73103
items.add(
74104
new PVAStructure("record", "",
75105
new PVAStructure("_options", "",
76106
new PVABool("pipeline", true),
77107
new PVAInt("queueSize", pipeline)
78108
)));
79109
}
110+
else if (completion)
111+
{
112+
// Similar to Channel Access put-callback:
113+
// Process passive record (could also use "true" to always process),
114+
// then block until processing completes
115+
// record._options.process="passive"
116+
// record._options.block=true
117+
items.add(
118+
new PVAStructure("record", "",
119+
new PVAStructure("_options", "",
120+
new PVAString("process", "passive"),
121+
new PVABool("block", true)
122+
)));
123+
}
80124

81125
// XXX Not using any client type registry,
82126
// but (re-)defining from 1 each time
@@ -100,7 +144,7 @@ public FieldRequest(final int pipeline, final String request)
100144
desc.setTypeID((short)1);
101145
}
102146

103-
/** @param "a, b.sub"
147+
/** @param field_spec "a, b.sub"
104148
* @return [ "a", "b.sub" ]
105149
*/
106150
private List<String> parseFields(final String field_spec)
@@ -172,7 +216,7 @@ public void encodeType(final ByteBuffer buffer) throws Exception
172216
desc.encodeType(buffer, described);
173217
}
174218

175-
/** Encode the request value (pipeline option)
219+
/** Encode the request value (with pipeline or block, process options)
176220
*
177221
* @param buffer {@link ByteBuffer}
178222
* @throws Exception on error

core/pva/src/main/java/org/epics/pva/client/PVAChannel.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,11 +241,41 @@ public Future<PVAStructure> read(final String request)
241241
* @param request Request for element to write, e.g. "field(value)"
242242
* @param new_value New value: Number, String
243243
* @throws Exception on error
244-
* @return {@link Future} for awaiting completion
244+
* @return {@link Future} for awaiting completion and getting Exception in case of error
245+
* @deprecated Use {@link #write(boolean, String, Object)}
245246
*/
247+
@Deprecated
246248
public Future<Void> write(final String request, final Object new_value) throws Exception
247249
{
248-
return new PutRequest(this, request, new_value);
250+
return write(false, request, new_value);
251+
}
252+
253+
/** Write (put) an element of the channel's value on server
254+
*
255+
* <p>The request needs to address one field of the channel,
256+
* and the value to write must be accepted by that field.
257+
*
258+
* <p>For example, when "field(value)" addresses a double field,
259+
* {@link PVADouble#setValue(Object)} will be called, so <code>new_value</code>
260+
* may be a {@link Number}.
261+
*
262+
* <p>When "field(value)" addresses a text field,
263+
* {@link PVAString#setValue(Object)} will be called,
264+
* which accepts any object by converting it to a string.
265+
*
266+
* <p>When writing an enumerated field, its <code>int index</code>
267+
* will be written, requiring a {@link Number} that's then
268+
* used as an integer.
269+
*
270+
* @param completion Perform a processing "put" that blocks until completion, or write-and-forget?
271+
* @param request Request for element to write, e.g. "field(value)"
272+
* @param new_value New value: Number, String
273+
* @throws Exception on error
274+
* @return {@link Future} for awaiting completion and getting Exception in case of error
275+
*/
276+
public Future<Void> write(final boolean completion, final String request, final Object new_value) throws Exception
277+
{
278+
return new PutRequest(this, completion, request, new_value);
249279
}
250280

251281
/** Start a subscription

core/pva/src/main/java/org/epics/pva/client/PVAClientMain.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.List;
1313
import java.util.concurrent.CountDownLatch;
1414
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.TimeoutException;
1516
import java.util.logging.Handler;
1617
import java.util.logging.Level;
1718
import java.util.logging.LogManager;
@@ -32,6 +33,7 @@
3233
public class PVAClientMain
3334
{
3435
private static double seconds = 5.0;
36+
private static boolean completion = false;
3537
private static String request = "";
3638

3739
private static void help()
@@ -40,6 +42,7 @@ private static void help()
4042
System.out.println();
4143
System.out.println("Options:");
4244
System.out.println(" -h Help");
45+
System.out.println(" -c Perform a 'put-callback' that processes and blocks until completion?");
4346
System.out.println(" -w <seconds> Wait time, default is 5.0 seconds");
4447
System.out.println(" -r <fields> Field request. For 'info' command, optional field name");
4548
System.out.println(" Default 'value' for 'put', empty for other operations");
@@ -215,7 +218,14 @@ private static void put(final String name, final String value) throws Exception
215218
{
216219
new_value = value;
217220
}
218-
pv.write(request, new_value).get(timeout_ms, TimeUnit.MILLISECONDS);
221+
try
222+
{
223+
pv.write(completion, request, new_value).get(timeout_ms, TimeUnit.MILLISECONDS);
224+
}
225+
catch (TimeoutException ex)
226+
{
227+
System.err.println("Write timed out");
228+
}
219229
pv.close();
220230
}
221231
}
@@ -255,6 +265,8 @@ public static void main(final String[] args) throws Exception
255265
help();
256266
return;
257267
}
268+
else if (arg.startsWith("-c"))
269+
completion = true;
258270
else if (arg.startsWith("-w") && (i+1) < args.length)
259271
{
260272
seconds = Double.parseDouble(args[i+1]);
@@ -331,6 +343,5 @@ else if (command.equals("put") && names.size() == 2)
331343
}
332344
else
333345
help();
334-
335346
}
336347
}

core/pva/src/main/java/org/epics/pva/client/PutRequest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ class PutRequest extends CompletableFuture<Void> implements RequestEncoder, Resp
2626
{
2727
private final PVAChannel channel;
2828

29+
/** Perform a processing/blocking "put"? */
30+
private final boolean completion;
31+
2932
/** Request "field(value)" or "field(struct.sub.value)" */
3033
private final String request;
3134

@@ -43,14 +46,16 @@ class PutRequest extends CompletableFuture<Void> implements RequestEncoder, Resp
4346

4447
/** Request to write channel's value
4548
* @param channel {@link PVAChannel}
49+
* @param completion Perform a write that triggers processing and only returns on completion?
4650
* @param request Request for element to write, e.g. "field(value)"
4751
* @param new_value Value to write.
4852
* Must be accepted by {@link PVAData#setValue(Object)}
4953
* for the requested field.
5054
*/
51-
public PutRequest(final PVAChannel channel, final String request, final Object new_value)
55+
public PutRequest(final PVAChannel channel, final boolean completion, final String request, final Object new_value)
5256
{
5357
this.channel = channel;
58+
this.completion = completion;
5459
this.request = request;
5560
if (request.startsWith("field(") && request.endsWith(")"))
5661
request_path = request.substring(6, request.length()-1);
@@ -89,7 +94,7 @@ public void encodeRequest(final byte version, final ByteBuffer buffer) throws Ex
8994
buffer.putInt(request_id);
9095
buffer.put(PVAHeader.CMD_SUB_INIT);
9196

92-
final FieldRequest field_request = new FieldRequest(request);
97+
final FieldRequest field_request = new FieldRequest(completion, request);
9398
field_request.encodeType(buffer);
9499
field_request.encode(buffer);
95100
buffer.putInt(size_offset, buffer.position() - payload_start);

core/pva/src/test/java/org/epics/pva/client/TestFieldRequest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2019 Oak Ridge National Laboratory.
2+
* Copyright (c) 2019-2022 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -129,6 +129,15 @@ public void testPipeline() throws Exception
129129
System.out.println(Hexdump.toHexdump(buffer));
130130
}
131131

132+
@Test
133+
public void testCompletion() throws Exception
134+
{
135+
FieldRequest request = new FieldRequest(true, "field(value)");
136+
System.out.println(request);
137+
ByteBuffer buffer = encode(request);
138+
System.out.println(Hexdump.toHexdump(buffer));
139+
}
140+
132141
private ByteBuffer encode(final FieldRequest request) throws Exception
133142
{
134143
final ByteBuffer buffer = ByteBuffer.allocate(100);

core/pva/src/test/java/org/epics/pva/combined/WriteDemo.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2019 Oak Ridge National Laboratory.
2+
* Copyright (c) 2019-2022 Oak Ridge National Laboratory.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -96,11 +96,11 @@ public static void main(String[] args) throws Exception
9696
for (double v=5.0; v>=-1.0; --v)
9797
{
9898
TimeUnit.MILLISECONDS.sleep(100);
99-
ch1.write("", v);
99+
ch1.write(false, "", v);
100100
TimeUnit.MILLISECONDS.sleep(100);
101-
ch2.write("", v);
101+
ch2.write(false, "", v);
102102
TimeUnit.MILLISECONDS.sleep(100);
103-
ch3.write("", v);
103+
ch3.write(false, "", v);
104104
}
105105
System.out.println("Closing PVs");
106106
ch3.close();

0 commit comments

Comments
 (0)