Skip to content

Commit 2602b0a

Browse files
authored
Fixed flushing of skipped files (#621)
* Removed 'label' field from the 'RsyncerSource' Pydantic models, and removed the key from requests to endpoints that use that model * Only display error message about no rsync processes being run if files were scheduled for transfer; this should skip the case of an empty list * Update visit end time information in rsyncer and database when triggering a flush, but only if the end time is greater than what is currently stored in the database * The 'end_time' attribute of the RSyncer is modiifed by the instrument server endpoint, and not as part of the flush * Stored the time offset between the server and the client as an attribute in the MultigridController; convert the received server timestamp into the client's time when updating visit end times * Added a return message to the 'stop_multigrid_watcher' and 'update_multigrid_controller_visit_end_time' endpoints in the instrument server
1 parent 4d783a7 commit 2602b0a

File tree

4 files changed

+50
-23
lines changed

4 files changed

+50
-23
lines changed

src/murfey/client/multigrid_control.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,18 @@ def __post_init__(self):
104104
register_client=False,
105105
)
106106

107+
# Calculate the time offset between the client and the server
108+
current_time = datetime.now()
109+
server_timestamp = requests.get(
110+
f"{self.murfey_url}{url_path_for('session_control.router', 'get_current_timestamp')}"
111+
).json()["timestamp"]
112+
self.server_time_offset = current_time - datetime.fromtimestamp(
113+
server_timestamp
114+
)
115+
116+
# Store the visit end time in the current device's equivalent time
107117
if self.visit_end_time:
108-
current_time = datetime.now()
109-
server_timestamp = requests.get(
110-
f"{self.murfey_url}{url_path_for('session_control.router', 'get_current_timestamp')}"
111-
).json()["timestamp"]
112-
self.visit_end_time += current_time - datetime.fromtimestamp(
113-
server_timestamp
114-
)
118+
self.visit_end_time += self.server_time_offset
115119

116120
def _multigrid_watcher_finalised(self):
117121
self.multigrid_watcher_active = False
@@ -153,9 +157,10 @@ def finalise(self):
153157
self._finalise_rsyncer(p)
154158

155159
def update_visit_time(self, new_end_time: datetime):
156-
self.visit_end_time = new_end_time
160+
# Convert the received server timestamp into the local equivalent
161+
self.visit_end_time = new_end_time + self.server_time_offset
157162
for rp in self.rsync_processes.values():
158-
rp._end_time = new_end_time
163+
rp._end_time = self.visit_end_time
159164

