11import asyncio
2+ import time
23from collections import defaultdict
34from collections .abc import Callable
5+ from concurrent .futures import Future
46from types import MethodType
57
68from softioc .asyncio_dispatcher import AsyncioDispatcher
1012from .exceptions import FastCSException
1113from .mapping import Mapping , SingleMapping
1214
15+ START_TIME = time .time ()
16+
1317
1418class Backend :
1519 def __init__ (
@@ -19,8 +23,8 @@ def __init__(
1923 self ._loop = self ._dispatcher .loop
2024 self ._controller = controller
2125
22- self ._initial_tasks = [controller .connect ]
23- self ._scan_tasks : list [ asyncio . Task ] = []
26+ self ._initial_coros = [controller .connect ]
27+ self ._scan_futures : set [ Future ] = set ()
2428
2529 asyncio .run_coroutine_threadsafe (
2630 self ._controller .initialise (), self ._loop
@@ -41,25 +45,26 @@ def _link_process_tasks(self):
4145 _link_attribute_sender_class (single_mapping )
4246
4347 def __del__ (self ):
44- self .stop_scan_tasks ()
48+ self .stop_scan_futures ()
4549
4650 def run (self ):
47- self ._run_initial_tasks ()
48- self .start_scan_tasks ()
51+ self ._run_initial_futures ()
52+ self .start_scan_futures ()
4953 self ._run ()
5054
51- def _run_initial_tasks (self ):
52- for task in self ._initial_tasks :
55+ def _run_initial_futures (self ):
56+ for task in self ._initial_coros :
5357 future = asyncio .run_coroutine_threadsafe (task (), self ._loop )
5458 future .result ()
5559
56- def start_scan_tasks (self ):
57- self ._scan_tasks = [
58- self ._loop .create_task (coro ()) for coro in _get_scan_coros (self ._mapping )
59- ]
60+ def start_scan_futures (self ):
61+ self ._scan_futures = {
62+ asyncio .run_coroutine_threadsafe (coro (), self ._loop )
63+ for coro in _get_scan_coros (self ._mapping )
64+ }
6065
61- def stop_scan_tasks (self ):
62- for task in self ._scan_tasks :
66+ def stop_scan_futures (self ):
67+ for task in self ._scan_futures :
6368 if not task .done ():
6469 try :
6570 task .cancel ()
@@ -163,7 +168,7 @@ def _get_periodic_scan_coros(scan_dict: dict[float, list[Callable]]) -> list[Cal
163168def _create_periodic_scan_coro (period , methods : list [Callable ]) -> Callable :
164169 async def scan_coro () -> None :
165170 while True :
166- await asyncio .gather (* [method () for method in methods ])
167171 await asyncio .sleep (period )
172+ await asyncio .gather (* [method () for method in methods ])
168173
169174 return scan_coro
0 commit comments