Skip to content

Commit e4eaaf0

Browse files
authored
Fixed post-session cleanup logic (#641)
* Add a lock to the controllers dictionary when performing the cleanup of dormant controllers and split the dormant controller parsing and deletion into separate for loops * Added a function to the DirWatcher and Analyser to check if the thread is safe to stop * Updated the logic used to finalise a visit and safely close all associated threads * Adjusted logging setup for instrument server so that DEBUG logs are sent to the dashboard, but not displayed on console * Added new attribute to MultigridController to denote when it's started the post-session cleanup process * Renamed and updated multigrid controller checking endpoints; they now return a dictionary with useful properties regarding the status of the multigrid controller * Return False early during multigrid controller check if access token for the current session is not found * Return empty list if controllers for the session being queried don't exist * Send websocket message to trigger refresh on frontend after database cleanup * Added more keys to the websocket messages sent by the multigrid controller, which can be used to target which queries to refetch on the frontend side Co-authored-by: @d-j-hatton
1 parent 432c50b commit e4eaaf0

File tree

10 files changed

+205
-72
lines changed

10 files changed

+205
-72
lines changed

src/murfey/client/analyser.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ def _analyse(self):
370370
)
371371
self.post_transfer(transferred_file)
372372
self.queue.task_done()
373+
logger.debug("Analyer thread has stopped analysing incoming files")
373374
self.notify(final=True)
374375

375376
def _xml_file(self, data_file: Path) -> Path:
@@ -403,6 +404,12 @@ def request_stop(self):
403404
self._stopping = True
404405
self._halt_thread = True
405406

407+
def is_safe_to_stop(self):
408+
"""
409+
Checks that the analyser thread is safe to stop
410+
"""
411+
return self._stopping and self._halt_thread and not self.queue.qsize()
412+
406413
def stop(self):
407414
logger.debug("Analyser thread stop requested")
408415
self._stopping = True
@@ -412,5 +419,8 @@ def stop(self):
412419
self.queue.put(None)
413420
self.thread.join()
414421
except Exception as e:
415-
logger.error(f"Exception encountered while stopping analyser: {e}")
422+
logger.error(
423+
f"Exception encountered while stopping Analyser: {e}",
424+
exc_info=True,
425+
)
416426
logger.debug("Analyser thread stop completed")

src/murfey/client/multigrid_control.py

Lines changed: 89 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
import subprocess
44
import threading
5+
import time
56
from dataclasses import dataclass, field
67
from datetime import datetime
78
from functools import partial
@@ -36,6 +37,7 @@ class MultigridController:
3637
rsync_url: str = ""
3738
rsync_module: str = "data"
3839
demo: bool = False
40+
finalising: bool = False
3941
dormant: bool = False
4042
multigrid_watcher_active: bool = True
4143
processing_enabled: bool = True
@@ -117,34 +119,70 @@ def __post_init__(self):
117119

118120
def _multigrid_watcher_finalised(self):
119121
self.multigrid_watcher_active = False
120-
self.dormancy_check()
121122

122-
def dormancy_check(self):
123+
def is_ready_for_dormancy(self):
124+
"""
125+
When the multigrid watcher is no longer active, sends a request to safely stop
126+
the analyser and file watcher threads, then checks to see that those threads
127+
and the RSyncer processes associated with the current session have all been
128+
safely stopped
129+
"""
130+
log.debug(
131+
f"Starting dormancy check for MultigridController for session {self.session_id}"
132+
)
123133
if not self.multigrid_watcher_active:
124-
if (
134+
for a in self.analysers.values():
135+
if a.is_safe_to_stop():
136+
a.stop()
137+
for w in self._environment.watchers.values():
138+
if w.is_safe_to_stop():
139+
w.stop()
140+
return (
125141
all(r._finalised for r in self.rsync_processes.values())
126142
and not any(a.thread.is_alive() for a in self.analysers.values())
127143
and not any(
128144
w.thread.is_alive() for w in self._environment.watchers.values()
129145
)
130-
):
146+
)
147+
log.debug(f"Multigrid watcher for session {self.session_id} is still active")
148+
return False
149+
150+
def clean_up_once_dormant(self, running_threads: list[threading.Thread]):
151+
"""
152+
A function run in a separate thread that runs the post-session cleanup logic
153+
once all threads associated with this current session are halted, and marks
154+
the controller as being fully dormant after doing so.
155+
"""
156+
for thread in running_threads:
157+
thread.join()
158+
log.debug(f"RSyncer cleanup thread {thread.ident} has stopped safely")
159+
while not self.is_ready_for_dormancy():
160+
time.sleep(10)
161+
162+
# Once all threads are stopped, remove session from the database
163+
log.debug(
164+
f"Submitting request to remove session {self.session_id} from database"
165+
)
166+
response = capture_delete(
167+
f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self.session_id)}",
168+
)
169+
success = response.status_code == 200 if response else False
170+
if not success:
171+
log.warning(f"Could not delete database data for {self.session_id}")
172+
173+
# Send message to frontend to trigger a refresh
174+
self.ws.send(
175+
json.dumps(
176+
{
177+
"message": "refresh",
178+
"target": "sessions",
179+
"instrument_name": self.instrument_name,
180+
}
181+
)
182+
)
131183

