Skip to content

Commit c9ffa31

Browse files
committed
Refactored functions to parse CLEM preprocessing results and started writing function to register results
1 parent ad9a489 commit c9ffa31

File tree

2 files changed

+297
-9
lines changed

2 files changed

+297
-9
lines changed

src/murfey/server/__init__.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@
5151
get_microscope,
5252
get_security_config,
5353
)
54+
from murfey.workflows.clem.register_results import (
55+
register_lif_preprocessing_result,
56+
register_tiff_preprocessing_result,
57+
)
5458

5559
try:
5660
from murfey.server.ispyb import TransportManager # Session
@@ -2945,16 +2949,14 @@ def feedback_callback(header: dict, message: dict) -> None:
29452949
_transport_object.transport.ack(header)
29462950
return None
29472951
elif message["register"] == "register_lif_preprocessing_result":
2948-
# Write a function to register received CLEM LIF processing results
2949-
# _register_lif_preprocessing_results(message)
2952+
register_lif_preprocessing_result(message, murfey_db)
29502953
if _transport_object:
29512954
_transport_object.transport.ack(header)
29522955
# When a message is received, it goes into unacked
29532956
# When it's acked, it gets removed from the queue
29542957
# When it's nacked, it eventually ends up in the DLQ
29552958
elif message["register"] == "register_tiff_preprocessing_result":
2956-
# Write a function to register received CLEM TIFF processing results
2957-
# _register_tiff_preprocessing_results(message0)
2959+
register_tiff_preprocessing_result(message, murfey_db)
29582960
if _transport_object:
29592961
_transport_object.transport.ack(header)
29602962
if _transport_object:

src/murfey/workflows/clem/register_results.py

