Skip to content

Commit f2ec83d

Browse files
committed
Multigrid controller can now passively request the shutdown of all watchers and analysers, and finalisation of all rsyncers. Each of these is given a callback that checks the state of all relevant threads before labelling the multigrid controller as dormant once all threads have shutdown
1 parent 6c4bac7 commit f2ec83d

File tree

4 files changed

+46
-3
lines changed

4 files changed

+46
-3
lines changed

src/murfey/client/analyser.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ def _analyse(self):
404404
)
405405
self.post_transfer(transferred_file)
406406
self.queue.task_done()
407+
self.notify(final=True)
407408

408409
def _xml_file(self, data_file: Path) -> Path:
409410
if not self._environment:
@@ -432,6 +433,10 @@ def start(self):
432433
logger.info(f"Analyser thread starting for {self}")
433434
self.thread.start()
434435

436+
def request_stop(self):
437+
self._stopping = True
438+
self._halt_thread = True
439+
435440
def stop(self):
436441
logger.debug("Analyser thread stop requested")
437442
self._stopping = True

src/murfey/client/multigrid_control.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,25 @@ def __post_init__(self):
100100
def _multigrid_watcher_finalised(self):
101101
self.multigrid_watcher_active = False
102102

103+
def dormancy_check(self):
104+
if not self.multigrid_watcher_active:
105+
if (
106+
all(r._finalised for r in self.rsync_processes.values())
107+
and not any(a.thread.is_alive() for a in self.analysers.values())
108+
and not any(
109+
w.thread.is_alive() for w in self._environment.watchers.values()
110+
)
111+
):
112+
self.dormant = True
113+
114+
def finalise(self):
115+
for a in self.analysers.values():
116+
a.request_stop()
117+
for w in self._environment.watchers.values():
118+
w.request_stop()
119+
for p in self.rsync_processes.keys():
120+
self._finalise_rsyncer(p)
121+
103122
def _start_rsyncer_multigrid(
104123
self,
105124
source: Path,
@@ -170,7 +189,9 @@ def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False):
170189
def _finalise_rsyncer(self, source: Path):
171190
finalise_thread = threading.Thread(
172191
name=f"Controller finaliser thread ({source})",
173-
target=self.rsync_processes[source].finalise,
192+
target=partial(
193+
self.rsync_processes[source].finalise, callback=self.dormancy_check
194+
),
174195
kwargs={"thread": False},
175196
daemon=True,
176197
)
@@ -302,6 +323,7 @@ def rsync_result(update: RSyncerUpdate):
302323
)
303324
else:
304325
self.analysers[source].subscribe(self._data_collection_form)
326+
self.analysers[source].subscribe(self.dormancy_check, final=True)
305327
self.analysers[source].start()
306328
if transfer:
307329
self.rsync_processes[source].subscribe(self.analysers[source].enqueue)
@@ -341,6 +363,9 @@ def _rsync_update_converter(p: Path) -> None:
341363
),
342364
secondary=True,
343365
)
366+
self._environment.watchers[source].subscribe(
367+
self.dormancy_check, final=True
368+
)
344369
self._environment.watchers[source].start()
345370

346371
def _data_collection_form(self, response: dict):

src/murfey/client/rsync.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import time
1616
from enum import Enum
1717
from pathlib import Path
18-
from typing import Callable, List, NamedTuple
18+
from typing import Awaitable, Callable, List, NamedTuple
1919
from urllib.parse import ParseResult
2020

2121
from murfey.client.tui.status_bar import StatusBar
@@ -75,6 +75,7 @@ def __init__(
7575
self._local = local
7676
self._server_url = server_url
7777
self._notify = notify
78+
self._finalised = False
7879

7980
# Set rsync destination
8081
if local:
@@ -181,7 +182,11 @@ def stop(self):
181182
self.thread.join()
182183
logger.debug("RSync thread stop completed")
183184

184-
def finalise(self, thread: bool = True):
185+
def finalise(
186+
self,
187+
thread: bool = True,
188+
callback: Callable[..., Awaitable[None] | None] | None = None,
189+
):
185190
self.stop()
186191
self._remove_files = True
187192
self._notify = False
@@ -196,6 +201,9 @@ def finalise(self, thread: bool = True):
196201
self.stop()
197202
else:
198203
self._transfer(list(self._basepath.glob("**/*")))
204+
self._finalised = True
205+
if callback:
206+
callback()
199207

200208
def enqueue(self, file_path: Path):
201209
if not self._stopping:

src/murfey/client/watchdir.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ def start(self):
6363
log.info(f"DirWatcher thread starting for {self}")
6464
self.thread.start()
6565

66+
def request_stop(self):
67+
self._stopping = True
68+
self._halt_thread = True
69+
6670
def stop(self):
6771
log.debug("DirWatcher thread stop requested")
6872
self._stopping = True
@@ -90,6 +94,7 @@ def _process(self):
9094
modification_time=modification_time, transfer_all=self._transfer_all
9195
)
9296
time.sleep(15)
97+
self.notify(final=True)
9398

9499
def scan(self, modification_time: float | None = None, transfer_all: bool = False):
95100
"""

0 commit comments

Comments
 (0)