Skip to content

Commit 010f227

Browse files
authored
Allow RSync instances to be given an end time (#591)
They will maintain a list of files created after this time rather than transfer them. The intention is for this end time to be specified from the front end on session setup.
1 parent abcefc7 commit 010f227

File tree

7 files changed

+66
-8
lines changed

7 files changed

+66
-8
lines changed

src/murfey/client/multigrid_control.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class MultigridController:
4848
data_collection_parameters: dict = field(default_factory=lambda: {})
4949
token: str = ""
5050
_machine_config: dict = field(default_factory=lambda: {})
51+
visit_end_time: Optional[datetime] = None
5152

5253
def __post_init__(self):
5354
if self.token:
@@ -99,6 +100,15 @@ def __post_init__(self):
99100
register_client=False,
100101
)
101102

103+
if self.visit_end_time:
104+
current_time = datetime.now()
105+
server_timestamp = requests.get(f"{self.murfey_url}/time").json()[
106+
"timestamp"
107+
]
108+
self.visit_end_time += current_time - datetime.fromtimestamp(
109+
server_timestamp
110+
)
111+
102112
def _multigrid_watcher_finalised(self):
103113
self.multigrid_watcher_active = False
104114
self.dormancy_check()
@@ -277,6 +287,7 @@ def _start_rsyncer(
277287
stop_callback=self._rsyncer_stopped,
278288
do_transfer=self.do_transfer,
279289
remove_files=remove_files,
290+
end_time=self.visit_end_time,
280291
)
281292

282293
def rsync_result(update: RSyncerUpdate):

src/murfey/client/rsync.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import subprocess
1414
import threading
1515
import time
16+
from datetime import datetime
1617
from enum import Enum
1718
from pathlib import Path
1819
from typing import Awaitable, Callable, List, NamedTuple
@@ -63,6 +64,7 @@ def __init__(
6364
remove_files: bool = False,
6465
required_substrings_for_removal: List[str] = [],
6566
notify: bool = True,
67+
end_time: datetime | None = None,
6668
):
6769
super().__init__()
6870
self._basepath = basepath_local.absolute()
@@ -76,6 +78,9 @@ def __init__(
7678
self._server_url = server_url
7779
self._notify = notify
7880
self._finalised = False
81+
self._end_time = end_time
82+
83+
self._skipped_files: List[Path] = []
7984

8085
# Set rsync destination
8186
if local:
@@ -214,6 +219,10 @@ def enqueue(self, file_path: Path):
214219
absolute_path = self._basepath / file_path
215220
self.queue.put(absolute_path)
216221

222+
def flush_skipped(self):
223+
for f in self._skipped_files:
224+
self.queue.put(f)
225+
217226
def _process(self):
218227
logger.info("RSync thread starting")
219228
files_to_transfer: list[Path]
@@ -304,14 +313,23 @@ def _fake_transfer(self, files: list[Path]) -> bool:
304313

305314
return True
306315

307-
def _transfer(self, files: list[Path]) -> bool:
316+
def _transfer(self, infiles: list[Path]) -> bool:
308317
"""
309318
Transfer files via an rsync sub-process, and parses the rsync stdout to verify
310319
the success of the transfer.
311320
"""
312321

313322
# Set up initial variables
314-
files = [f for f in files if f.is_file()]
323+
if self._end_time:
324+
files = [
325+
f
326+
for f in infiles
327+
if f.is_file() and f.stat().st_ctime < self._end_time.timestamp()
328+
]
329+
self._skipped_files.extend(set(infiles).difference(set(files)))
330+
else:
331+
files = [f for f in infiles if f.is_file()]
332+
315333
previously_transferred = self._files_transferred
316334
transfer_success: set[Path] = set()
317335
successful_updates: list[RSyncerUpdate] = []

src/murfey/instrument_server/api.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ def setup_multigrid_watcher(
160160
token=tokens.get(session_id, "token"),
161161
data_collection_parameters=data_collection_parameters.get(label, {}),
162162
rsync_restarts=watcher_spec.rsync_restarts,
163+
visit_end_time=watcher_spec.visit_end_time,
163164
)
164165
watcher_spec.source.mkdir(exist_ok=True)
165166
machine_config = requests.get(
@@ -251,6 +252,7 @@ class ObserverInfo(BaseModel):
251252
num_files_in_queue: int
252253
alive: bool
253254
stopping: bool
255+
num_files_skipped: int = 0
254256

255257

256258
@router.get("/sessions/{session_id}/rsyncer_info")
@@ -264,6 +266,7 @@ def get_rsyncer_info(session_id: MurfeySessionID) -> list[ObserverInfo]:
264266
num_files_in_queue=v.queue.qsize(),
265267
alive=v.thread.is_alive(),
266268
stopping=v._stopping,
269+
num_files_skipped=len(v._skipped_files),
267270
)
268271
)
269272
return info

