Skip to content

Commit 84770ff

Browse files
authored
Allow visit completion to run more asynchronously (#543)
The multigrid controller should now be able to handle most of the process of cleaning up all associated threads. Thread completion requests are performed by setting flags and not explicitly waiting for joins. Callbacks are then used to notify of thread completion. (RSyncer threads are slightly more complicated because they have to perform file removal after completing.) On completion of all associated threads the multigrid controller will mark itself as dormant. When new multigrid controllers are added dormant controllers are checked for and removed along with the associated Murfey session data.
1 parent f4df956 commit 84770ff

File tree

10 files changed

+125
-9
lines changed

10 files changed

+125
-9
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ developer = [
5353
"pytest", # Test code functionality
5454
]
5555
instrument-server = [
56+
"aiohttp",
5657
"fastapi[standard]",
5758
"python-jose",
5859
]

src/murfey/client/analyser.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ def _analyse(self):
404404
)
405405
self.post_transfer(transferred_file)
406406
self.queue.task_done()
407+
self.notify(final=True)
407408

408409
def _xml_file(self, data_file: Path) -> Path:
409410
if not self._environment:
@@ -432,6 +433,10 @@ def start(self):
432433
logger.info(f"Analyser thread starting for {self}")
433434
self.thread.start()
434435

436+
def request_stop(self):
437+
self._stopping = True
438+
self._halt_thread = True
439+
435440
def stop(self):
436441
logger.debug("Analyser thread stop requested")
437442
self._stopping = True

src/murfey/client/multigrid_control.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from typing import Dict, List, Optional
1010
from urllib.parse import urlparse
1111

12+
import aiohttp
1213
import requests
1314

1415
import murfey.client.websocket
@@ -34,6 +35,8 @@ class MultigridController:
3435
rsync_url: str = ""
3536
rsync_module: str = "data"
3637
demo: bool = False
38+
dormant: bool = False
39+
multigrid_watcher_active: bool = True
3740
processing_enabled: bool = True
3841
do_transfer: bool = True
3942
dummy_dc: bool = False
@@ -95,6 +98,37 @@ def __post_init__(self):
9598
register_client=False,
9699
)
97100

101+
def _multigrid_watcher_finalised(self):
102+
self.multigrid_watcher_active = False
103+
self.dormancy_check()
104+
105+
async def dormancy_check(self):
106+
if not self.multigrid_watcher_active:
107+
if (
108+
all(r._finalised for r in self.rsync_processes.values())
109+
and not any(a.thread.is_alive() for a in self.analysers.values())
110+
and not any(
111+
w.thread.is_alive() for w in self._environment.watchers.values()
112+
)
113+
):
114+
async with aiohttp.ClientSession() as clientsession:
115+
async with clientsession.delete(
116+
f"{self._environment.url.geturl()}/sessions/{self.session_id}",
117+
json={"access_token": self.token, "token_type": "bearer"},
118+
) as response:
119+
success = response.status == 200
120+
if not success:
121+
log.warning(f"Could not delete database data for {self.session_id}")
122+
self.dormant = True
123+
124+
def finalise(self):
125+
for a in self.analysers.values():
126+
a.request_stop()
127+
for w in self._environment.watchers.values():
128+
w.request_stop()
129+
for p in self.rsync_processes.keys():
130+
self._finalise_rsyncer(p)
131+
98132
def _start_rsyncer_multigrid(
99133
self,
100134
source: Path,
@@ -165,7 +199,9 @@ def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False):
165199
def _finalise_rsyncer(self, source: Path):
166200
finalise_thread = threading.Thread(
167201
name=f"Controller finaliser thread ({source})",
168-
target=self.rsync_processes[source].finalise,
202+
target=partial(
203+
self.rsync_processes[source].finalise, callback=self.dormancy_check
204+
),
169205
kwargs={"thread": False},
170206
daemon=True,
171207
)
@@ -297,6 +333,7 @@ def rsync_result(update: RSyncerUpdate):
297333
)
298334
else:
299335
self.analysers[source].subscribe(self._data_collection_form)
336+
self.analysers[source].subscribe(self.dormancy_check, final=True)
300337
self.analysers[source].start()
301338
if transfer:
302339
self.rsync_processes[source].subscribe(self.analysers[source].enqueue)
@@ -336,6 +373,9 @@ def _rsync_update_converter(p: Path) -> None:
336373
),
337374
secondary=True,
338375
)
376+
self._environment.watchers[source].subscribe(
377+
self.dormancy_check, final=True
378+
)
339379
self._environment.watchers[source].start()
340380

