Skip to content

Commit afbc024

Browse files
committed
Endpoints to flush skipped files for an rsyncer
1 parent d775661 commit afbc024

File tree

4 files changed

+45
-0
lines changed

4 files changed

+45
-0
lines changed

src/murfey/client/rsync.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ def enqueue(self, file_path: Path):
222222
def flush_skipped(self):
223223
for f in self._skipped_files:
224224
self.queue.put(f)
225+
self._skipped_files = []
225226

226227
def _process(self):
227228
logger.info("RSync thread starting")

src/murfey/instrument_server/api.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,12 @@ def restart_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource):
268268
return {"success": True}
269269

270270

271+
@router.post("/sessions/{session_id}/flush_skipped_rsyncer")
272+
def flush_skipped_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource):
273+
controllers[session_id].rsync_processes[rsyncer_source.source].flush_skipped()
274+
return {"success": True}
275+
276+
271277
class ObserverInfo(BaseModel):
272278
source: str
273279
num_files_transferred: int

src/murfey/server/api/instrument.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,34 @@ async def restart_rsyncer(
495495
return data
496496

497497

498+
@router.post("/sessions/{session_id}/flush_skipped_rsyncer")
499+
async def flush_skipped_rsyncer(
500+
session_id: MurfeySessionID, rsyncer_source: RsyncerSource, db=murfey_db
501+
):
502+
data = {}
503+
instrument_name = (
504+
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
505+
)
506+
machine_config = get_machine_config(instrument_name=instrument_name)[
507+
instrument_name
508+
]
509+
if isinstance(session_id, int):
510+
if machine_config.instrument_server_url:
511+
async with aiohttp.ClientSession() as clientsession:
512+
async with clientsession.post(
513+
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'flush_skipped_rsyncer', session_id=session_id)}",
514+
json={
515+
"label": session_id,
516+
"source": str(secure_path(Path(rsyncer_source.source))),
517+
},
518+
headers={
519+
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
520+
},
521+
) as resp:
522+
data = await resp.json()
523+
return data
524+
525+
498526
class RSyncerInfo(BaseModel):
499527
source: str
500528
num_files_transferred: int

src/murfey/util/route_manifest.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ murfey.instrument_server.api.router:
7373
path_params: []
7474
methods:
7575
- POST
76+
- path: /sessions/{session_id}/flush_skipped_rsyncer
77+
function: flush_skipped_rsyncer
78+
path_params: []
79+
methods:
80+
- POST
7681
- path: /sessions/{session_id}/rsyncer_info
7782
function: get_rsyncer_info
7883
path_params: []
@@ -560,6 +565,11 @@ murfey.server.api.instrument.router:
560565
path_params: []
561566
methods:
562567
- POST
568+
- path: /instrument_server/sessions/{session_id}/flush_skipped_rsyncer
569+
function: flush_skipped_rsyncer
570+
path_params: []
571+
methods:
572+
- POST
563573
- path: /instrument_server/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info
564574
function: get_rsyncer_info
565575
path_params:

0 commit comments

Comments
 (0)