Skip to content

Commit bb322f0

Browse files
Refactor the check for async on_update calls
Also add single and multi threaded tests for blocking records.
1 parent 799f4c0 commit bb322f0

File tree

2 files changed

+234
-39
lines changed

2 files changed

+234
-39
lines changed

softioc/device.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from inspect import isawaitable
1+
from inspect import iscoroutinefunction
22
import os
33
import time
44
import ctypes
@@ -186,18 +186,16 @@ def init_record(self, record):
186186
return self._epics_rc_
187187

188188
def __completion(self, record):
189+
'''Signals that all on_update processing is finished'''
189190
if self._blocking:
190191
signal_processing_complete(record, self._callback)
191192
pass
192193

193194
def __wrap_completion(self, value, record):
194-
update = self.__on_update(value)
195-
if isawaitable(update):
196-
dispatcher(self._complete_update, update, record)
197-
else:
198-
self.__completion(record)
195+
self.__on_update(value)
196+
self.__completion(record)
199197

200-
async def _complete_update(self, future, record):
198+
async def __async_wrap_completion(self, future, record):
201199
await future
202200
self.__completion(record)
203201

@@ -227,7 +225,21 @@ def _process(self, record):
227225
record.UDF = 0
228226
if self.__on_update and self.__enable_write:
229227
record.PACT = self._blocking
230-
dispatcher(self.__wrap_completion, python_value, record)
228+
229+
if iscoroutinefunction(self.__on_update):
230+
# This is an unfortunate, but unavoidable, leak of
231+
# implementation detail that really should be kept within
232+
# the dispatcher, but cannot be. This is due to asyncio not
233+
# allowing its event loop to be nested, thus either
234+
# requiring an additional call to the dispatcher once you
235+
# acquire the Future from the coroutine, or doing this.
236+
dispatcher(
237+
self.__async_wrap_completion,
238+
self.__on_update(python_value),
239+
record
240+
)
241+
else:
242+
dispatcher(self.__wrap_completion, python_value, record)
231243

232244
return EPICS_OK
233245

tests/test_records.py

Lines changed: 214 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import asyncio
12
import multiprocessing
23
import numpy
34
import os
45
import pytest
56

