Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions src/fastcs/backend.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
from collections import defaultdict
from collections.abc import Callable
from concurrent.futures import Future
from types import MethodType

from softioc.asyncio_dispatcher import AsyncioDispatcher
Expand All @@ -19,8 +20,8 @@
self._loop = self._dispatcher.loop
self._controller = controller

self._initial_tasks = [controller.connect]
self._scan_tasks: list[asyncio.Task] = []
self._initial_coros = [controller.connect]
self._scan_futures: set[Future] = set()

asyncio.run_coroutine_threadsafe(
self._controller.initialise(), self._loop
Expand All @@ -41,28 +42,29 @@
_link_attribute_sender_class(single_mapping)

def __del__(self):
self.stop_scan_tasks()
self.stop_scan_futures()

Check warning on line 45 in src/fastcs/backend.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs/backend.py#L45

Added line #L45 was not covered by tests

def run(self):
self._run_initial_tasks()
self.start_scan_tasks()
self._run_initial_futures()
self.start_scan_futures()
self._run()

def _run_initial_tasks(self):
for task in self._initial_tasks:
future = asyncio.run_coroutine_threadsafe(task(), self._loop)
def _run_initial_futures(self):
for coro in self._initial_coros:
future = asyncio.run_coroutine_threadsafe(coro(), self._loop)
future.result()

def start_scan_tasks(self):
self._scan_tasks = [
self._loop.create_task(coro()) for coro in _get_scan_coros(self._mapping)
]
def start_scan_futures(self):
self._scan_futures = {
asyncio.run_coroutine_threadsafe(coro(), self._loop)
for coro in _get_scan_coros(self._mapping)
}

def stop_scan_tasks(self):
for task in self._scan_tasks:
if not task.done():
def stop_scan_futures(self):
for future in self._scan_futures:
if not future.done():
try:
task.cancel()
future.cancel()
except asyncio.CancelledError:
pass

Expand Down
4 changes: 2 additions & 2 deletions tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def __init__(self, controller):
super().__init__(controller)

self.init_task_called = False
self._initial_tasks.append(self.init_task)
self._initial_coros.append(self.init_task)

async def init_task(self):
self.init_task_called = True
Expand Down Expand Up @@ -44,4 +44,4 @@ async def test_backend(controller):
await asyncio.sleep(0.1)
assert controller.count > count

backend.stop_scan_tasks()
backend.stop_scan_futures()
Loading