Skip to content

Commit 85e7116

Browse files
committed
Add functionality to check what controllers the instrument server has running to allow reconnections to sessions following instrument server restarts
1 parent f99070b commit 85e7116

File tree

7 files changed

+71
-13
lines changed

7 files changed

+71
-13
lines changed

src/murfey/client/multigrid_control.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class MultigridController:
3636
do_transfer: bool = True
3737
dummy_dc: bool = False
3838
force_mdoc_metadata: bool = True
39+
rsync_restarts: List[str] = field(default_factory=lambda: [])
3940
rsync_processes: Dict[Path, RSyncer] = field(default_factory=lambda: {})
4041
analysers: Dict[Path, Analyser] = field(default_factory=lambda: {})
4142
data_collection_parameters: dict = field(default_factory=lambda: {})
@@ -103,7 +104,10 @@ def _start_rsyncer_multigrid(
103104
f"{self._environment.url.geturl()}/instruments/{self.instrument_name}/machine"
104105
).json()
105106
if destination_overrides.get(source):
106-
destination = destination_overrides[source] + f"/{extra_directory}"
107+
if str(source) in self.rsync_restarts:
108+
destination = destination_overrides[source]
109+
else:
110+
destination = destination_overrides[source] + f"/{extra_directory}"
107111
else:
108112
for k, v in destination_overrides.items():
109113
if Path(v).name in source.parts:
@@ -134,6 +138,7 @@ def _start_rsyncer_multigrid(
134138
tag=tag,
135139
limited=limited,
136140
transfer=machine_data.get("data_transfer_enabled", True),
141+
restarted=str(source) in self.rsync_restarts,
137142
)
138143
self.ws.send(json.dumps({"message": "refresh"}))
139144

@@ -175,6 +180,7 @@ def _start_rsyncer(
175180
tag: str = "",
176181
limited: bool = False,
177182
transfer: bool = True,
183+
restarted: bool = False,
178184
):
179185
log.info(f"starting rsyncer: {source}")
180186
if self._environment:
@@ -238,15 +244,21 @@ def rsync_result(update: RSyncerUpdate):
238244
),
239245
secondary=True,
240246
)
241-
url = f"{str(self._environment.url.geturl())}/sessions/{str(self._environment.murfey_session)}/rsyncer"
242-
rsyncer_data = {
243-
"source": str(source),
244-
"destination": destination,
245-
"session_id": self.session_id,
246-
"transferring": self.do_transfer or self._environment.demo,
247-
"tag": tag,
248-
}
249-
requests.post(url, json=rsyncer_data)
247+
if restarted:
248+
restarted_url = (
249+
f"{self.murfey_url}/sessions/{self.session_id}/rsyncer_started"
250+
)
251+
capture_post(restarted_url, json={"source": str(source)})
252+
else:
253+
url = f"{str(self._environment.url.geturl())}/sessions/{str(self._environment.murfey_session)}/rsyncer"
254+
rsyncer_data = {
255+
"source": str(source),
256+
"destination": destination,
257+
"session_id": self.session_id,
258+
"transferring": self.do_transfer or self._environment.demo,
259+
"tag": tag,
260+
}
261+
requests.post(url, json=rsyncer_data)
250262
self._environment.watchers[source] = DirWatcher(source, settling_time=30)
251263

252264
if not self.analysers.get(source) and analyse:

src/murfey/client/tui/screens.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,11 @@ def determine_default_destination(
138138
_default = ""
139139
else:
140140
_default = destination + f"/{visit}"
141-
return _default + f"/{extra_directory}"
141+
return (
142+
_default + f"/{extra_directory}"
143+
if not _default.endswith("/")
144+
else _default + f"{extra_directory}"
145+
)
142146

143147

144148
class InputResponse(NamedTuple):

src/murfey/instrument_server/api.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import secrets
44
import time
55
from datetime import datetime
6+
from functools import partial
67
from logging import getLogger
78
from pathlib import Path
89
from typing import Annotated, Dict, List, Optional, Union
@@ -28,7 +29,7 @@
2829

2930
watchers: Dict[Union[str, int], MultigridDirWatcher] = {}
3031
rsyncers: Dict[str, RSyncer] = {}
31-
controllers = {}
32+
controllers: Dict[int, MultigridController] = {}
3233
data_collection_parameters: dict = {}
3334
tokens = {}
3435

@@ -131,10 +132,17 @@ async def token_handshake_for_session(session_id: int, token: Token):
131132
)
132133