67
from conftest import (
8+
aioca_cleanup,
79
log,
810
create_random_prefix,
911
requires_cothread,
@@ -180,37 +182,6 @@ def test_pini_always_on():
180182
mbbi = builder.mbbIn("BBB", initial_value=5)
181183
assert mbbi.PINI.Value() == "YES"
182184

183-
184-
def check_record_blocking_attributes(record):
185-
"""Helper function to assert expected attributes exist for a blocking
186-
record"""
187-
assert record._blocking is True
188-
assert record._callback != 0
189-
190-
def test_blocking_creates_attributes():
191-
"""Test that setting the blocking flag on record creation creates the
192-
expected attributes"""
193-
ao1 = builder.aOut("OUTREC1", blocking=True)
194-
check_record_blocking_attributes(ao1)
195-
196-
ao2 = builder.aOut("OUTREC2", blocking=False)
197-
assert ao2._blocking is False
198-
199-
def test_blocking_global_flag_creates_attributes():
200-
"""Test that the global blocking flag creates the expected attributes"""
201-
set_blocking(True)
202-
bo1 = builder.boolOut("OUTREC1")
203-
204-
check_record_blocking_attributes(bo1)
205-
206-
set_blocking(False)
207-
bo2 = builder.boolOut("OUTREC2")
208-
assert bo2._blocking is False
209-
210-
bo3 = builder.boolOut("OUTREC3", blocking=True)
211-
check_record_blocking_attributes(bo3)
212-
213-
214185
def validate_fixture_names(params):
215186
"""Provide nice names for the out_records fixture in TestValidate class"""
216187
return params[0].__name__
@@ -530,3 +501,215 @@ def test_on_update_true_false(self, out_records):
530501
"""Test that on_update works correctly for all out records when
531502
always_update is True and the put'ed value is always different"""
532503
self.on_update_runner(out_records, True, False)
504+
505+
506+
507+
class TestBlocking:
508+
"""Tests related to the Blocking functionality"""
509+
510+
def check_record_blocking_attributes(self, record):
511+
"""Helper function to assert expected attributes exist for a blocking
512+
record"""
513+
assert record._blocking is True
514+
assert record._callback != 0
515+
516+
def test_blocking_creates_attributes(self):
517+
"""Test that setting the blocking flag on record creation creates the
518+
expected attributes"""
519+
ao1 = builder.aOut("OUTREC1", blocking=True)
520+
self.check_record_blocking_attributes(ao1)
521+
522+
ao2 = builder.aOut("OUTREC2", blocking=False)
523+
assert ao2._blocking is False
524+
525+
def test_blocking_global_flag_creates_attributes(self):
526+
"""Test that the global blocking flag creates the expected attributes"""
527+
set_blocking(True)
528+
bo1 = builder.boolOut("OUTREC1")
529+
self.check_record_blocking_attributes(bo1)
530+
531+
set_blocking(False)
532+
bo2 = builder.boolOut("OUTREC2")
533+
assert bo2._blocking is False
534+
535+
bo3 = builder.boolOut("OUTREC3", blocking=True)
536+
self.check_record_blocking_attributes(bo3)
537+
538+
def blocking_test_func(self, device_name, conn):
539+
540+
builder.SetDeviceName(device_name)
541+
542+
count_rec = builder.longIn("BLOCKING-COUNTER", initial_value=0)
543+
544+
async def blocking_update_func(new_val):
545+
"""A function that will block for some time"""
546+
log("CHILD: blocking_update_func starting")
547+
await asyncio.sleep(0.5)
548+
log("CHILD: Finished sleep!")
549+
completed_count = count_rec.get() + 1
550+
count_rec.set(completed_count)
551+
log(
552+
"CHILD: blocking_update_func finished, completed ",
553+
completed_count
554+
)
555+
556+
builder.longOut(
557+
"BLOCKING-REC",
558+
on_update=blocking_update_func,
559+
always_update=True,
560+
blocking=True
561+
)
562+
563+
564+
dispatcher = asyncio_dispatcher.AsyncioDispatcher()
565+
builder.LoadDatabase()
566+
softioc.iocInit(dispatcher)
567+
568+
conn.send("R") # "Ready"
569+
570+
log("CHILD: Sent R over Connection to Parent")
571+
572+
# Keep process alive while main thread runs CAGET
573+
if conn.poll(TIMEOUT):
574+
val = conn.recv()
575+
assert val == "D", "Did not receive expected Done character"
576+
577+
log("CHILD: Received exit command, child exiting")
578+
579+
def test_blocking_single_thread_multiple_calls(self):
580+
"""Test that a blocking record correctly causes multiple caputs from
581+
a single thread to wait for the expected time"""
582+
parent_conn, child_conn = multiprocessing.Pipe()
583+
584+
device_name = create_random_prefix()
585+
586+
process = multiprocessing.Process(
587+
target=self.blocking_test_func,
588+
args=(device_name, child_conn),
589+
)
590+
591+
process.start()
592+
593+
log("PARENT: Child started, waiting for R command")
594+
595+
from cothread.catools import caget, caput, _channel_cache
596+
597+
try:
598+
# Wait for message that IOC has started
599+
select_and_recv(parent_conn, "R")
600+
601+
log("PARENT: received R command")
602+
603+
# Suppress potential spurious warnings
604+
_channel_cache.purge()
605+
606+
# Track number of puts sent
607+
count = 1
608+
MAX_COUNT = 4
609+
610+
log("PARENT: begining While loop")
611+
612+
while count <= MAX_COUNT:
613+
put_ret = caput(
614+
device_name + ":BLOCKING-REC",
615+
5, # Arbitrary value
616+
wait=True,
617+
timeout=TIMEOUT
618+
)
619+
assert put_ret.ok, f"caput did not succeed: {put_ret.errorcode}"
620+
621+
log(f"PARENT: completed caput with count {count}")
622+
623+
count += 1
624+
625+
log("PARENT: Getting value from counter")
626+
627+
ret_val = caget(
628+
device_name + ":BLOCKING-COUNTER",
629+
timeout=TIMEOUT,
630+
)
631+
assert ret_val.ok, \
632+
f"caget did not succeed: {ret_val.errorcode}, {ret_val}"
633+
634+
log(f"PARENT: Received val from COUNTER: {ret_val}")
635+
636+
assert ret_val == MAX_COUNT
637+
638+
finally:
639+
# Suppress potential spurious warnings
640+
_channel_cache.purge()
641+
642+
log("PARENT: Sending Done command to child")
643+
parent_conn.send("D") # "Done"
644+
process.join(timeout=TIMEOUT)
645+
log(f"PARENT: Join completed with exitcode {process.exitcode}")
646+
if process.exitcode is None:
647+
pytest.fail("Process did not terminate")
648+
649+
@pytest.mark.asyncio
650+
async def test_blocking_multiple_threads(self):
651+
"""Test that a blocking record correctly causes caputs from multiple
652+
threads to wait for the expected time"""
653+
parent_conn, child_conn = multiprocessing.Pipe()
654+
655+
device_name = create_random_prefix()
656+
657+
process = multiprocessing.Process(
658+
target=self.blocking_test_func,
659+
args=(device_name, child_conn),
660+
)
661+
662+
process.start()
663+
664+
log("PARENT: Child started, waiting for R command")
665+
666+
from aioca import caget, caput
667+
668+
try:
669+
# Wait for message that IOC has started
670+
select_and_recv(parent_conn, "R")
671+
672+
log("PARENT: received R command")
673+
674+
MAX_COUNT = 4
675+
676+
async def query_record(index):
677+
log("SPAWNED: beginning blocking caput ", index)
678+
await caput(
679+
device_name + ":BLOCKING-REC",
680+
5, # Arbitrary value
681+
wait=True,
682+
timeout=TIMEOUT
683+
)
684+
log("SPAWNED: caput complete ", index)
685+
686+
queries = [query_record(i) for i in range(MAX_COUNT)] * MAX_COUNT
687+
688+
log("PARENT: Gathering list of queries")
689+
690+
await asyncio.gather(*queries)
691+
692+
log("PARENT: Getting value from counter")
693+
694+
ret_val = await caget(
695+
device_name + ":BLOCKING-COUNTER",
696+
timeout=TIMEOUT,
697+
)
698+
assert ret_val.ok, \
699+
f"caget did not succeed: {ret_val.errorcode}, {ret_val}"
700+
701+
log(f"PARENT: Received val from COUNTER: {ret_val}")
702+
703+
assert ret_val == MAX_COUNT
704+
705+
finally:
706+
# Clear the cache before stopping the IOC stops
707+
# "channel disconnected" error messages
708+
aioca_cleanup()
709+
710+
log("PARENT: Sending Done command to child")
711+
parent_conn.send("D") # "Done"
712+
process.join(timeout=TIMEOUT)
713+
log(f"PARENT: Join completed with exitcode {process.exitcode}")
714+
if process.exitcode is None:
715+
pytest.fail("Process did not terminate")

0 commit comments

Comments
 (0)