Skip to content

Commit dda6fe8

Browse files
committed
Added FastAPI endpoints to instrument and backend servers to request for general file downloads from different upstream instruments
1 parent b50bb35 commit dda6fe8

File tree

6 files changed

+324
-6
lines changed

6 files changed

+324
-6
lines changed

src/murfey/instrument_server/api.py

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,13 +469,83 @@ def upload_gain_reference(
469469
return {"success": True}
470470

471471

472-
class UpstreamTiffInfo(BaseModel):
472+
class UpstreamFileDownloadInfo(BaseModel):
    """
    Request body accepted by the instrument server's upstream file download
    endpoint.
    """

    # Directory that the downloaded files are saved under
    download_dir: Path
    # Name of the upstream instrument; forwarded to the backend when listing files
    upstream_instrument: str
    # Visit directory on the upstream instrument; downloaded files are saved
    # using their path relative to this directory
    upstream_visit_path: Path
476+
477+
478+
@router.post("/visits/{visit_name}/sessions/{session_id}/upstream_file_data_request")
def gather_upstream_files(
    visit_name: str,
    session_id: MurfeySessionID,
    upstream_file_download: UpstreamFileDownloadInfo,
):
    """
    Instrument server endpoint that will query the backend for files in the chosen
    visit directory and download them into the requested download directory.

    The directory structure below 'upstream_visit_path' is reproduced below
    'download_dir'.

    Returns a dictionary with a boolean "success" key. When "success" is False, a
    "detail" key describes what went wrong.
    """
    # Check for forbidden characters
    if any(c in visit_name for c in ("/", "\\", ":", ";")):
        logger.error(f"Forbidden characters are present in the visit name {visit_name}")
        return {
            "success": False,
            "detail": "Forbidden characters present in visit name",
        }

    # Get the list of files to download
    murfey_url = urlparse(_get_murfey_url(), allow_fragments=False)
    sanitised_visit_name = sanitise_nonpath(visit_name)
    auth_headers = {"Authorization": f"Bearer {tokens[session_id]}"}
    url_path = url_path_for(
        "session_control.correlative_router",
        "gather_upstream_files",
        session_id=session_id,
        visit_name=sanitised_visit_name,
    )
    listing = requests.get(
        f"{murfey_url.geturl()}{url_path}",
        headers=auth_headers,
        json={
            "upstream_instrument": upstream_file_download.upstream_instrument,
            "upstream_visit_path": str(upstream_file_download.upstream_visit_path),
        },
    )
    # Don't try to interpret an error response as a list of file paths
    if listing.status_code != 200:
        logger.error(
            "Failed to get the list of upstream files; "
            f"backend responded with status code {listing.status_code}"
        )
        return {
            "success": False,
            "detail": "Could not get list of upstream files",
        }
    upstream_files: list[str] = listing.json()

    # Make the download directory (and any missing parents)
    download_dir = upstream_file_download.download_dir
    download_dir.mkdir(parents=True, exist_ok=True)

    # Download gathered files, keeping track of the ones that could not be saved
    failed: list[str] = []
    for upstream_file in upstream_files:
        # Work out the save location first so that no request is made for a file
        # that cannot be placed inside the download directory
        try:
            upstream_file_relative_path = Path(upstream_file).relative_to(
                upstream_file_download.upstream_visit_path
            )
        except ValueError:
            logger.warning(
                f"Skipping {str(upstream_file)!r} as it is not in the upstream visit directory"
            )
            failed.append(upstream_file)
            continue
        # Parent directory references could place the file outside 'download_dir'
        if ".." in upstream_file_relative_path.parts:
            logger.warning(
                f"Skipping {str(upstream_file)!r} as it contains parent directory references"
            )
            failed.append(upstream_file)
            continue

        url_path = url_path_for(
            "session_control.correlative_router",
            "get_upstream_file",
            session_id=session_id,
            visit_name=sanitised_visit_name,
            upstream_file_path=upstream_file,
        )
        # Use the response as a context manager so the streamed connection is
        # always released
        with requests.get(
            f"{murfey_url.geturl()}{url_path}",
            headers=auth_headers,
            stream=True,
        ) as file_data:
            # Don't write an error response body to disk as if it were the file
            if file_data.status_code != 200:
                logger.warning(
                    f"Failed to download {str(upstream_file)!r}; "
                    f"backend responded with status code {file_data.status_code}"
                )
                failed.append(upstream_file)
                continue
            save_file = download_dir / upstream_file_relative_path
            save_file.parent.mkdir(parents=True, exist_ok=True)
            with open(save_file, "wb") as f:
                for chunk in file_data.iter_content(chunk_size=32 * 1024**2):
                    f.write(chunk)
        logger.info(f"Saved file to {str(save_file)!r}")

    if failed:
        return {
            "success": False,
            "detail": f"Failed to download {len(failed)} of {len(upstream_files)} files",
        }
    return {"success": True}
538+
539+
540+
class UpstreamTiffDownloadInfo(BaseModel):
473541
download_dir: Path
474542

475543

476544
@router.post("/visits/{visit_name}/sessions/{session_id}/upstream_tiff_data_request")
477545
def gather_upstream_tiffs(
478-
visit_name: str, session_id: MurfeySessionID, upstream_tiff_info: UpstreamTiffInfo
546+
visit_name: str,
547+
session_id: MurfeySessionID,
548+
upstream_tiff_info: UpstreamTiffDownloadInfo,
479549
):
480550
sanitised_visit_name = sanitise_nonpath(visit_name)
481551
assert not any(c in visit_name for c in ("/", "\\", ":", ";"))

src/murfey/server/api/instrument.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,69 @@ async def request_upstream_tiff_data_download(
396396
return data
397397

398398

399+
class UpstreamFileRequestInfo(BaseModel):
    """
    Request body for triggering an upstream file download via the backend.
    """

    # Name of the upstream instrument; forwarded unchanged to the instrument server
    upstream_instrument: str
    # Visit directory on the upstream instrument; forwarded as a string
    upstream_visit_path: Path
402+
403+
404+
@router.post("/visits/{visit_name}/sessions/{session_id}/upstream_file_data_request")
async def request_upstream_file_data_download(
    visit_name: str,
    session_id: MurfeySessionID,
    upstream_file_request: UpstreamFileRequestInfo,
    db=murfey_db,
):
    """
    Ask the instrument server for this session to download upstream files.

    The download directory is the configured upstream data download directory
    joined with the sanitised visit name. The JSON response received from the
    instrument server is returned as-is. If the machine config has no download
    directory or no instrument server URL, an error dictionary is returned and
    no request is made.
    """
    # Look up the machine config of the instrument this session belongs to
    session_row = db.exec(select(Session).where(Session.id == session_id)).one()
    instrument_name = session_row.instrument_name
    all_configs = get_machine_config(instrument_name=instrument_name)
    machine_config = all_configs[instrument_name]

    # Bail out early if the required settings are missing
    if not machine_config.upstream_data_download_directory:
        log.error("No download directory was configured for this instrument")
        return {
            "success": False,
            "detail": "No download directory configured",
        }
    if not machine_config.instrument_server_url:
        log.error("Couldn't find instrument server URL to post request to")
        return {
            "success": False,
            "detail": "No instrument server URL",
        }

    # The sanitised visit name is used for both the directory and the URL
    safe_visit_name = secure_filename(visit_name)
    download_dir = str(
        machine_config.upstream_data_download_directory / safe_visit_name
    )

    # Forward the download request to the instrument server
    async with aiohttp.ClientSession() as clientsession:
        endpoint = url_path_for(
            "api.router",
            "gather_upstream_files",
            visit_name=safe_visit_name,
            session_id=session_id,
        )
        access_token = instrument_server_tokens[session_id]["access_token"]
        payload = {
            "download_dir": download_dir,
            "upstream_instrument": upstream_file_request.upstream_instrument,
            "upstream_visit_path": str(upstream_file_request.upstream_visit_path),
        }
        async with clientsession.post(
            f"{machine_config.instrument_server_url}{endpoint}",
            headers={"Authorization": f"Bearer {access_token}"},
            json=payload,
        ) as resp:
            data = await resp.json()
    return data
460+
461+
399462
class RsyncerSource(BaseModel):
400463
source: str
401464

src/murfey/server/api/session_control.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
)
1919
from murfey.server.api.shared import (
2020
find_upstream_visits as _find_upstream_visits,
21+
gather_upstream_files as _gather_upstream_files,
2122
gather_upstream_tiffs as _gather_upstream_tiffs,
2223
get_foil_hole as _get_foil_hole,
2324
get_foil_holes_from_grid_square as _get_foil_holes_from_grid_square,
2425
get_grid_squares as _get_grid_squares,
2526
get_grid_squares_from_dcg as _get_grid_squares_from_dcg,
2627
get_machine_config_for_instrument,
2728
get_tiff_file as _get_tiff_file,
29+
get_upstream_file as _get_upstream_file,
2830
remove_session_by_id,
2931
)
3032
from murfey.server.ispyb import DB as ispyb_db, get_all_ongoing_visits
@@ -422,6 +424,43 @@ async def find_upstream_visits(session_id: MurfeySessionID, db=murfey_db):
422424
return _find_upstream_visits(session_id=session_id, db=db)
423425

424426

427+
class UpstreamFileGatheringInfo(BaseModel):
    """
    Request body describing where to search for upstream files.
    """

    # Name of the upstream instrument to gather files for
    upstream_instrument: str
    # Visit directory on the upstream instrument that is searched for files
    upstream_visit_path: Path
430+
431+
432+
@correlative_router.get(
    "/visits/{visit_name}/sessions/{session_id}/upstream_file_paths"
)
async def gather_upstream_files(
    visit_name: str,
    session_id: MurfeySessionID,
    upstream_file_gathering: UpstreamFileGatheringInfo,
    db=murfey_db,
):
    """
    Return the upstream files found for the requested instrument and visit path.

    Thin wrapper that unpacks the request body and delegates to the shared
    'gather_upstream_files' implementation. 'visit_name' is part of the route
    but is not used by this function.
    """
    instrument = upstream_file_gathering.upstream_instrument
    visit_path = upstream_file_gathering.upstream_visit_path
    return _gather_upstream_files(
        session_id=session_id,
        upstream_instrument=instrument,
        upstream_visit_path=visit_path,
        db=db,
    )
447+
448+
449+
@correlative_router.get(
    "/visits/{visit_name}/sessions/{session_id}/upstream_file/{upstream_file_path:path}"
)
async def get_upstream_file(
    visit_name: str,
    session_id: MurfeySessionID,
    upstream_file_path: str,
    db=murfey_db,
):
    """
    Serve the requested upstream file, or return None if the shared lookup
    does not find it.

    'visit_name', 'session_id', and 'db' are part of the route signature but are
    not used in the lookup.
    """
    # NOTE(review): the path comes straight from the URL and is not checked
    # against any configured upstream directory here - confirm whether the
    # lookup should be restricted to known locations.
    located_file = _get_upstream_file(upstream_file_path)
    if located_file is None:
        return located_file
    return FileResponse(path=located_file)
462+
463+
425464
@correlative_router.get(
426465
"/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths"
427466
)
@@ -436,4 +475,4 @@ async def get_tiff_file(visit_name: str, session_id: int, tiff_path: str, db=mur
436475
tiff_file = _get_tiff_file(
437476
visit_name=visit_name, session_id=session_id, tiff_path=tiff_path, db=db
438477
)
439-
return FileResponse(path=tiff_file) if isinstance(tiff_file, Path) else tiff_file
478+
return FileResponse(path=tiff_file) if tiff_file is not None else tiff_file

src/murfey/server/api/session_info.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
)
1919
from murfey.server.api.shared import (
2020
find_upstream_visits as _find_upstream_visits,
21+
gather_upstream_files as _gather_upstream_files,
2122
gather_upstream_tiffs as _gather_upstream_tiffs,
2223
get_foil_hole as _get_foil_hole,
2324
get_foil_holes_from_grid_square as _get_foil_holes_from_grid_square,
2425
get_grid_squares as _get_grid_squares,
2526
get_grid_squares_from_dcg as _get_grid_squares_from_dcg,
2627
get_machine_config_for_instrument,
2728
get_tiff_file as _get_tiff_file,
29+
get_upstream_file as _get_upstream_file,
2830
remove_session_by_id,
2931
)
3032
from murfey.server.ispyb import DB as ispyb_db, get_all_ongoing_visits
@@ -416,6 +418,43 @@ async def find_upstream_visits(session_id: MurfeySessionID, db=murfey_db):
416418
return _find_upstream_visits(session_id=session_id, db=db)
417419

418420

421+
class UpstreamFileGatheringInfo(BaseModel):
    """
    Request body describing where to search for upstream files.
    """

    # Name of the upstream instrument to gather files for
    upstream_instrument: str
    # Visit directory on the upstream instrument that is searched for files
    upstream_visit_path: Path
424+
425+
426+
@correlative_router.get(
    "/visits/{visit_name}/sessions/{session_id}/upstream_file_paths"
)
async def gather_upstream_files(
    visit_name: str,
    session_id: MurfeySessionID,
    upstream_file_gathering: UpstreamFileGatheringInfo,
    db=murfey_db,
):
    """
    Return the upstream files found for the requested instrument and visit path.

    Thin wrapper that unpacks the request body and delegates to the shared
    'gather_upstream_files' implementation. 'visit_name' is part of the route
    but is not used by this function.
    """
    instrument = upstream_file_gathering.upstream_instrument
    visit_path = upstream_file_gathering.upstream_visit_path
    return _gather_upstream_files(
        session_id=session_id,
        upstream_instrument=instrument,
        upstream_visit_path=visit_path,
        db=db,
    )
441+
442+
443+
@correlative_router.get(
    "/visits/{visit_name}/sessions/{session_id}/upstream_file/{upstream_file_path:path}"
)
async def get_upstream_file(
    visit_name: str,
    session_id: MurfeySessionID,
    upstream_file_path: Path,
    db=murfey_db,
):
    """
    Serve the requested upstream file, or return None if the shared lookup
    does not find it.

    'visit_name', 'session_id', and 'db' are part of the route signature but are
    not used in the lookup.
    """
    # NOTE(review): the path comes straight from the URL and is not checked
    # against any configured upstream directory here - confirm whether the
    # lookup should be restricted to known locations.
    located_file = _get_upstream_file(upstream_file_path)
    if located_file is None:
        return located_file
    return FileResponse(path=located_file)
456+
457+
419458
@correlative_router.get(
420459
"/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths"
421460
)

src/murfey/server/api/shared.py

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,13 +168,62 @@ def find_upstream_visits(session_id: int, db: SQLModelSession):
168168
# Looks for visit name in file path
169169
current_upstream_visits = {}
170170
for visit_path in Path(upstream_data_dir).glob(f"{visit_name.split('-')[0]}-*"):
171-
current_upstream_visits[visit_path.name] = (
172-
visit_path / machine_config.processed_directory_name
173-
)
171+
current_upstream_visits[visit_path.name] = visit_path
174172
upstream_visits[upstream_instrument] = current_upstream_visits
175173
return upstream_visits
176174

177175

176+
def gather_upstream_files(
    session_id: int,
    upstream_instrument: str,
    upstream_visit_path: Path,
    db: SQLModelSession,
) -> list[Path]:
    """
    Searches the specified upstream visit directory for files based on the search
    strings set in the MachineConfig for that upstream instrument, and returns them
    as a list of file paths.

    Each file appears at most once in the returned list, in the order it was first
    found, even if it matches more than one search string. An empty list is
    returned if no search strings are configured for the upstream instrument.
    """
    # NOTE(review): 'upstream_visit_path' is globbed as given and is not checked
    # against any configured upstream directory here - confirm that callers
    # validate it.

    # Load the current instrument's machine config
    murfey_session = db.exec(
        select(MurfeySession).where(MurfeySession.id == session_id)
    ).one()
    instrument_name = murfey_session.instrument_name
    machine_config = get_machine_config(instrument_name=instrument_name)[
        instrument_name
    ]

    # Look up the configured search strings for that upstream instrument once
    logger.info(f"Searching for files in {upstream_visit_path}")
    search_strings = machine_config.upstream_data_search_strings.get(
        upstream_instrument
    )
    if search_strings is None:
        logger.warning(
            f"Upstream file searching has not been configured for {upstream_instrument} on {instrument_name}"
        )
        return []

    # Dictionary keys preserve insertion order, so this de-duplicates files that
    # match more than one search string without changing the order
    found_files: dict[Path, None] = {}
    for search_string in search_strings:
        logger.info(f"Using search string {search_string}")
        for file in upstream_visit_path.glob(search_string):
            if file.is_file():
                found_files.setdefault(file, None)
    file_list = list(found_files)
    logger.info(
        f"Found {len(file_list)} files for download from {upstream_instrument}"
    )
    return file_list
217+
218+
219+
def get_upstream_file(file_path: str | Path):
220+
file_path = Path(file_path) if isinstance(file_path, str) else file_path
221+
if file_path.exists() and file_path.is_file():
222+
return file_path
223+
logger.warning(f"Requested file {str(file_path)!r} was not found")
224+
return None
225+
226+
178227
def get_upstream_tiff_dirs(visit_name: str, instrument_name: str) -> List[Path]:
179228
tiff_dirs = []
180229
machine_config = get_machine_config(instrument_name=instrument_name)[

0 commit comments

Comments
 (0)