132-
def call_remove_session():
133-
response = capture_delete(
134-
f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self.session_id)}",
135-
)
136-
success = response.status_code == 200 if response else False
137-
if not success:
138-
log.warning(
139-
f"Could not delete database data for {self.session_id}"
140-
)
141-
142-
dormancy_thread = threading.Thread(
143-
name=f"Session deletion thread {self.session_id}",
144-
target=call_remove_session,
145-
)
146-
dormancy_thread.start()
147-
self.dormant = True
184+
# Mark as dormant
185+
self.dormant = True
148186

149187
def abandon(self):
150188
for a in self.analysers.values():
@@ -155,12 +193,26 @@ def abandon(self):
155193
p.request_stop()
156194

157195
def finalise(self):
196+
self.finalising = True
158197
for a in self.analysers.values():
159198
a.request_stop()
199+
log.debug(f"Stop request sent to analyser {a}")
160200
for w in self._environment.watchers.values():
161201
w.request_stop()
202+
log.debug(f"Stop request sent to watcher {w}")
203+
rsync_finaliser_threads = []
162204
for p in self.rsync_processes.keys():
163-
self._finalise_rsyncer(p)
205+
# Collect the running rsyncer finaliser threads to pass to the dormancy checker
206+
rsync_finaliser_threads.append(self._finalise_rsyncer(p))
207+
log.debug(f"Finalised rsyncer {p}")
208+
209+
# Run the session cleanup function in a separate thread
210+
cleanup_upon_dormancy_thread = threading.Thread(
211+
target=self.clean_up_once_dormant,
212+
args=[rsync_finaliser_threads],
213+
daemon=True,
214+
)
215+
cleanup_upon_dormancy_thread.start()
164216

165217
def update_visit_time(self, new_end_time: datetime):
166218
# Convert the received server timestamp into the local equivalent
@@ -224,7 +276,15 @@ def _start_rsyncer_multigrid(
224276
transfer=machine_data.get("data_transfer_enabled", True),
225277
restarted=str(source) in self.rsync_restarts,
226278
)
227-
self.ws.send(json.dumps({"message": "refresh"}))
279+
self.ws.send(
280+
json.dumps(
281+
{
282+
"message": "refresh",
283+
"target": "rsyncer",
284+
"session_id": self.session_id,
285+
}
286+
)
287+
)
228288

229289
def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False):
230290
if explicit_stop:
@@ -235,15 +295,19 @@ def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False):
235295
capture_post(stop_url, json={"source": str(source)})
236296

237297
def _finalise_rsyncer(self, source: Path):
298+
"""
299+
Starts a new Rsyncer thread that cleans up the directories, and returns that
300+
thread to be managed by a central thread.
301+
"""
238302
finalise_thread = threading.Thread(
239303
name=f"Controller finaliser thread ({source})",
240-
target=partial(
241-
self.rsync_processes[source].finalise, callback=self.dormancy_check
242-
),
304+
target=self.rsync_processes[source].finalise,
243305
kwargs={"thread": False},
244306
daemon=True,
245307
)
246308
finalise_thread.start()
309+
log.debug(f"Started RSync cleanup for {str(source)}")
310+
return finalise_thread
247311

