22import logging
33import subprocess
44import threading
5+ import time
56from dataclasses import dataclass , field
67from datetime import datetime
78from functools import partial
@@ -117,34 +118,57 @@ def __post_init__(self):
117118
118119 def _multigrid_watcher_finalised (self ):
119120 self .multigrid_watcher_active = False
120- self .dormancy_check ()
121121
122- def dormancy_check (self ):
122+ def is_ready_for_dormancy (self ):
123+ """
124+ When the multigrid watcher is no longer active, sends a request to safely stop
125+ the analyser and file watcher threads, then checks to see that those threads
126+ and the RSyncer processes associated with the current session have all been
127+ safely stopped
128+ """
129+ log .debug (
130+ f"Starting dormancy check for MultigridController for session { self .session_id } "
131+ )
123132 if not self .multigrid_watcher_active :
124- if (
133+ for a in self .analysers .values ():
134+ if a .is_safe_to_stop ():
135+ a .stop ()
136+ for w in self ._environment .watchers .values ():
137+ if w .is_safe_to_stop ():
138+ w .stop ()
139+ return (
125140 all (r ._finalised for r in self .rsync_processes .values ())
126141 and not any (a .thread .is_alive () for a in self .analysers .values ())
127142 and not any (
128143 w .thread .is_alive () for w in self ._environment .watchers .values ()
129144 )
130- ):
131-
132- def call_remove_session ():
133- response = capture_delete (
134- f"{ self ._environment .url .geturl ()} { url_path_for ('session_control.router' , 'remove_session' , session_id = self .session_id )} " ,
135- )
136- success = response .status_code == 200 if response else False
137- if not success :
138- log .warning (
139- f"Could not delete database data for { self .session_id } "
140- )
141-
142- dormancy_thread = threading .Thread (
143- name = f"Session deletion thread { self .session_id } " ,
144- target = call_remove_session ,
145- )
146- dormancy_thread .start ()
147- self .dormant = True
145+ )
146+ log .debug (f"Multigrid watcher for session { self .session_id } is still active" )
147+ return False
148+
149+ def clean_up_once_dormant (self , running_threads : list [threading .Thread ]):
150+ """
151+ A function run in a separate thread that runs the post-session cleanup logic
152+ once all threads associated with this current session are halted, and marks
153+ the controller as being fully dormant after doing so.
154+ """
155+ for thread in running_threads :
156+ thread .join ()
157+ log .debug (f"RSyncer cleanup thread { thread .ident } has stopped safely" )
158+ while not self .is_ready_for_dormancy ():
159+ time .sleep (10 )
160+
161+ # Once all threads are stopped, remove it from the database
162+ log .debug (
163+ f"Submitting request to remove session { self .session_id } from database"
164+ )
165+ response = capture_delete (
166+ f"{ self ._environment .url .geturl ()} { url_path_for ('session_control.router' , 'remove_session' , session_id = self .session_id )} " ,
167+ )
168+ success = response .status_code == 200 if response else False
169+ if not success :
170+ log .warning (f"Could not delete database data for { self .session_id } " )
171+ self .dormant = True
148172
149173 def abandon (self ):
150174 for a in self .analysers .values ():
@@ -157,10 +181,23 @@ def abandon(self):
157181 def finalise (self ):
158182 for a in self .analysers .values ():
159183 a .request_stop ()
184+ log .debug (f"Stop request sent to analyser { a } " )
160185 for w in self ._environment .watchers .values ():
161186 w .request_stop ()
187+ log .debug (f"Stop request sent to watcher { w } " )
188+ rsync_finaliser_threads = []
162189 for p in self .rsync_processes .keys ():
163- self ._finalise_rsyncer (p )
190+ # Collect the running rsyncer finaliser threads to pass to the dormancy checker
191+ rsync_finaliser_threads .append (self ._finalise_rsyncer (p ))
192+ log .debug (f"Finalised rsyncer { p } " )
193+
194+ # Run the session cleanup function in a separate thread
195+ cleanup_upon_dormancy_thread = threading .Thread (
196+ target = self .clean_up_once_dormant ,
197+ args = [rsync_finaliser_threads ],
198+ daemon = True ,
199+ )
200+ cleanup_upon_dormancy_thread .start ()
164201
165202 def update_visit_time (self , new_end_time : datetime ):
166203 # Convert the received server timestamp into the local equivalent
@@ -235,15 +272,19 @@ def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False):
235272 capture_post (stop_url , json = {"source" : str (source )})
236273
237274 def _finalise_rsyncer (self , source : Path ):
275+ """
276+ Starts a new Rsyncer thread that cleans up the directories, and returns that
277+ thread to be managed by a central thread.
278+ """
238279 finalise_thread = threading .Thread (
239280 name = f"Controller finaliser thread ({ source } )" ,
240- target = partial (
241- self .rsync_processes [source ].finalise , callback = self .dormancy_check
242- ),
281+ target = self .rsync_processes [source ].finalise ,
243282 kwargs = {"thread" : False },
244283 daemon = True ,
245284 )
246285 finalise_thread .start ()
286+ log .debug (f"Started RSync cleanup for { str (source )} " )
287+ return finalise_thread
247288
248289 def _restart_rsyncer (self , source : Path ):
249290 self .rsync_processes [source ].restart ()
@@ -368,7 +409,6 @@ def rsync_result(update: RSyncerUpdate):
368409 )
369410 else :
370411 self .analysers [source ].subscribe (self ._data_collection_form )
371- self .analysers [source ].subscribe (self .dormancy_check , final = True )
372412 self .analysers [source ].start ()
373413 if transfer :
374414 self .rsync_processes [source ].subscribe (self .analysers [source ].enqueue )
@@ -408,9 +448,6 @@ def _rsync_update_converter(p: Path) -> None:
408448 ),
409449 secondary = True ,
410450 )
411- self ._environment .watchers [source ].subscribe (
412- self .dormancy_check , final = True
413- )
414451 self ._environment .watchers [source ].start ()
415452
416453 def _data_collection_form (self , response : dict ):
0 commit comments