Skip to content

Commit e9b70fc

Browse files
authored
Endpoints to get controller information from instrument server (#416)
Add functionality to check what controllers the instrument server has running to allow reconnections to sessions following instrument server restarts
1 parent 733c9b0 commit e9b70fc

File tree

6 files changed

+71
-13
lines changed

6 files changed

+71
-13
lines changed

src/murfey/client/multigrid_control.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class MultigridController:
3636
do_transfer: bool = True
3737
dummy_dc: bool = False
3838
force_mdoc_metadata: bool = True
39+
rsync_restarts: List[str] = field(default_factory=lambda: [])
3940
rsync_processes: Dict[Path, RSyncer] = field(default_factory=lambda: {})
4041
analysers: Dict[Path, Analyser] = field(default_factory=lambda: {})
4142
data_collection_parameters: dict = field(default_factory=lambda: {})
@@ -103,7 +104,11 @@ def _start_rsyncer_multigrid(
103104
f"{self._environment.url.geturl()}/instruments/{self.instrument_name}/machine"
104105
).json()
105106
if destination_overrides.get(source):
106-
destination = destination_overrides[source] + f"/{extra_directory}"
107+
destination = (
108+
destination_overrides[source]
109+
if str(source) in self.rsync_restarts
110+
else destination_overrides[source] + f"/{extra_directory}"
111+
)
107112
else:
108113
for k, v in destination_overrides.items():
109114
if Path(v).name in source.parts:
@@ -134,6 +139,7 @@ def _start_rsyncer_multigrid(
134139
tag=tag,
135140
limited=limited,
136141
transfer=machine_data.get("data_transfer_enabled", True),
142+
restarted=str(source) in self.rsync_restarts,
137143
)
138144
self.ws.send(json.dumps({"message": "refresh"}))
139145

@@ -175,6 +181,7 @@ def _start_rsyncer(
175181
tag: str = "",
176182
limited: bool = False,
177183
transfer: bool = True,
184+
restarted: bool = False,
178185
):
179186
log.info(f"starting rsyncer: {source}")
180187
if self._environment:
@@ -238,15 +245,21 @@ def rsync_result(update: RSyncerUpdate):
238245
),
239246
secondary=True,
240247
)
241-
url = f"{str(self._environment.url.geturl())}/sessions/{str(self._environment.murfey_session)}/rsyncer"
242-
rsyncer_data = {
243-
"source": str(source),
244-
"destination": destination,
245-
"session_id": self.session_id,
246-
"transferring": self.do_transfer or self._environment.demo,
247-
"tag": tag,
248-
}
249-
requests.post(url, json=rsyncer_data)
248+
if restarted:
249+
restarted_url = (
250+
f"{self.murfey_url}/sessions/{self.session_id}/rsyncer_started"
251+
)
252+
capture_post(restarted_url, json={"source": str(source)})
253+
else:
254+
url = f"{str(self._environment.url.geturl())}/sessions/{str(self._environment.murfey_session)}/rsyncer"
255+
rsyncer_data = {
256+
"source": str(source),
257+
"destination": destination,
258+
"session_id": self.session_id,
259+
"transferring": self.do_transfer or self._environment.demo,
260+
"tag": tag,
261+
}
262+
requests.post(url, json=rsyncer_data)
250263
self._environment.watchers[source] = DirWatcher(source, settling_time=30)
251264

252265
if not self.analysers.get(source) and analyse:

src/murfey/client/tui/screens.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,11 @@ def determine_default_destination(
129129
_default = ""
130130
else:
131131
_default = destination + f"/{visit}"
132-
return _default + f"/{extra_directory}"
132+
return (
133+
_default + f"/{extra_directory}"
134+
if not _default.endswith("/")
135+
else _default + f"{extra_directory}"
136+
)
133137

134138

135139
class InputResponse(NamedTuple):

src/murfey/instrument_server/api.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import secrets
44
import time
55
from datetime import datetime
6+
from functools import partial
67
from logging import getLogger
78
from pathlib import Path
89
from typing import Annotated, Dict, List, Optional, Union
@@ -28,7 +29,7 @@
2829

2930
watchers: Dict[Union[str, int], MultigridDirWatcher] = {}
3031
rsyncers: Dict[str, RSyncer] = {}
31-
controllers = {}
32+
controllers: Dict[int, MultigridController] = {}
3233
data_collection_parameters: dict = {}
3334
tokens = {}
3435

@@ -131,10 +132,17 @@ async def token_handshake_for_session(session_id: int, token: Token):
131132
)
132133