Lines changed: 291 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,304 @@
55
the file registration processes all take place on the server side only.
66
"""
77

8+
from __future__ import annotations
89

9-
def register_lif_preprocessing_result(message: dict):
10+
import json
11+
import logging
12+
import re
13+
from pathlib import Path
14+
from typing import Optional, Type, Union
15+
16+
from sqlalchemy.exc import NoResultFound
17+
from sqlmodel import Session, select
18+
19+
from murfey.util.config import get_machine_config
20+
from murfey.util.db import (
21+
CLEMImageMetadata,
22+
CLEMImageSeries,
23+
CLEMImageStack,
24+
CLEMLIFFile,
25+
CLEMTIFFFile,
26+
)
27+
from murfey.util.db import Session as MurfeySession
28+
from murfey.util.models import LIFPreprocessingResult, TIFFPreprocessingResult
29+
30+
logger = logging.getLogger("murfey.workflows.clem.register_results")
31+
32+
33+
def _validate_and_sanitise(
34+
file: Path,
35+
session_id: int,
36+
db: Session,
37+
) -> Path:
38+
"""
39+
Performs validation and sanitisation on the incoming file paths, ensuring that
40+
no forbidden characters are present and that the the path points only to allowed
41+
sections of the file server.
42+
43+
Returns the file path as a sanitised string that can be converted into a Path
44+
object again.
45+
46+
NOTE: Due to the instrument name query, 'db' now needs to be passed as an
47+
explicit variable to this function from within a FastAPI endpoint, as using the
48+
instance that was imported directly won't load it in the correct state.
49+
"""
50+
51+
valid_file_types = (
52+
".lif",
53+
".tif",
54+
".tiff",
55+
".xlif",
56+
".xml",
57+
)
58+
59+
# Resolve symlinks and directory changes to get full file path
60+
full_path = Path(file).resolve()
61+
62+
# Use machine configuration to validate which file base paths are accepted from
63+
instrument_name = (
64+
db.exec(select(MurfeySession).where(MurfeySession.id == session_id))
65+
.one()
66+
.instrument_name
67+
)
68+
machine_config = get_machine_config(instrument_name=instrument_name)[
69+
instrument_name
70+
]
71+
rsync_basepath = machine_config.rsync_basepath
72+
try:
73+
base_path = list(rsync_basepath.parents)[-2].as_posix()
74+
except IndexError:
75+
# Print to troubleshoot
76+
logger.warning(f"Base path {rsync_basepath!r} is too short")
77+
base_path = rsync_basepath.as_posix()
78+
except Exception:
79+
raise Exception("Unexpected exception occurred when loading the file base path")
80+
81+
# Check that full file path doesn't contain unallowed characters
82+
# Currently allows only:
83+
# - words (alphanumerics and "_"; \w),
84+
# - spaces (\s),
85+
# - periods,
86+
# - dashes,
87+
# - forward slashes ("/")
88+
if bool(re.fullmatch(r"^[\w\s\.\-/]+$", str(full_path))) is False:
89+
raise ValueError(f"Unallowed characters present in {file}")
90+
91+
# Check that it's not accessing somehwere it's not allowed
92+
if not str(full_path).startswith(str(base_path)):
93+
raise ValueError(f"{file} points to a directory that is not permitted")
94+
95+
# Check that it's a file, not a directory
96+
if full_path.is_file() is False:
97+
raise ValueError(f"{file} is not a file")
98+
99+
# Check that it is of a permitted file type
100+
if f"{full_path.suffix}" not in valid_file_types:
101+
raise ValueError(f"{full_path.suffix} is not a permitted file format")
102+
103+
return full_path
104+
105+
106+
def get_db_entry(
107+
db: Session,
108+
# With the database search funcion having been moved out of the FastAPI
109+
# endpoint, the database now has to be explicitly passed within the FastAPI
110+
# endpoint function in order for it to be loaded in the correct state.
111+
table: Type[
112+
Union[
113+
CLEMImageMetadata,
114+
CLEMImageSeries,
115+
CLEMImageStack,
116+
CLEMLIFFile,
117+
CLEMTIFFFile,
118+
]
119+
],
120+
session_id: int,
121+
file_path: Optional[Path] = None,
122+
series_name: Optional[str] = None,
123+
) -> Union[
124+
CLEMImageMetadata,
125+
CLEMImageSeries,
126+
CLEMImageStack,
127+
CLEMLIFFile,
128+
CLEMTIFFFile,
129+
]:
130+
"""
131+
Searches the CLEM workflow-related tables in the Murfey database for an entry that
132+
matches the file path or series name within a given session. Returns the entry if
133+
a match is found, otherwise register it as a new entry in the database.
134+
"""
135+
136+
# Validate that parameters are provided correctly
137+
if file_path is None and series_name is None:
138+
raise ValueError(
139+
"One of either 'file_path' or 'series_name' has to be provided"
140+
)
141+
if file_path is not None and series_name is not None:
142+
raise ValueError("Only one of 'file_path' or 'series_name' should be provided")
143+
144+
# Validate file path if provided
145+
if file_path is not None:
146+
try:
147+
file_path = _validate_and_sanitise(file_path, session_id, db)
148+
except Exception:
149+
raise Exception
150+
151+
# Validate series name to use
152+
if series_name is not None:
153+
if bool(re.fullmatch(r"^[\w\s\.\-/]+$", series_name)) is False:
154+
raise ValueError("One or more characters in the string are not permitted")
155+
156+
# Return database entry if it exists
157+
try:
158+
db_entry = (
159+
db.exec(
160+
select(table)
161+
.where(table.session_id == session_id)
162+
.where(table.file_path == str(file_path))
163+
).one()
164+
if file_path is not None
165+
else db.exec(
166+
select(table)
167+
.where(table.session_id == session_id)
168+
.where(table.series_name == series_name)
169+
).one()
170+
)
171+
# Create and register new entry if not present
172+
except NoResultFound:
173+
db_entry = (
174+
table(
175+
file_path=str(file_path),
176+
session_id=session_id,
177+
)
178+
if file_path is not None
179+
else table(
180+
series_name=series_name,
181+
session_id=session_id,
182+
)
183+
)
184+
db.add(db_entry)
185+
db.commit()
186+
db.refresh(db_entry)
187+
except Exception:
188+
raise Exception
189+
190+
return db_entry
191+
192+
193+
def register_lif_preprocessing_result(
194+
message: dict, db: Session, demo: bool = False
195+
) -> bool:
10196
"""
11197
session_id (recipe)
12198
register (wrapper)
13-
output_files (wrapper)
199+
result (wrapper)
14200
key1
15201
key2
16202
...
17203
"""
18-
pass
19204

205+
session_id: int = (
206+
int(message["session_id"])
207+
if not isinstance(message["session_id"], int)
208+
else message["session_id"]
209+
)
210+
211+
# Validate message and try and load results
212+
if isinstance(message["result"], str):
213+
try:
214+
json_obj: dict = json.loads(message["result"])
215+
result = LIFPreprocessingResult(**json_obj)
216+
except Exception as e:
217+
logger.error(
218+
f"Exception encountered when parsing LIF preprocessing result: {e}"
219+
)
220+
return False
221+
elif isinstance(message["result"], dict):
222+
try:
223+
result = LIFPreprocessingResult(**message["result"])
224+
except Exception as e:
225+
logger.error(
226+
f"Exception encountered when parsing LIF preprocessing result: {e}"
227+
)
228+
return False
229+
else:
230+
logger.error(
231+
f"Invalid type for LIF preprocessing result: {type(message['result'])}"
232+
)
233+
return False
234+
235+
# Register items in database if not already present
236+
try:
237+
clem_img_stk: CLEMImageStack = get_db_entry(
238+
db=db,
239+
table=CLEMImageStack,
240+
session_id=session_id,
241+
file_path=result.image_stack,
242+
)
243+
244+
clem_img_series: CLEMImageSeries = get_db_entry(
245+
db=db,
246+
table=CLEMImageSeries,
247+
session_id=session_id,
248+
series_name=result.series_name,
249+
)
250+
251+
clem_metadata: CLEMImageMetadata = get_db_entry(
252+
db=db,
253+
table=CLEMImageMetadata,
254+
session_id=session_id,
255+
file_path=result.metadata,
256+
)
257+
258+
clem_lif_file: CLEMLIFFile = get_db_entry(
259+
db=db,
260+
table=CLEMLIFFile,
261+
session_id=session_id,
262+
file_path=result.parent_lif,
263+
)
264+
265+
# Link entries to one another
266+
clem_img_stk.associated_metadata = clem_metadata
267+
clem_img_stk.parent_lif = clem_lif_file
268+
clem_img_stk.parent_series = clem_img_series
269+
clem_img_stk.channel_name = result.channel
270+
clem_img_stk.stack_created = True
271+
db.add(clem_img_stk)
272+
db.commit()
273+
db.refresh()
274+
275+
return True
276+
277+
except Exception as e:
278+
logger.error(
279+
f"Exception encountered when registering LIF preprocessing result: {e}"
280+
)
281+
return False
282+
283+
finally:
284+
db.close()
285+
286+
287+
def register_tiff_preprocessing_result(
288+
message: dict, db: Session, demo: bool = False
289+
) -> bool:
290+
291+
session_id: int = (
292+
int(message["session_id"])
293+
if not isinstance(message["session_id"], int)
294+
else message["session_id"]
295+
)
296+
if isinstance(message["result"], str):
297+
json_obj: dict = json.loads(message["result"])
298+
result = TIFFPreprocessingResult(**json_obj)
299+
elif isinstance(message["result"], dict):
300+
result = TIFFPreprocessingResult(**message["result"])
301+
else:
302+
logger.error("Invalid type for TIFF preprocessing result")
303+
return False
304+
305+
if result and session_id:
306+
pass
20307

21-
def register_tiff_preprocessing_result(message: dict):
22-
pass
308+
return True

0 commit comments

Comments
 (0)