src/murfey/server/api/__init__.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ async def root(request: Request):
128128
)
129129

130130

131+
@router.get("/time")
132+
async def get_current_timestamp():
133+
return {"timestamp": datetime.datetime.now().timestamp()}
134+
135+
131136
@router.get("/health/")
132137
def health_check(db=murfey.server.ispyb.DB):
133138
conn = db.connection()
@@ -1967,9 +1972,24 @@ async def get_tiff(visit_name: str, session_id: int, tiff_path: str, db=murfey_d
19671972
return FileResponse(path=test_path)
19681973

19691974

1975+
class VisitEndTime(BaseModel):
1976+
end_time: Optional[datetime.datetime] = None
1977+
1978+
19701979
@router.post("/instruments/{instrument_name}/visits/{visit}/session/{name}")
1971-
def create_session(instrument_name: str, visit: str, name: str, db=murfey_db) -> int:
1972-
s = Session(name=name, visit=visit, instrument_name=instrument_name)
1980+
def create_session(
1981+
instrument_name: str,
1982+
visit: str,
1983+
name: str,
1984+
visit_end_time: VisitEndTime,
1985+
db=murfey_db,
1986+
) -> int:
1987+
s = Session(
1988+
name=name,
1989+
visit=visit,
1990+
instrument_name=instrument_name,
1991+
visit_end_time=visit_end_time.end_time,
1992+
)
19731993
db.add(s)
19741994
db.commit()
19751995
sid = s.id

src/murfey/server/api/instrument.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,8 @@ async def setup_multigrid_watcher(
9898
session_id: MurfeySessionID, watcher_spec: MultigridWatcherSetup, db=murfey_db
9999
):
100100
data = {}
101-
instrument_name = (
102-
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
103-
)
101+
session = db.exec(select(Session).where(Session.id == session_id)).one()
102+
instrument_name = session.instrument_name
104103
machine_config = get_machine_config(instrument_name=instrument_name)[
105104
instrument_name
106105
]
@@ -130,6 +129,7 @@ async def setup_multigrid_watcher(
130129
str(k): v for k, v in watcher_spec.destination_overrides.items()
131130
},
132131
"rsync_restarts": watcher_spec.rsync_restarts,
132+
"visit_end_time": session.visit_end_time,
133133
},
134134
headers={
135135
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
@@ -487,6 +487,7 @@ class RSyncerInfo(BaseModel):
487487
files_counted: int
488488
transferring: bool
489489
session_id: int
490+
num_files_skipped: int = 0
490491

491492

492493
@router.get("/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info")
@@ -564,6 +565,7 @@ async def get_rsyncer_info(
564565
files_counted=ri.files_counted,
565566
transferring=ri.transferring,
566567
session_id=session_id,
568+
num_files_skipped=rsync_data.get("num_files_skipped", 0),
567569
)
568570
)
569571
return combined_data

src/murfey/util/db.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
of the sessions that Murfey is overseeing, along with the relationships between them.
44
"""
55

6+
from datetime import datetime
67
from typing import List, Optional
78

89
import sqlalchemy
@@ -48,6 +49,7 @@ class Session(SQLModel, table=True): # type: ignore
4849
started: bool = Field(default=False)
4950
current_gain_ref: str = Field(default="")
5051
instrument_name: str = Field(default="")
52+
visit_end_time: Optional[datetime] = Field(default=None)
5153

5254
# CLEM Workflow
5355

src/murfey/util/instrument_models.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
from datetime import datetime
12
from pathlib import Path
2-
from typing import Dict, List
3+
from typing import Dict, List, Optional
34

45
from pydantic import BaseModel
56

@@ -15,3 +16,4 @@ class MultigridWatcherSpec(BaseModel):
1516
skip_existing_processing: bool = False
1617
destination_overrides: Dict[Path, str] = {}
1718
rsync_restarts: List[str] = []
19+
visit_end_time: Optional[datetime] = None

0 commit comments

Comments
 (0)