133134

135+
@router.get("/sessions/{session_id}/check_token")
136+
def check_token(session_id: MurfeySessionID):
137+
return {"token_valid": True}
138+
139+
134140
@router.post("/sessions/{session_id}/multigrid_watcher")
135141
def start_multigrid_watcher(
136142
session_id: MurfeySessionID, watcher_spec: MultigridWatcherSpec
137143
):
144+
if controllers.get(session_id) is not None:
145+
return {"success": True}
138146
label = watcher_spec.label
139147
controllers[session_id] = MultigridController(
140148
[],
@@ -148,6 +156,7 @@ def start_multigrid_watcher(
148156
_machine_config=watcher_spec.configuration.dict(),
149157
token=tokens.get(session_id, "token"),
150158
data_collection_parameters=data_collection_parameters.get(label, {}),
159+
rsync_restarts=watcher_spec.rsync_restarts,
151160
)
152161
watcher_spec.source.mkdir(exist_ok=True)
153162
machine_config = requests.get(
@@ -161,7 +170,12 @@ def start_multigrid_watcher(
161170
watcher_spec.configuration.dict(),
162171
skip_existing_processing=watcher_spec.skip_existing_processing,
163172
)
164-
watchers[session_id].subscribe(controllers[session_id]._start_rsyncer_multigrid)
173+
watchers[session_id].subscribe(
174+
partial(
175+
controllers[session_id]._start_rsyncer_multigrid,
176+
destination_overrides=watcher_spec.destination_overrides,
177+
)
178+
)
165179
watchers[session_id].start()
166180
return {"success": True}
167181

src/murfey/server/api/instrument.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,24 @@ async def activate_instrument_server_for_session(
7575
return success
7676

7777

78+
@router.get("/instruments/{instrument_name}/sessions/{session_id}/active")
79+
async def check_if_session_is_active(instrument_name: str, session_id: int):
80+
if instrument_server_tokens.get(session_id) is None:
81+
return {"active": False}
82+
async with lock:
83+
async with aiohttp.ClientSession() as session:
84+
machine_config = get_machine_config(instrument_name=instrument_name)[
85+
instrument_name
86+
]
87+
async with session.get(
88+
f"{machine_config.instrument_server_url}/sessions/{int(sanitise(str(session_id)))}/check_token",
89+
headers={
90+
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
91+
},
92+
) as response:
93+
return {"active": response.status == 200}
94+
95+
7896
@router.post("/sessions/{session_id}/multigrid_watcher")
7997
async def start_multigrid_watcher(
8098
session_id: MurfeySessionID, watcher_spec: MultigridWatcherSetup, db=murfey_db
@@ -107,6 +125,10 @@ async def start_multigrid_watcher(
107125
"label": visit,
108126
"instrument_name": instrument_name,
109127
"skip_existing_processing": watcher_spec.skip_existing_processing,
128+
"destination_overrides": {
129+
str(k): v for k, v in watcher_spec.destination_overrides.items()
130+
},
131+
"rsync_restarts": watcher_spec.rsync_restarts,
110132
},
111133
headers={
112134
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"

src/murfey/util/instrument_models.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from pathlib import Path
2+
from typing import Dict, List
23

34
from pydantic import BaseModel
45

@@ -12,3 +13,5 @@ class MultigridWatcherSpec(BaseModel):
1213
visit: str
1314
instrument_name: str
1415
skip_existing_processing: bool = False
16+
destination_overrides: Dict[Path, str] = {}
17+
rsync_restarts: List[str] = []

src/murfey/util/models.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,8 @@ class PostInfo(BaseModel):
299299
class MultigridWatcherSetup(BaseModel):
300300
source: Path
301301
skip_existing_processing: bool = False
302+
destination_overrides: Dict[Path, str] = {}
303+
rsync_restarts: List[str] = []
302304

303305

304306
class CurrentGainRef(BaseModel):

0 commit comments

Comments
 (0)