248312
def _restart_rsyncer(self, source: Path):
249313
self.rsync_processes[source].restart()
@@ -368,7 +432,6 @@ def rsync_result(update: RSyncerUpdate):
368432
)
369433
else:
370434
self.analysers[source].subscribe(self._data_collection_form)
371-
self.analysers[source].subscribe(self.dormancy_check, final=True)
372435
self.analysers[source].start()
373436
if transfer:
374437
self.rsync_processes[source].subscribe(self.analysers[source].enqueue)
@@ -408,9 +471,6 @@ def _rsync_update_converter(p: Path) -> None:
408471
),
409472
secondary=True,
410473
)
411-
self._environment.watchers[source].subscribe(
412-
self.dormancy_check, final=True
413-
)
414474
self._environment.watchers[source].start()
415475

416476
def _data_collection_form(self, response: dict):

src/murfey/client/watchdir.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,27 @@ def request_stop(self):
6767
self._stopping = True
6868
self._halt_thread = True
6969

70+
def is_safe_to_stop(self):
71+
"""
72+
Checks that the directory watcher thread is safe to stop
73+
"""
74+
return self._stopping and self._halt_thread and not self.queue.qsize()
75+
7076
def stop(self):
71-
log.debug("DirWatcher thread stop requested")
7277
self._stopping = True
7378
if self.thread.is_alive():
7479
self.queue.join()
7580

7681
self._halt_thread = True
77-
if self.thread.is_alive():
78-
self.queue.put(None)
79-
self.thread.join()
82+
try:
83+
if self.thread.is_alive():
84+
self.queue.put(None)
85+
self.thread.join()
86+
except Exception as e:
87+
log.error(
88+
f"Exception encountered while stopping DirWatcher: {e}",
89+
exc_info=True,
90+
)
8091
log.debug("DirWatcher thread stop completed")
8192

8293
def _process(self):
@@ -94,6 +105,7 @@ def _process(self):
94105
modification_time=modification_time, transfer_all=self._transfer_all
95106
)
96107
time.sleep(15)
108+
log.debug(f"DirWatcher {self} has stopped scanning")
97109
self.notify(final=True)
98110

99111
def scan(self, modification_time: float | None = None, transfer_all: bool = False):

src/murfey/instrument_server/__init__.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,21 +45,32 @@ def start_instrument_server():
4545

4646
LogFilter.install()
4747

48+
# Log everything from Murfey by default
49+
logging.getLogger("murfey").setLevel(logging.DEBUG)
50+
51+
# Show only logs at INFO level and above in the console
4852
rich_handler = RichHandler(enable_link_path=False)
49-
logging.getLogger("murfey").setLevel(logging.INFO)
53+
rich_handler.setLevel(logging.INFO)
5054
logging.getLogger("murfey").addHandler(rich_handler)
5155
logging.getLogger("fastapi").addHandler(rich_handler)
5256
logging.getLogger("uvicorn").addHandler(rich_handler)
5357

58+
# Create a websocket app to connect to the backend
5459
ws = murfey.client.websocket.WSApp(
5560
server=read_config().get("Murfey", "server", fallback=""),
5661
register_client=False,
5762
)
5863

59-
handler = CustomHandler(ws.send)
60-
logging.getLogger("murfey").addHandler(handler)
61-
logging.getLogger("fastapi").addHandler(handler)
62-
logging.getLogger("uvicorn").addHandler(handler)
64+
# Forward DEBUG levels logs and above from Murfey to the backend
65+
murfey_ws_handler = CustomHandler(ws.send)
66+
murfey_ws_handler.setLevel(logging.DEBUG)
67+
logging.getLogger("murfey").addHandler(murfey_ws_handler)
68+
69+
# Forward only INFO level logs and above for other packages
70+
other_ws_handler = CustomHandler(ws.send)
71+
other_ws_handler.setLevel(logging.INFO)
72+
logging.getLogger("fastapi").addHandler(other_ws_handler)
73+
logging.getLogger("uvicorn").addHandler(other_ws_handler)
6374

6475
logger.info(
6576
f"Starting Murfey server version {murfey.__version__}, listening on {args.host}:{args.port}"

0 commit comments

Comments (0)