341381
def _data_collection_form(self, response: dict):

src/murfey/client/rsync.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import time
1616
from enum import Enum
1717
from pathlib import Path
18-
from typing import Callable, List, NamedTuple
18+
from typing import Awaitable, Callable, List, NamedTuple
1919
from urllib.parse import ParseResult
2020

2121
from murfey.client.tui.status_bar import StatusBar
@@ -75,6 +75,7 @@ def __init__(
7575
self._local = local
7676
self._server_url = server_url
7777
self._notify = notify
78+
self._finalised = False
7879

7980
# Set rsync destination
8081
if local:
@@ -181,7 +182,11 @@ def stop(self):
181182
self.thread.join()
182183
logger.debug("RSync thread stop completed")
183184

184-
def finalise(self, thread: bool = True):
185+
def finalise(
186+
self,
187+
thread: bool = True,
188+
callback: Callable[..., Awaitable[None] | None] | None = None,
189+
):
185190
self.stop()
186191
self._remove_files = True
187192
self._notify = False
@@ -196,6 +201,9 @@ def finalise(self, thread: bool = True):
196201
self.stop()
197202
else:
198203
self._transfer(list(self._basepath.glob("**/*")))
204+
self._finalised = True
205+
if callback:
206+
callback()
199207

200208
def enqueue(self, file_path: Path):
201209
if not self._stopping:

src/murfey/client/watchdir.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ def start(self):
6363
log.info(f"DirWatcher thread starting for {self}")
6464
self.thread.start()
6565

66+
def request_stop(self):
67+
self._stopping = True
68+
self._halt_thread = True
69+
6670
def stop(self):
6771
log.debug("DirWatcher thread stop requested")
6872
self._stopping = True
@@ -90,6 +94,7 @@ def _process(self):
9094
modification_time=modification_time, transfer_all=self._transfer_all
9195
)
9296
time.sleep(15)
97+
self.notify(final=True)
9398