133134

135+
@router.get("/sessions/{session_id}/check_token")
136+
def check_token(session_id: MurfeySessionID):
137+
return {"token_valid": True}
138+
139+
134140
@router.post("/sessions/{session_id}/multigrid_watcher")
135141
def start_multigrid_watcher(
136142
session_id: MurfeySessionID, watcher_spec: MultigridWatcherSpec
137143
):
144+
if controllers.get(session_id) is not None:
145+
return {"success": True}
138146
label = watcher_spec.label
139147
controllers[session_id] = MultigridController(
140148
[],
@@ -148,6 +156,7 @@ def start_multigrid_watcher(
148156
_machine_config=watcher_spec.configuration.dict(),
149157
token=tokens.get(session_id, "token"),
150158
data_collection_parameters=data_collection_parameters.get(label, {}),
159+
rsync_restarts=watcher_spec.rsync_restarts,
151160
)
152161
watcher_spec.source.mkdir(exist_ok=True)
153162
machine_config = requests.get(
@@ -161,7 +170,12 @@ def start_multigrid_watcher(
161170
watcher_spec.configuration.dict(),
162171
skip_existing_processing=watcher_spec.skip_existing_processing,
163172
)
164-
watchers[session_id].subscribe(controllers[session_id]._start_rsyncer_multigrid)
173+
watchers[session_id].subscribe(
174+
partial(
175+
controllers[session_id]._start_rsyncer_multigrid,
176+
destination_overrides=watcher_spec.destination_overrides,
177+
)
178+
)
165179
watchers[session_id].start()
166180
return {"success": True}
167181

src/murfey/server/api/instrument.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,24 @@ async def activate_instrument_server_for_session(
7575
return success
7676

7777

78+
@router.get("/instruments/{instrument_name}/sessions/{session_id}/active")
79+
async def check_if_session_is_active(instrument_name: str, session_id: int):
80+
if instrument_server_tokens.get(session_id) is None:
81+
return {"active": False}
82+
async with lock:
83+
async with aiohttp.ClientSession() as session:
84+
machine_config = get_machine_config(instrument_name=instrument_name)[
85+
instrument_name
86+
]
87+
async with session.get(
88+
f"{machine_config.instrument_server_url}/sessions/{int(sanitise(str(session_id)))}/check_token",
89+
headers={
90+
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
91+
},
92+
) as response:
93+
return {"active": response.status == 200}
94+
95+
7896
@router.post("/sessions/{session_id}/multigrid_watcher")
7997
async def start_multigrid_watcher(
8098
session_id: MurfeySessionID, watcher_spec: MultigridWatcherSetup, db=murfey_db
@@ -109,6 +127,10 @@ async def start_multigrid_watcher(
109127
"label": visit,
110128
"instrument_name": instrument_name,
111129
"skip_existing_processing": watcher_spec.skip_existing_processing,
130+
"destination_overrides": {
131+
str(k): v for k, v in watcher_spec.destination_overrides.items()
132+
},
133+
"rsync_restarts": watcher_spec.rsync_restarts,
112134
},
113135
headers={
114136
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"

src/murfey/server/demo_api.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ async def get_session(session_id: MurfeySessionID, db=murfey_db) -> SessionClien
316316
def increment_rsync_file_count(
317317
visit_name: str, rsyncer_info: RsyncerInfo, db=murfey_db
318318
):
319+
print(rsyncer_info.source, rsyncer_info.destination, rsyncer_info.session_id)
319320
rsync_instance = db.exec(
320321
select(RsyncInstance).where(
321322
RsyncInstance.source == rsyncer_info.source,

src/murfey/util/instrument_models.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from pathlib import Path
2+
from typing import Dict, List
23

34
from pydantic import BaseModel
45

@@ -12,3 +13,5 @@ class MultigridWatcherSpec(BaseModel):
1213
visit: str
1314
instrument_name: str
1415
skip_existing_processing: bool = False
16+
destination_overrides: Dict[Path, str] = {}
17+
rsync_restarts: List[str] = []

src/murfey/util/models.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,8 @@ class PostInfo(BaseModel):
334334
class MultigridWatcherSetup(BaseModel):
335335
source: Path
336336
skip_existing_processing: bool = False
337+
destination_overrides: Dict[Path, str] = {}
338+
rsync_restarts: List[str] = []
337339

338340

339341
class CurrentGainRef(BaseModel):

0 commit comments

Comments
 (0)