Skip to content

Commit 0a49421

Browse files
Implement asynchronous record processing
Includes a "blocking" flag on record creation. This change allows you to use caput in asynchronous mode, where it will wait for record processing to complete.
1 parent d9ca6e4 commit 0a49421

File tree

5 files changed

+88
-5
lines changed

5 files changed

+88
-5
lines changed

softioc/device.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,17 @@
33
import ctypes
44
from ctypes import *
55
import numpy
6+
from threading import Event
67

78
from . import alarm
89
from . import fields
9-
from .imports import dbLoadDatabase, recGblResetAlarms, db_put_field
10+
from .imports import (
11+
create_callback_capsule,
12+
dbLoadDatabase,
13+
signal_processing_complete,
14+
recGblResetAlarms,
15+
db_put_field,
16+
)
1017
from .device_core import DeviceSupportCore, RecordLookup
1118

1219

@@ -19,6 +26,7 @@
1926
# Default False to maintain behaviour from previous versions.
2027
blocking = False
2128

29+
# TODO: Docs and Tests for the Blocking feature
2230
def set_blocking(val):
2331
global blocking
2432
blocking = val
@@ -152,6 +160,10 @@ def __init__(self, name, **kargs):
152160
self._value = None
153161

154162
self._blocking = kargs.pop('blocking', blocking)
163+
if self._blocking:
164+
self._completion_event = Event()
165+
self._callback = create_callback_capsule()
166+
155167

156168
self.__super.__init__(name, **kargs)
157169

@@ -174,15 +186,26 @@ def init_record(self, record):
174186
return self._epics_rc_
175187

176188
def __completion(self):
189+
if self._blocking:
190+
self._completion_event.set()
177191
pass
178192

193+
def __wait_for_completion(self, record):
194+
self._completion_event.wait()
195+
signal_processing_complete(record, self._callback)
196+
self._completion_event.clear()
197+
179198
def __wrap_completion(self, value):
180-
self.__on_update(value)
199+
dispatcher(self.__on_update, value)
181200
self.__completion()
182201

183202
def _process(self, record):
184203
'''Processing suitable for output records. Performs immediate value
185204
validation and asynchronous update notification.'''
205+
206+
if record.PACT:
207+
return EPICS_OK
208+
186209
value = self._read_value(record)
187210
if not self.__always_update and \
188211
self._compare_values(value, self._value):
@@ -201,7 +224,12 @@ def _process(self, record):
201224
self._value = value
202225
record.UDF = 0
203226
if self.__on_update and self.__enable_write:
227+
record.PACT = self._blocking
204228
dispatcher(self.__wrap_completion, python_value)
229+
230+
if self._blocking:
231+
# Create a process to wait for on_update to finish
232+
dispatcher(self.__wait_for_completion, record)
205233
return EPICS_OK
206234

207235

softioc/extension.c

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
1-
1+
/* Provide EPICS functions in Python format */
22
#define PY_SSIZE_T_CLEAN
33
#include <Python.h>
44
#include <string.h>
55

66
#define db_accessHFORdb_accessC // Needed to get correct DBF_ values
77
#include <dbAccess.h>
88
#include <dbFldTypes.h>
9+
#include <callback.h>
910
#include <dbStaticLib.h>
1011
#include <asTrapWrite.h>
1112
#include <epicsVersion.h>
1213
#include <dbChannel.h>
1314
#include <asTrapWrite.h>
1415
#include <asDbLib.h>
1516

17+
1618
/* Reference stealing version of PyDict_SetItemString */
1719
static void set_dict_item_steal(
1820
PyObject *dict, const char *name, PyObject *py_value)
@@ -209,6 +211,42 @@ static PyObject *install_pv_logging(PyObject *self, PyObject *args)
209211
Py_RETURN_NONE;
210212
}
211213