9499
def scan(self, modification_time: float | None = None, transfer_all: bool = False):
95100
"""

src/murfey/client/watchdir_multigrid.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,5 @@ def _process(self):
114114
if first_loop:
115115
first_loop = False
116116
time.sleep(15)
117+
118+
self.notify(final=True)

src/murfey/instrument_server/api.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ def start_multigrid_watcher(
144144
if controllers.get(session_id) is not None:
145145
return {"success": True}
146146
label = watcher_spec.label
147+
for sid, controller in controllers.items():
148+
if controller.dormant:
149+
del controllers[sid]
147150
controllers[session_id] = MultigridController(
148151
[],
149152
watcher_spec.visit,
@@ -176,6 +179,9 @@ def start_multigrid_watcher(
176179
destination_overrides=watcher_spec.destination_overrides,
177180
)
178181
)
182+
watchers[session_id].subscribe(
183+
controllers[session_id]._multigrid_watcher_finalised, final=True
184+
)
179185
watchers[session_id].start()
180186
return {"success": True}
181187

@@ -213,6 +219,13 @@ def finalise_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource)
213219
return {"success": True}
214220

215221

222+
@router.post("/sessions/{session_id}/finalise_session")
223+
def finalise_session(session_id: MurfeySessionID):
224+
watchers[session_id].request_stop()
225+
controllers[session_id].finalise()
226+
return {"success": True}
227+
228+
216229
@router.post("/sessions/{session_id}/restart_rsyncer")
217230
def restart_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource):
218231
controllers[session_id]._restart_rsyncer(rsyncer_source.source)

src/murfey/server/api/instrument.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,27 @@ async def finalise_rsyncer(
353353
return data
354354

355355

356+
@router.post("/sessions/{session_id}/finalise_session")
357+
async def finalise_session(session_id: MurfeySessionID, db=murfey_db):
358+
data = {}
359+
instrument_name = (
360+
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
361+
)
362+
machine_config = get_machine_config(instrument_name=instrument_name)[
363+
instrument_name
364+
]
365+
if machine_config.instrument_server_url:
366+
async with aiohttp.ClientSession() as clientsession:
367+
async with clientsession.post(
368+
f"{machine_config.instrument_server_url}/sessions/{session_id}/finalise_session",
369+
headers={
370+
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
371+
},
372+
) as resp:
373+
data = await resp.json()
374+
return data
375+
376+
356377
@router.post("/sessions/{session_id}/remove_rsyncer")
357378
async def remove_rsyncer(
358379
session_id: MurfeySessionID, rsyncer_source: RsyncerSource, db=murfey_db

src/murfey/util/__init__.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,19 +228,31 @@ class Observer:
228228
def __init__(self):
229229
self._listeners: list[Callable[..., Awaitable[None] | None]] = []
230230
self._secondary_listeners: list[Callable[..., Awaitable[None] | None]] = []
231+
self._final_listeners: list[Callable[..., Awaitable[None] | None]] = []
231232
super().__init__()
232233

233234
def subscribe(
234-
self, fn: Callable[..., Awaitable[None] | None], secondary: bool = False
235+
self,
236+
fn: Callable[..., Awaitable[None] | None],
237+
secondary: bool = False,
238+
final: bool = False,
235239
):
236-
if secondary:
240+
if final:
241+
self._final_listeners.append(fn)
242+
elif secondary:
237243
self._secondary_listeners.append(fn)
238244
else:
239245
self._listeners.append(fn)
240246

241-
async def anotify(self, *args, secondary: bool = False, **kwargs) -> None:
247+
async def anotify(
248+
self, *args, secondary: bool = False, final: bool = False, **kwargs
249+
) -> None:
242250
awaitables: list[Awaitable] = []
243-
listeners = self._secondary_listeners if secondary else self._listeners
251+
listeners = (
252+
self._secondary_listeners
253+
if secondary
254+
else self._final_listeners if final else self._listeners
255+
)
244256
for notify_function in listeners:
245257
result = notify_function(*args, **kwargs)
246258
if result is not None and inspect.isawaitable(result):
@@ -253,9 +265,15 @@ async def _await_all(awaitables: list[Awaitable]):
253265
for awaitable in asyncio.as_completed(awaitables):
254266
await awaitable
255267

256-
def notify(self, *args, secondary: bool = False, **kwargs) -> None:
268+
def notify(
269+
self, *args, secondary: bool = False, final: bool = False, **kwargs
270+
) -> None:
257271
awaitables: list[Awaitable] = []
258-
listeners = self._secondary_listeners if secondary else self._listeners
272+
listeners = (
273+
self._secondary_listeners
274+
if secondary
275+
else self._final_listeners if final else self._listeners
276+
)
259277
for notify_function in listeners:
260278
result = notify_function(*args, **kwargs)
261279
if result is not None and inspect.isawaitable(result):

src/murfey/util/state.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,12 @@ def subscribe(
6969
self,
7070
fn: Callable[[str, T | None], Awaitable[None] | None],
7171
secondary: bool = False,
72+
final: bool = False,
7273
):
7374
if secondary:
7475
self._secondary_listeners.append(fn)
76+
elif final:
77+
self._final_listeners.append(fn)
7578
else:
7679
self._listeners.append(fn)
7780

0 commit comments

Comments
 (0)