Skip to content

Commit 67854bb

Browse files
authored
Added entry point to request for align-and-merge job for CLEM workflow and register its results (#411)
* Added entry point to request for align-and-merge processing job for CLEM workflow * Added entry point to register CLEM align-and-merge results * Moved CLEM database functions to murfey.workflows.clem.__init__ * Standardised CLEM workflow names
1 parent f46fb60 commit 67854bb

File tree

7 files changed

+664
-314
lines changed

7 files changed

+664
-314
lines changed

pyproject.toml

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,12 @@ murfey = "murfey.client:run"
9898
[project.entry-points."murfey.config.extraction"]
9999
"murfey_machine" = "murfey.util.config:get_extended_machine_config"
100100
[project.entry-points."murfey.workflows"]
101-
"process_raw_lifs" = "murfey.workflows.clem.process_raw_lifs:zocalo_cluster_request"
102-
"process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request"
103-
"register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result"
104-
"register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result"
101+
"clem.align_and_merge" = "murfey.workflows.clem.align_and_merge:submit_cluster_request"
102+
"clem.process_raw_lifs" = "murfey.workflows.clem.process_raw_lifs:zocalo_cluster_request"
103+
"clem.process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request"
104+
"clem.register_align_and_merge_result" = "murfey.workflows.clem.register_align_and_merge_results:register_align_and_merge_result"
105+
"clem.register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result"
106+
"clem.register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result"
105107

106108
[tool.setuptools]
107109
package-dir = {"" = "src"}

src/murfey/server/api/clem.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,9 @@ def process_raw_lifs(
629629
try:
630630
# Try and load relevant Murfey workflow
631631
workflow: EntryPoint = list(
632-
entry_points().select(group="murfey.workflows", name="process_raw_lifs")
632+
entry_points().select(
633+
group="murfey.workflows", name="clem.process_raw_lifs"
634+
)
633635
)[0]
634636
except IndexError:
635637
raise RuntimeError("The relevant Murfey workflow was not found")
@@ -661,7 +663,9 @@ def process_raw_tiffs(
661663
try:
662664
# Try and load relevant Murfey workflow
663665
workflow: EntryPoint = list(
664-
entry_points().select(group="murfey.workflows", name="process_raw_tiffs")
666+
entry_points().select(
667+
group="murfey.workflows", name="clem.process_raw_tiffs"
668+
)
665669
)[0]
666670
except IndexError:
667671
raise RuntimeError("The relevant Murfey workflow was not found")

src/murfey/util/models.py

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
from __future__ import annotations
22

3-
from ast import literal_eval
43
from datetime import datetime
54
from pathlib import Path
65
from typing import Any, Dict, List, Optional
76

8-
from pydantic import BaseModel, validator
7+
from pydantic import BaseModel
98

109
"""
1110
General Models
@@ -161,40 +160,6 @@ class TIFFSeriesInfo(BaseModel):
161160
series_metadata: Path
162161

163162

164-
class LIFPreprocessingResult(BaseModel):
165-
image_stack: Path
166-
metadata: Path
167-
series_name: str
168-
channel: str
169-
number_of_members: int
170-
parent_lif: Path
171-
172-
173-
class TIFFPreprocessingResult(BaseModel):
174-
image_stack: Path
175-
metadata: Path
176-
series_name: str
177-
channel: str
178-
number_of_members: int
179-
parent_tiffs: list[Path]
180-
181-
@validator(
182-
"parent_tiffs",
183-
pre=True,
184-
)
185-
def parse_stringified_list(cls, value):
186-
if isinstance(value, str):
187-
try:
188-
eval_result = literal_eval(value)
189-
if isinstance(eval_result, list):
190-
parent_tiffs = [Path(p) for p in eval_result]
191-
return parent_tiffs
192-
except (SyntaxError, ValueError):
193-
raise ValueError("Unable to parse input")
194-
# Return value as-is; if it fails, it fails
195-
return value
196-
197-
198163
"""
199164
FIB
200165
===
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
import re
5+
from pathlib import Path
6+
from typing import Optional, Type, Union
7+
8+
from sqlalchemy.exc import NoResultFound
9+
from sqlmodel import Session, select
10+
11+
from murfey.util.config import get_machine_config
12+
from murfey.util.db import (
13+
CLEMImageMetadata,
14+
CLEMImageSeries,
15+
CLEMImageStack,
16+
CLEMLIFFile,
17+
CLEMTIFFFile,
18+
)
19+
from murfey.util.db import Session as MurfeySession
20+
21+
logger = logging.getLogger("murfey.workflows.clem")
22+
23+
24+
"""
25+
HELPER FUNCTIONS FOR CLEM DATABASE
26+
"""
27+
28+
29+
def _validate_and_sanitise(
30+
file: Path,
31+
session_id: int,
32+
db: Session,
33+
) -> Path:
34+
"""
35+
Performs validation and sanitisation on the incoming file paths, ensuring that
36+
no forbidden characters are present and that the the path points only to allowed
37+
sections of the file server.
38+
39+
Returns the file path as a sanitised string that can be converted into a Path
40+
object again.
41+
42+
NOTE: Due to the instrument name query, 'db' now needs to be passed as an
43+
explicit variable to this function from within a FastAPI endpoint, as using the
44+
instance that was imported directly won't load it in the correct state.
45+
"""
46+
47+
valid_file_types = (
48+
".lif",
49+
".tif",
50+
".tiff",
51+
".xlif",
52+
".xml",
53+
)
54+
55+
# Resolve symlinks and directory changes to get full file path
56+
full_path = Path(file).resolve()
57+
58+
# Use machine configuration to validate which file base paths are accepted from
59+
instrument_name = (
60+
db.exec(select(MurfeySession).where(MurfeySession.id == session_id))
61+
.one()
62+
.instrument_name
63+
)
64+
machine_config = get_machine_config(instrument_name=instrument_name)[
65+
instrument_name
66+
]
67+
rsync_basepath = machine_config.rsync_basepath
68+
try:
69+
base_path = list(rsync_basepath.parents)[-2].as_posix()
70+
except IndexError:
71+
logger.warning(f"Base path {rsync_basepath!r} is too short")
72+
base_path = rsync_basepath.as_posix()
73+
except Exception as e:
74+
raise Exception(
75+
f"Unexpected exception encountered when loading the file base path: {e}"
76+
)
77+
78+
# Check that full file path doesn't contain unallowed characters
79+
# Currently allows only:
80+
# - words (alphanumerics and "_"; \w),
81+
# - spaces (\s),
82+
# - periods,
83+
# - dashes,
84+
# - forward slashes ("/")
85+
if bool(re.fullmatch(r"^[\w\s\.\-/]+$", str(full_path))) is False:
86+
raise ValueError(f"Unallowed characters present in {file}")
87+
88+
# Check that it's not accessing somehwere it's not allowed
89+
if not str(full_path).startswith(str(base_path)):
90+
raise ValueError(f"{file} points to a directory that is not permitted")
91+
92+
# Check that it's a file, not a directory
93+
if full_path.is_file() is False:
94+
raise ValueError(f"{file} is not a file")
95+
96+
# Check that it is of a permitted file type
97+
if f"{full_path.suffix}" not in valid_file_types:
98+
raise ValueError(f"{full_path.suffix} is not a permitted file format")
99+
100+
return full_path
101+
102+
103+
def get_db_entry(
104+
db: Session,
105+
# With the database search funcion having been moved out of the FastAPI
106+
# endpoint, the database now has to be explicitly passed within the FastAPI
107+
# endpoint function in order for it to be loaded in the correct state.
108+
table: Type[
109+
Union[
110+
CLEMImageMetadata,
111+
CLEMImageSeries,
112+
CLEMImageStack,
113+
CLEMLIFFile,
114+
CLEMTIFFFile,
115+
]
116+
],
117+
session_id: int,
118+
file_path: Optional[Path] = None,
119+
series_name: Optional[str] = None,
120+
) -> Union[
121+
CLEMImageMetadata,
122+
CLEMImageSeries,
123+
CLEMImageStack,
124+
CLEMLIFFile,
125+
CLEMTIFFFile,
126+
]:
127+
"""
128+
Searches the CLEM workflow-related tables in the Murfey database for an entry that
129+
matches the file path or series name within a given session. Returns the entry if
130+
a match is found, otherwise register it as a new entry in the database.
131+
"""
132+
133+
# Validate that parameters are provided correctly
134+
if file_path is None and series_name is None:
135+
raise ValueError(
136+
"One of either 'file_path' or 'series_name' has to be provided"
137+
)
138+
if file_path is not None and series_name is not None:
139+
raise ValueError("Only one of 'file_path' or 'series_name' should be provided")
140+
141+
# Validate file path if provided
142+
if file_path is not None:
143+
try:
144+
file_path = _validate_and_sanitise(file_path, session_id, db)
145+
except Exception:
146+
raise Exception
147+
148+
# Validate series name to use
149+
if series_name is not None:
150+
if bool(re.fullmatch(r"^[\w\s\.\-/]+$", series_name)) is False:
151+
raise ValueError("One or more characters in the string are not permitted")
152+
153+
# Return database entry if it exists
154+
try:
155+
db_entry = (
156+
db.exec(
157+
select(table)
158+
.where(table.session_id == session_id)
159+
.where(table.file_path == str(file_path))
160+
).one()
161+
if file_path is not None
162+
else db.exec(
163+
select(table)
164+
.where(table.session_id == session_id)
165+
.where(table.series_name == series_name)
166+
).one()
167+
)
168+
# Create and register new entry if not present
169+
except NoResultFound:
170+
db_entry = (
171+
table(
172+
file_path=str(file_path),
173+
session_id=session_id,
174+
)
175+
if file_path is not None
176+
else table(
177+
series_name=series_name,
178+
session_id=session_id,
179+
)
180+
)
181+
db.add(db_entry)
182+
db.commit()
183+
db.refresh(db_entry)
184+
except Exception:
185+
raise Exception
186+
187+
return db_entry
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
"""
2+
Script to allow Murfey to request for an image alignment, colorisation, and merge job
3+
from cryoemservices.
4+
"""
5+
6+
from __future__ import annotations
7+
8+
from pathlib import Path
9+
from typing import Literal, Optional
10+
11+
from murfey.util.config import get_machine_config
12+
13+
try:
14+
from murfey.server.ispyb import TransportManager # Session
15+
except AttributeError:
16+
pass # Ignore if ISPyB credentials environment variable not set
17+
18+
19+
def submit_cluster_request(
20+
# Session parameters
21+
session_id: int,
22+
instrument_name: str,
23+
# Processing parameters
24+
series_name: str,
25+
images: list[Path],
26+
metadata: Path,
27+
# Optional processing parameters
28+
align_self: Optional[str] = None,
29+
flatten: Optional[Literal["min", "max", "mean"]] = "mean",
30+
align_across: Optional[str] = None,
31+
# Optional session parameters
32+
messenger: Optional[TransportManager] = None,
33+
):
34+
if not messenger:
35+
raise Exception("Unable to find transport manager")
36+
37+
# Load feedback queue
38+
machine_config = get_machine_config()[instrument_name]
39+
feedback_queue: str = machine_config.feedback_queue
40+
41+
# Work out session directory from file path
42+
processed_folder = machine_config.processed_directory_name
43+
if not images:
44+
raise ValueError(f"No image files have been provided for {series_name!r}")
45+
reference_file = images[0]
46+
path_parts = list(reference_file.parts)
47+
path_parts[0] = "" if path_parts[0] == "/" else path_parts[0]
48+
try:
49+
root_index = path_parts.index(processed_folder)
50+
except ValueError:
51+
raise ValueError(
52+
f"The processed directory {processed_folder!r} could not be found in the "
53+
f"file path for {str(reference_file)!r}"
54+
)
55+
session_dir = Path("/".join(path_parts[:root_index]))
56+
57+
# Submit message to cryoemservices
58+
messenger.send(
59+
"processing_recipe",
60+
{
61+
"recipes": ["clem-align-and-merge"],
62+
"parameters": {
63+
# Job parameters
64+
"series_name": series_name,
65+
"images": [str(file) for file in images],
66+
"metadata": str(metadata),
67+
"align_self": align_self,
68+
"flatten": flatten,
69+
"align_across": align_across,
70+
# Other recipe parameters
71+
"session_dir": str(session_dir),
72+
"session_id": session_id,
73+
"job_name": series_name,
74+
"feedback_queue": feedback_queue,
75+
},
76+
},
77+
new_connection=True,
78+
)
79+
return True

0 commit comments

Comments
 (0)