214+
#define CAPSULE_NAME "ProcessDeviceSupportOut.callback"
215+
216+
static void capsule_destructor(PyObject *obj)
217+
{
218+
void *callback = PyCapsule_GetPointer(obj, CAPSULE_NAME);
219+
free(callback);
220+
}
221+
222+
223+
static PyObject *create_callback_capsule(PyObject *self, PyObject *args)
224+
{
225+
void *callback = malloc(sizeof(CALLBACK));
226+
227+
printf("Created CALLBACK struct %p\n", callback);
228+
229+
return PyCapsule_New(callback, CAPSULE_NAME, &capsule_destructor);
230+
}
231+
232+
static PyObject *signal_processing_complete(PyObject *self, PyObject *args)
233+
{
234+
int priority;
235+
dbCommon *record;
236+
PyObject *callback_capsule;
237+
238+
if (!PyArg_ParseTuple(args, "inO", &priority, &record, &callback_capsule))
239+
{
240+
return NULL;
241+
}
242+
243+
CALLBACK *callback = PyCapsule_GetPointer(callback_capsule, CAPSULE_NAME);
244+
245+
callbackRequestProcessCallback(callback, priority, record);
246+
247+
Py_RETURN_NONE;
248+
}
249+
212250
static struct PyMethodDef softioc_methods[] = {
213251
{"get_DBF_values", get_DBF_values, METH_VARARGS,
214252
"Get a map of DBF names to values"},
@@ -218,6 +256,10 @@ static struct PyMethodDef softioc_methods[] = {
218256
"Put a database field to a value"},
219257
{"install_pv_logging", install_pv_logging, METH_VARARGS,
220258
"Install caput logging to stdout"},
259+
{"signal_processing_complete", signal_processing_complete, METH_VARARGS,
260+
"Inform EPICS that asynchronous record processing has completed"},
261+
{"create_callback_capsule", create_callback_capsule, METH_VARARGS,
262+
"Create a CALLBACK structure inside a PyCapsule"},
221263
{NULL, NULL, 0, NULL} /* Sentinel */
222264
};
223265

softioc/imports.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,17 @@ def install_pv_logging(acf_file):
2727
'''Install pv logging'''
2828
_extension.install_pv_logging(acf_file)
2929

30+
def create_callback_capsule():
31+
return _extension.create_callback_capsule()
32+
33+
def signal_processing_complete(record, callback):
34+
'''Signal that asynchronous record processing has completed'''
35+
_extension.signal_processing_complete(
36+
record.PRIO,
37+
record.record.value,
38+
callback
39+
)
40+
3041
def expect_success(status, function, args):
3142
assert status == 0, 'Expected success'
3243

@@ -94,6 +105,8 @@ def from_param(cls, value):
94105

95106
__all__ = [
96107
'get_field_offsets',
108+
'create_callback_capsule',
109+
'signal_processing_complete',
97110
'registryDeviceSupportAdd',
98111
'IOSCANPVT', 'scanIoRequest', 'scanIoInit',
99112
'dbLoadDatabase',

softioc/pythonSoftIoc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def __init__(self, builder, device, name, **fields):
2424
# have to maintain this separately from the corresponding device list.
2525
DeviceKeywords = [
2626
'on_update', 'on_update_name', 'validate', 'always_update',
27-
'initial_value', '_wf_nelm', '_wf_dtype']
27+
'initial_value', '_wf_nelm', '_wf_dtype', 'blocking']
2828
device_kargs = {}
2929
for keyword in DeviceKeywords:
3030
if keyword in fields:

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ def enable_code_coverage():
117117
def select_and_recv(conn, expected_char = None):
118118
"""Wait for the given Connection to have data to receive, and return it.
119119
If a character is provided check its correct before returning it."""
120-
# Must use cothread's select if cothread is prsent, otherwise we'd block
120+
# Must use cothread's select if cothread is present, otherwise we'd block
121121
# processing on all cothread processing. But we don't want to use it
122122
# unless we have to, as importing cothread can cause issues with forking.
123123
if "cothread" in sys.modules:

0 commit comments

Comments
 (0)