160165
def _start_rsyncer_multigrid(
161166
self,

src/murfey/client/rsync.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,6 @@ def enqueue(self, file_path: Path):
220220
self.queue.put(absolute_path)
221221

222222
def flush_skipped(self):
223-
self._end_time = datetime.now()
224223
for f in self._skipped_files:
225224
self.queue.put(f)
226225
self._skipped_files = []
@@ -561,7 +560,9 @@ def parse_stderr(line: str):
561560
success = False
562561

563562
if result is None:
564-
logger.error(f"No rsync process ran for files: {files}")
563+
# Only log this as an error if files were scheduled for transfer
564+
if files:
565+
logger.error(f"No rsync process ran for files: {files}")
565566
else:
566567
logger.log(
567568
logging.WARNING if result.returncode else logging.DEBUG,

src/murfey/instrument_server/api.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,18 +212,19 @@ def start_multigrid_watcher(session_id: MurfeySessionID, process: bool = True):
212212
@router.delete("/sessions/{session_id}/multigrid_watcher/{label}")
213213
def stop_multigrid_watcher(session_id: MurfeySessionID, label: str):
214214
watchers[label].request_stop()
215+
return {"success": True}
215216

216217

217218
@router.post("/sessions/{session_id}/multigrid_controller/visit_end_time")
218219
def update_multigrid_controller_visit_end_time(
219220
session_id: MurfeySessionID, end_time: datetime
220221
):
221222
controllers[session_id].update_visit_time(end_time)
223+
return {"success": True}
222224

223225

224226
class RsyncerSource(BaseModel):
225227
source: Path
226-
label: str
227228

228229

229230
@router.post("/sessions/{session_id}/stop_rsyncer")

src/murfey/server/api/instrument.py

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
import asyncio
44
import datetime
55
import logging
6-
import urllib
76
from pathlib import Path
87
from typing import Annotated, List, Optional
8+
from urllib.parse import quote
99

1010
import aiohttp
1111
from fastapi import APIRouter, Depends
@@ -340,7 +340,6 @@ async def stop_rsyncer(
340340
async with clientsession.post(
341341
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'stop_rsyncer', session_id=session_id)}",
342342
json={
343-
"label": session_id,
344343
"source": str(secure_path(Path(rsyncer_source.source))),
345344
},
346345
headers={
@@ -367,7 +366,6 @@ async def finalise_rsyncer(
367366
async with clientsession.post(
368367
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'finalise_rsyncer', session_id=session_id)}",
369368
json={
370-
"label": session_id,
371369
"source": str(secure_path(Path(rsyncer_source.source))),
372370
},
373371
headers={
@@ -420,7 +418,7 @@ async def update_visit_end_time(
420418
if machine_config.instrument_server_url:
421419
async with aiohttp.ClientSession() as clientsession:
422420
async with clientsession.post(
423-
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={urllib.parse.quote(end_time.isoformat())}",
421+
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={quote(end_time.isoformat())}",
424422
headers={
425423
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
426424
},
@@ -467,7 +465,6 @@ async def remove_rsyncer(
467465
async with clientsession.post(
468466
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'remove_rsyncer', session_id=session_id)}",
469467
json={
470-
"label": session_id,
471468
"source": str(secure_path(Path(rsyncer_source.source))),
472469
},
473470
headers={
@@ -495,7 +492,6 @@ async def restart_rsyncer(
495492
async with clientsession.post(
496493
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'restart_rsyncer', session_id=session_id)}",
497494
json={
498-
"label": session_id,
499495
"source": str(secure_path(Path(rsyncer_source.source))),
500496
},
501497
headers={
@@ -510,20 +506,44 @@ async def restart_rsyncer(
510506
async def flush_skipped_rsyncer(
511507
session_id: MurfeySessionID, rsyncer_source: RsyncerSource, db=murfey_db
512508
):
513-
data = {}
514-
instrument_name = (
515-
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
516-
)
509+
# Load data for session
510+
session_entry = db.exec(select(Session).where(Session.id == session_id)).one()
511+
instrument_name = session_entry.instrument_name
512+
513+
# Define a new visit end time that's slightly ahead of current time
514+
new_end_time = datetime.datetime.now().replace(
515+
second=0, microsecond=0
516+
) + datetime.timedelta(minutes=5)
517+
# Update the stored visit end time if the new one exceeds it
518+
if session_entry.visit_end_time:
519+
if new_end_time > session_entry.visit_end_time:
520+
session_entry.visit_end_time = new_end_time
521+
db.add(session_entry)
522+
db.commit()
523+
524+
# Send request to flush rsyncer
525+
data: dict = {}
526+
update_result: dict = {}
517527
machine_config = get_machine_config(instrument_name=instrument_name)[
518528
instrument_name
519529
]
520530
if isinstance(session_id, int):
521531
if machine_config.instrument_server_url:
522532
async with aiohttp.ClientSession() as clientsession:
533+
# Send request to instrument server to update multigrid controller
534+
async with clientsession.post(
535+
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={quote(session_entry.visit_end_time.isoformat())}",
536+
headers={
537+
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
538+
},
539+
) as resp:
540+
update_result = await resp.json()
541+
if not update_result.get("success", False):
542+
return {"success": False}
543+
# Send request to flush the rsyncer
523544
async with clientsession.post(
524545
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'flush_skipped_rsyncer', session_id=session_id)}",
525546
json={
526-
"label": session_id,
527547
"source": str(secure_path(Path(rsyncer_source.source))),
528548
},
529549
headers={

0 commit comments

Comments
 (0)