Skip to content

Commit 6c4bac7

Browse files
committed
Add mechanism to record when a multigrid controller has exited
1 parent 28a47f7 commit 6c4bac7

File tree

4 files changed

+32
-6
lines changed

4 files changed

+32
-6
lines changed

src/murfey/client/multigrid_control.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ class MultigridController:
3434
rsync_url: str = ""
3535
rsync_module: str = "data"
3636
demo: bool = False
37+
dormant: bool = False
38+
multigrid_watcher_active: bool = True
3739
processing_enabled: bool = True
3840
do_transfer: bool = True
3941
dummy_dc: bool = False
@@ -95,6 +97,9 @@ def __post_init__(self):
9597
register_client=False,
9698
)
9799

100+
def _multigrid_watcher_finalised(self):
101+
self.multigrid_watcher_active = False
102+
98103
def _start_rsyncer_multigrid(
99104
self,
100105
source: Path,

src/murfey/client/watchdir_multigrid.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,5 @@ def _process(self):
114114
if first_loop:
115115
first_loop = False
116116
time.sleep(15)
117+
118+
self.notify(final=True)

src/murfey/instrument_server/api.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ def start_multigrid_watcher(
176176
destination_overrides=watcher_spec.destination_overrides,
177177
)
178178
)
179+
watchers[session_id].subscribe(controllers[session_id]._multigrid_watcher_finalised)
179180
watchers[session_id].start()
180181
return {"success": True}
181182

src/murfey/util/__init__.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,19 +228,31 @@ class Observer:
228228
def __init__(self):
229229
self._listeners: list[Callable[..., Awaitable[None] | None]] = []
230230
self._secondary_listeners: list[Callable[..., Awaitable[None] | None]] = []
231+
self._final_listeners: list[Callable[..., Awaitable[None] | None]] = []
231232
super().__init__()
232233

233234
def subscribe(
234-
self, fn: Callable[..., Awaitable[None] | None], secondary: bool = False
235+
self,
236+
fn: Callable[..., Awaitable[None] | None],
237+
secondary: bool = False,
238+
final: bool = False,
235239
):
236-
if secondary:
240+
if final:
241+
self._final_listeners.append(fn)
242+
elif secondary:
237243
self._secondary_listeners.append(fn)
238244
else:
239245
self._listeners.append(fn)
240246

241-
async def anotify(self, *args, secondary: bool = False, **kwargs) -> None:
247+
async def anotify(
248+
self, *args, secondary: bool = False, final: bool = False, **kwargs
249+
) -> None:
242250
awaitables: list[Awaitable] = []
243-
listeners = self._secondary_listeners if secondary else self._listeners
251+
listeners = (
252+
self._secondary_listeners
253+
if secondary
254+
else self._final_listeners if final else self._listeners
255+
)
244256
for notify_function in listeners:
245257
result = notify_function(*args, **kwargs)
246258
if result is not None and inspect.isawaitable(result):
@@ -253,9 +265,15 @@ async def _await_all(awaitables: list[Awaitable]):
253265
for awaitable in asyncio.as_completed(awaitables):
254266
await awaitable
255267

256-
def notify(self, *args, secondary: bool = False, **kwargs) -> None:
268+
def notify(
269+
self, *args, secondary: bool = False, final: bool = False, **kwargs
270+
) -> None:
257271
awaitables: list[Awaitable] = []
258-
listeners = self._secondary_listeners if secondary else self._listeners
272+
listeners = (
273+
self._secondary_listeners
274+
if secondary
275+
else self._final_listeners if final else self._listeners
276+
)
259277
for notify_function in listeners:
260278
result = notify_function(*args, **kwargs)
261279
if result is not None and inspect.isawaitable(result):

0 commit comments

Comments
 (0)