Skip to content

Commit dcd3e12

Browse files
mdavidsaverAlexanderWells-diamond
authored andcommitted
AsyncioDispatcher cleanup tasks atexit
1 parent d55483d commit dcd3e12

File tree

1 file changed

+43
-9
lines changed

1 file changed

+43
-9
lines changed

softioc/asyncio_dispatcher.py

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,61 @@
55
import atexit
66

77
class AsyncioDispatcher:
8-
def __init__(self, loop=None):
8+
def __init__(self, loop=None, debug=False):
99
"""A dispatcher for `asyncio` based IOCs, suitable to be passed to
1010
`softioc.iocInit`. Means that `on_update` callback functions can be
1111
async.
1212
1313
If a ``loop`` is provided it must already be running. Otherwise a new
1414
Event Loop will be created and run in a dedicated thread.
15+
``debug`` is passed through to ``asyncio.run()``.
16+
17+
For a clean exit, call ``softioc.interactive_ioc(..., call_exit=False)``
1518
"""
1619
if loop is None:
20+
# will wait until worker is executing the new loop
21+
started = threading.Event()
1722
# Make one and run it in a background thread
18-
self.loop = asyncio.new_event_loop()
19-
worker = threading.Thread(target=self.loop.run_forever)
23+
self.__worker = threading.Thread(target=asyncio.run,
24+
args=(self.__inloop(started),),
25+
kwargs={'debug': debug})
2026
# Explicitly manage worker thread as part of interpreter shutdown.
2127
# Otherwise threading module will deadlock trying to join()
2228
# before our atexit hook runs, while the loop is still running.
23-
worker.daemon = True
29+
self.__worker.daemon = True
30+
31+
self.__worker.start()
32+
started.wait()
33+
34+
self.__atexit = atexit.register(self.__shutdown)
35+
36+
assert self.loop is not None and self.loop.is_running()
2437

25-
@atexit.register
26-
def aioJoin(worker=worker, loop=self.loop):
27-
loop.call_soon_threadsafe(loop.stop)
28-
worker.join()
29-
worker.start()
3038
elif not loop.is_running():
3139
raise ValueError("Provided asyncio event loop is not running")
3240
else:
3341
self.loop = loop
3442

43+
def close(self):
44+
if self.__atexit is not None:
45+
atexit.unregister(self.__atexit)
46+
self.__atexit = None
47+
48+
self.__shutdown()
49+
50+
async def __inloop(self, started):
51+
self.loop = asyncio.get_running_loop()
52+
self.__interrupt = asyncio.Event()
53+
started.set()
54+
del started
55+
await self.__interrupt.wait()
56+
57+
def __shutdown(self):
58+
if self.__worker is not None:
59+
self.loop.call_soon_threadsafe(self.__interrupt.set)
60+
self.__worker.join()
61+
self.__worker = None
62+
3563
def __call__(
3664
self,
3765
func,
@@ -48,3 +76,9 @@ async def async_wrapper():
4876
except Exception:
4977
logging.exception("Exception when running dispatched callback")
5078
asyncio.run_coroutine_threadsafe(async_wrapper(), self.loop)
79+
80+
def __enter__(self):
81+
return self
82+
83+
def __exit__(self, A, B, C):
84+
self.close()

0 commit comments

Comments
 (0)