Skip to content

Commit ed10b5f

Browse files
Utilities to update visit end times (#602)
Co-authored-by: Eu Pin Tien <[email protected]>
1 parent 25e0074 commit ed10b5f

File tree

5 files changed

+100
-6
lines changed

5 files changed

+100
-6
lines changed

src/murfey/client/multigrid_control.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,11 @@ def finalise(self):
152152
for p in self.rsync_processes.keys():
153153
self._finalise_rsyncer(p)
154154

155+
def update_visit_time(self, new_end_time: datetime):
156+
self.visit_end_time = new_end_time
157+
for rp in self.rsync_processes.values():
158+
rp._end_time = new_end_time
159+
155160
def _start_rsyncer_multigrid(
156161
self,
157162
source: Path,

src/murfey/client/rsync.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,10 @@ def enqueue(self, file_path: Path):
220220
self.queue.put(absolute_path)
221221

222222
def flush_skipped(self):
223+
self._end_time = datetime.now()
223224
for f in self._skipped_files:
224225
self.queue.put(f)
226+
self._skipped_files = []
225227

226228
def _process(self):
227229
logger.info("RSync thread starting")

src/murfey/instrument_server/api.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,13 @@ def stop_multigrid_watcher(session_id: MurfeySessionID, label: str):
214214
watchers[label].request_stop()
215215

216216

217+
@router.post("/sessions/{session_id}/multigrid_controller/visit_end_time")
218+
def update_multigrid_controller_visit_end_time(
219+
session_id: MurfeySessionID, end_time: datetime
220+
):
221+
controllers[session_id].update_visit_time(end_time)
222+
223+
217224
class RsyncerSource(BaseModel):
218225
source: Path
219226
label: str
@@ -261,6 +268,12 @@ def restart_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource):
261268
return {"success": True}
262269

263270

271+
@router.post("/sessions/{session_id}/flush_skipped_rsyncer")
272+
def flush_skipped_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource):
273+
controllers[session_id].rsync_processes[rsyncer_source.source].flush_skipped()
274+
return {"success": True}
275+
276+
264277
class ObserverInfo(BaseModel):
265278
source: str
266279
num_files_transferred: int

src/murfey/server/api/instrument.py

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import datetime
55
import logging
6+
import urllib
67
from pathlib import Path
78
from typing import Annotated, List, Optional
89

@@ -126,7 +127,7 @@ async def setup_multigrid_watcher(
126127
str(k): v for k, v in watcher_spec.destination_overrides.items()
127128
},
128129
"rsync_restarts": watcher_spec.rsync_restarts,
129-
"visit_end_time": session.visit_end_time,
130+
"visit_end_time": str(session.visit_end_time),
130131
},
131132
headers={
132133
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
@@ -396,6 +397,36 @@ async def finalise_session(session_id: MurfeySessionID, db=murfey_db):
396397
return data
397398

398399

400+
@router.post("/sessions/{session_id}/multigrid_controller/visit_end_time")
401+
async def update_visit_end_time(
402+
session_id: MurfeySessionID, end_time: datetime.datetime, db=murfey_db
403+
):
404+
# Load data for session
405+
session_entry = db.exec(select(Session).where(Session.id == session_id)).one()
406+
instrument_name = session_entry.instrument_name
407+
408+
# Update visit end time in database
409+
session_entry.visit_end_time = end_time
410+
db.add(session_entry)
411+
db.commit()
412+
413+
# Update the multigrid controller
414+
data = {}
415+
machine_config = get_machine_config(instrument_name=instrument_name)[
416+
instrument_name
417+
]
418+
if machine_config.instrument_server_url:
419+
async with aiohttp.ClientSession() as clientsession:
420+
async with clientsession.post(
421+
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={urllib.parse.quote(end_time.isoformat())}",
422+
headers={
423+
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
424+
},
425+
) as resp:
426+
data = await resp.json()
427+
return data
428+
429+
399430
@router.post("/sessions/{session_id}/abandon_session")
400431
async def abandon_session(session_id: MurfeySessionID, db=murfey_db):
401432
data = {}
@@ -473,6 +504,34 @@ async def restart_rsyncer(
473504
return data
474505

475506

507+
@router.post("/sessions/{session_id}/flush_skipped_rsyncer")
508+
async def flush_skipped_rsyncer(
509+
session_id: MurfeySessionID, rsyncer_source: RsyncerSource, db=murfey_db
510+
):
511+
data = {}
512+
instrument_name = (
513+
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
514+
)
515+
machine_config = get_machine_config(instrument_name=instrument_name)[
516+
instrument_name
517+
]
518+
if isinstance(session_id, int):
519+
if machine_config.instrument_server_url:
520+
async with aiohttp.ClientSession() as clientsession:
521+
async with clientsession.post(
522+
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'flush_skipped_rsyncer', session_id=session_id)}",
523+
json={
524+
"label": session_id,
525+
"source": str(secure_path(Path(rsyncer_source.source))),
526+
},
527+
headers={
528+
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
529+
},
530+
) as resp:
531+
data = await resp.json()
532+
return data
533+
534+
476535
class RSyncerInfo(BaseModel):
477536
source: str
478537
num_files_transferred: int

src/murfey/util/route_manifest.yaml

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ murfey.instrument_server.api.router:
3838
type: str
3939
methods:
4040
- DELETE
41+
- path: /sessions/{session_id}/multigrid_controller/visit_end_time
42+
function: update_multigrid_controller_visit_end_time
43+
path_params: []
44+
methods:
45+
- POST
4146
- path: /sessions/{session_id}/stop_rsyncer
4247
function: stop_rsyncer
4348
path_params: []
@@ -68,6 +73,11 @@ murfey.instrument_server.api.router:
6873
path_params: []
6974
methods:
7075
- POST
76+
- path: /sessions/{session_id}/flush_skipped_rsyncer
77+
function: flush_skipped_rsyncer
78+
path_params: []
79+
methods:
80+
- POST
7181
- path: /sessions/{session_id}/rsyncer_info
7282
function: get_rsyncer_info
7383
path_params: []
@@ -535,6 +545,11 @@ murfey.server.api.instrument.router:
535545
path_params: []
536546
methods:
537547
- POST
548+
- path: /instrument_server/sessions/{session_id}/multigrid_controller/visit_end_time
549+
function: update_visit_end_time
550+
path_params: []
551+
methods:
552+
- POST
538553
- path: /instrument_server/sessions/{session_id}/abandon_session
539554
function: abandon_session
540555
path_params: []
@@ -550,6 +565,11 @@ murfey.server.api.instrument.router:
550565
path_params: []
551566
methods:
552567
- POST
568+
- path: /instrument_server/sessions/{session_id}/flush_skipped_rsyncer
569+
function: flush_skipped_rsyncer
570+
path_params: []
571+
methods:
572+
- POST
553573
- path: /instrument_server/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info
554574
function: get_rsyncer_info
555575
path_params:
@@ -821,11 +841,6 @@ murfey.server.api.session_info.correlative_router:
821841
methods:
822842
- GET
823843
murfey.server.api.session_info.router:
824-
- path: /session_info/
825-
function: root
826-
path_params: []
827-
methods:
828-
- GET
829844
- path: /session_info/health/
830845
function: health_check
831846
path_params: []

0 commit comments

Comments
 (0)