Skip to content

Commit 3fbe240

Browse files
authored
Add endpoint which can gather extra rsyncer information (#545)
Including how many files are queued for transfer and whether the rsync thread is alive
1 parent 9df4d4e commit 3fbe240

File tree

2 files changed

+89
-1
lines changed

2 files changed

+89
-1
lines changed

src/murfey/instrument_server/api.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,30 @@ def restart_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource):
219219
return {"success": True}
220220

221221

222+
class RSyncerInfo(BaseModel):
223+
source: str
224+
num_files_transferred: int
225+
num_files_in_queue: int
226+
alive: bool
227+
stopping: bool
228+
229+
230+
@router.get("/sessions/{session_id}/rsyncer_info")
231+
def get_rsyncer_info(session_id: MurfeySessionID) -> list[RSyncerInfo]:
232+
info = []
233+
for k, v in controllers[session_id].rsync_processes.items():
234+
info.append(
235+
RSyncerInfo(
236+
source=str(k),
237+
num_files_transferred=v._files_transferred,
238+
num_files_in_queue=v.queue.qsize(),
239+
alive=v.thread.is_alive(),
240+
stopping=v._stopping,
241+
)
242+
)
243+
return info
244+
245+
222246
class ProcessingParameters(BaseModel):
223247
gain_ref: str
224248
dose_per_frame: Optional[float] = None

src/murfey/server/api/instrument.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from murfey.server.murfey_db import murfey_db
2424
from murfey.util import secure_path
2525
from murfey.util.config import get_machine_config
26-
from murfey.util.db import Session, SessionProcessingParameters
26+
from murfey.util.db import RsyncInstance, Session, SessionProcessingParameters
2727
from murfey.util.models import File, MultigridWatcherSetup
2828

2929
# Create APIRouter class object
@@ -407,3 +407,67 @@ async def restart_rsyncer(
407407
) as resp:
408408
data = await resp.json()
409409
return data
410+
411+
412+
class RSyncerInfo(BaseModel):
413+
source: str
414+
num_files_transferred: int
415+
num_files_in_queue: int
416+
alive: bool
417+
stopping: bool
418+
destination: str
419+
tag: str
420+
files_transferred: int
421+
files_counted: int
422+
transferring: bool
423+
session_id: int
424+
425+
426+
@router.get("/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info")
427+
async def get_rsyncer_info(
428+
instrument_name: str, session_id: MurfeySessionID, db=murfey_db
429+
) -> List[RSyncerInfo]:
430+
data = []
431+
machine_config = get_machine_config(instrument_name=instrument_name)[
432+
instrument_name
433+
]
434+
rsync_instances = db.exec(
435+
select(RsyncInstance).where(RsyncInstance.session_id == session_id)
436+
).all()
437+
if machine_config.instrument_server_url:
438+
try:
439+
async with lock:
440+
token = instrument_server_tokens[session_id]["access_token"]
441+
async with aiohttp.ClientSession() as clientsession:
442+
async with clientsession.get(
443+
f"{machine_config.instrument_server_url}/sessions/{session_id}/rsyncer_info",
444+
headers={"Authorization": f"Bearer {token}"},
445+
) as resp:
446+
data = await resp.json()
447+
except KeyError:
448+
data = []
449+
except Exception:
450+
log.warning(
451+
"Exception encountered gathering rsyncer info from the instrument server",
452+
exc_info=True,
453+
)
454+
combined_data = []
455+
data_source_lookup = {d["source"]: d for d in data}
456+
for ri in rsync_instances:
457+
d = data_source_lookup.get(ri.source, {})
458+
combined_data.append(
459+
RSyncerInfo(
460+
source=ri.source,
461+
num_files_transferred=d.get("num_files_transferred", 0),
462+
num_files_in_queue=d.get("num_files_in_queue", 0),
463+
alive=d.get("alive", False),
464+
stopping=d.get("stopping", True),
465+
destination=ri.destination,
466+
tag=ri.tag,
467+
files_transferred=ri.files_transferred,
468+
files_counted=ri.files_counted,
469+
transferring=ri.transferring,
470+
session_id=session_id,
471+
)
472+
)
473+
return combined_data

0 commit comments

Comments
 (0)