Skip to content

Commit 7821855

Browse files
authored
Added workflows to register CLEM preprocessing results (#396)
Murfey can now recognise messages from the LIF-to-image stack and TIFF-to-image stack processes, and register the files and metadata associated with that batch job in the PostgreSQL database accordingly.
1 parent ba6e694 commit 7821855

File tree

13 files changed

+679
-177
lines changed

13 files changed

+679
-177
lines changed

pyproject.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,10 @@ murfey = "murfey.client:run"
9898
[project.entry-points."murfey.config.extraction"]
9999
"murfey_machine" = "murfey.util.config:get_extended_machine_config"
100100
[project.entry-points."murfey.workflows"]
101-
"lif_to_stack" = "murfey.workflows.lif_to_stack:zocalo_cluster_request"
102-
"tiff_to_stack" = "murfey.workflows.tiff_to_stack:zocalo_cluster_request"
101+
"process_raw_lifs" = "murfey.workflows.clem.process_raw_lifs:zocalo_cluster_request"
102+
"process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request"
103+
"register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result"
104+
"register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result"
103105

104106
[tool.setuptools]
105107
package-dir = {"" = "src"}

src/murfey/client/contexts/clem.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ def process_lif_file(
386386

387387
try:
388388
# Construct the URL to post the request to
389-
url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/lif_to_stack?lif_file={quote(str(lif_file), safe='')}"
389+
url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/clem/preprocessing/process_raw_lifs?lif_file={quote(str(lif_file), safe='')}"
390390
# Validate
391391
if not url:
392392
logger.error(
@@ -442,7 +442,7 @@ def process_tiff_series(
442442

443443
try:
444444
# Construct URL for Murfey server to communicate with
445-
url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/tiff_to_stack"
445+
url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/clem/preprocessing/process_raw_tiffs"
446446
if not url:
447447
logger.error(
448448
"URL could not be constructed from the environment and file path"

src/murfey/server/__init__.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@
5656
from murfey.server.ispyb import TransportManager # Session
5757
except AttributeError:
5858
pass
59+
from backports.entry_points_selectable import entry_points
60+
from importlib_metadata import EntryPoint # For type hinting only
61+
5962
import murfey.util.db as db
6063
from murfey.util import LogFilter
6164
from murfey.util.spa_params import default_spa_parameters
@@ -2964,6 +2967,26 @@ def feedback_callback(header: dict, message: dict) -> None:
29642967
if _transport_object:
29652968
_transport_object.transport.ack(header)
29662969
return None
2970+
elif (
2971+
message["register"] in entry_points().select(group="murfey.workflows").names
2972+
):
2973+
# Run the workflow if a match is found
2974+
workflow: EntryPoint = list( # Returns a list of either 1 or 0
2975+
entry_points().select(
2976+
group="murfey.workflows", name=message["register"]
2977+
)
2978+
)[0]
2979+
result = workflow.load()(
2980+
message=message,
2981+
db=murfey_db,
2982+
)
2983+
if _transport_object:
2984+
if result:
2985+
_transport_object.transport.ack(header)
2986+
else:
2987+
# Send it directly to DLQ without trying to rerun it
2988+
_transport_object.transport.nack(header, requeue=False)
2989+
return None
29672990
if _transport_object:
29682991
_transport_object.transport.nack(header, requeue=False)
29692992
return None

src/murfey/server/api/clem.py

Lines changed: 66 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
from __future__ import annotations
22

33
import re
4-
import sys
54
import traceback
5+
from importlib.metadata import EntryPoint # type hinting only
66
from logging import getLogger
77
from pathlib import Path
88
from typing import Optional, Type, Union
99

10+
from backports.entry_points_selectable import entry_points
1011
from fastapi import APIRouter
1112
from sqlalchemy.exc import NoResultFound
1213
from sqlmodel import Session, select
@@ -22,13 +23,7 @@
2223
CLEMTIFFFile,
2324
)
2425
from murfey.util.db import Session as MurfeySession
25-
from murfey.util.models import TiffSeriesInfo
26-
27-
# Use backport from importlib_metadata for Python <3.10
28-
if sys.version_info.major == 3 and sys.version_info.minor < 10:
29-
from importlib_metadata import EntryPoint, entry_points
30-
else:
31-
from importlib.metadata import EntryPoint, entry_points
26+
from murfey.util.models import TIFFSeriesInfo
3227

3328
# Set up logger
3429
logger = getLogger("murfey.server.api.clem")
@@ -81,23 +76,15 @@ def validate_and_sanitise(
8176
machine_config = get_machine_config(instrument_name=instrument_name)[
8277
instrument_name
8378
]
84-
rsync_basepath = machine_config.rsync_basepath
85-
try:
86-
base_path = list(rsync_basepath.parents)[-2].as_posix()
87-
except IndexError:
88-
# Print to troubleshoot
89-
logger.warning(f"Base path {rsync_basepath!r} is too short")
90-
base_path = rsync_basepath.as_posix()
91-
except Exception:
92-
raise Exception("Unexpected exception occurred when loading the file base path")
79+
base_path = machine_config.rsync_basepath.as_posix()
9380

9481
# Check that full file path doesn't contain unallowed characters
95-
# Currently allows only:
96-
# - words (alphanumerics and "_"; \w),
97-
# - spaces (\s),
98-
# - periods,
99-
# - dashes,
100-
# - forward slashes ("/")
82+
# Currently allows only:
83+
# - words (alphanumerics and "_"; \w),
84+
# - spaces (\s),
85+
# - periods,
86+
# - dashes,
87+
# - forward slashes ("/")
10188
if bool(re.fullmatch(r"^[\w\s\.\-/]+$", str(full_path))) is False:
10289
raise ValueError(f"Unallowed characters present in {file}")
10390

@@ -631,51 +618,68 @@ def register_image_stack(
631618
"""
632619

633620

634-
@router.post("/sessions/{session_id}/lif_to_stack") # API posts to this URL
635-
def lif_to_stack(
621+
@router.post(
622+
"/sessions/{session_id}/clem/preprocessing/process_raw_lifs"
623+
) # API posts to this URL
624+
def process_raw_lifs(
636625
session_id: int, # Used by the decorator
637626
lif_file: Path,
627+
db: Session = murfey_db,
638628
):
639-
# Get command line entry point
640-
murfey_workflows = entry_points().select(
641-
group="murfey.workflows", name="lif_to_stack"
642-
)
643-
644-
# Use entry point if found
645-
if len(murfey_workflows) == 1:
646-
workflow: EntryPoint = list(murfey_workflows)[0]
647-
workflow.load()(
648-
# Match the arguments found in murfey.workflows.lif_to_stack
649-
file=lif_file,
650-
root_folder="images",
651-
messenger=_transport_object,
652-
)
653-
return True
654-
# Raise error if Murfey workflow not found
655-
else:
629+
try:
630+
# Try and load relevant Murfey workflow
631+
workflow: EntryPoint = list(
632+
entry_points().select(group="murfey.workflows", name="process_raw_lifs")
633+
)[0]
634+
except IndexError:
656635
raise RuntimeError("The relevant Murfey workflow was not found")
657636

637+
# Get instrument name from the database to load the correct config file
638+
session_row: MurfeySession = db.exec(
639+
select(MurfeySession).where(MurfeySession.id == session_id)
640+
).one()
641+
instrument_name = session_row.instrument_name
642+
643+
# Pass arguments along to the correct workflow
644+
workflow.load()(
645+
# Match the arguments found in murfey.workflows.clem.process_raw_lifs
646+
file=lif_file,
647+
root_folder="images",
648+
session_id=session_id,
649+
instrument_name=instrument_name,
650+
messenger=_transport_object,
651+
)
652+
return True
653+
658654

659-
@router.post("/sessions/{session_id}/tiff_to_stack")
660-
def tiff_to_stack(
655+
@router.post("/sessions/{session_id}/clem/preprocessing/process_raw_tiffs")
656+
def process_raw_tiffs(
661657
session_id: int, # Used by the decorator
662-
tiff_info: TiffSeriesInfo,
658+
tiff_info: TIFFSeriesInfo,
659+
db: Session = murfey_db,
663660
):
664-
# Get command line entry point
665-
murfey_workflows = entry_points().select(
666-
group="murfey.workflows", name="tiff_to_stack"
667-
)
668-
669-
# Use entry point if found
670-
if murfey_workflows:
671-
workflow: EntryPoint = list(murfey_workflows)[0]
672-
workflow.load()(
673-
# Match the arguments found in murfey.workflows.tiff_to_stack
674-
file=tiff_info.tiff_files[0], # Pass it only one file from the list
675-
root_folder="images",
676-
metadata=tiff_info.series_metadata,
677-
messenger=_transport_object,
678-
)
679-
# Raise error if Murfey workflow not found
680-
else:
661+
try:
662+
# Try and load relevant Murfey workflow
663+
workflow: EntryPoint = list(
664+
entry_points().select(group="murfey.workflows", name="process_raw_tiffs")
665+
)[0]
666+
except IndexError:
681667
raise RuntimeError("The relevant Murfey workflow was not found")
668+
669+
# Get instrument name from the database to load the correct config file
670+
session_row: MurfeySession = db.exec(
671+
select(MurfeySession).where(MurfeySession.id == session_id)
672+
).one()
673+
instrument_name = session_row.instrument_name
674+
675+
# Pass arguments to correct workflow
676+
workflow.load()(
677+
# Match the arguments found in murfey.workflows.clem.process_raw_tiffs
678+
tiff_list=tiff_info.tiff_files,
679+
root_folder="images",
680+
session_id=session_id,
681+
instrument_name=instrument_name,
682+
metadata=tiff_info.series_metadata,
683+
messenger=_transport_object,
684+
)
685+
return True

src/murfey/server/murfey_db.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from __future__ import annotations
22

3-
import os
43
from functools import partial
54

65
import yaml
@@ -11,8 +10,6 @@
1110

1211
from murfey.util.config import Security, get_security_config
1312

14-
instrument_name = os.getenv("BEAMLINE", "")
15-
1613

1714
def url(security_config: Security | None = None) -> str:
1815
security_config = security_config or get_security_config()

src/murfey/util/db.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,9 +269,12 @@ class CLEMImageSeries(SQLModel, table=True): # type: ignore
269269
) # One to many
270270

271271
# Process checklist for series
272-
images_aligned: bool = False # Image stacks aligned to reference image
273-
rgbs_created: bool = False # Image stacks all colorised
274-
composite_created: bool = False # Composite flattened image created
272+
number_of_members: int = (
273+
0 # Expected number of image stacks belonging to this series
274+
)
275+
images_aligned: bool = False # Have all members been aligned?
276+
rgbs_created: bool = False # Have all members been colourised?
277+
composite_created: bool = False # Has a composite image been created?
275278
composite_image: Optional[str] = None # Full path to composite image
276279

277280

src/murfey/util/models.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
from __future__ import annotations
22

3+
from ast import literal_eval
34
from datetime import datetime
45
from pathlib import Path
56
from typing import Any, Dict, List, Optional
67

7-
from pydantic import BaseModel
8+
from pydantic import BaseModel, validator
89

910
"""
1011
General Models
@@ -154,12 +155,46 @@ class FractionationParameters(BaseModel):
154155
"""
155156

156157

157-
class TiffSeriesInfo(BaseModel):
158+
class TIFFSeriesInfo(BaseModel):
158159
series_name: str
159160
tiff_files: List[Path]
160161
series_metadata: Path
161162

162163

164+
class LIFPreprocessingResult(BaseModel):
165+
image_stack: Path
166+
metadata: Path
167+
series_name: str
168+
channel: str
169+
number_of_members: int
170+
parent_lif: Path
171+
172+
173+
class TIFFPreprocessingResult(BaseModel):
174+
image_stack: Path
175+
metadata: Path
176+
series_name: str
177+
channel: str
178+
number_of_members: int
179+
parent_tiffs: list[Path]
180+
181+
@validator(
182+
"parent_tiffs",
183+
pre=True,
184+
)
185+
def parse_stringified_list(cls, value):
186+
if isinstance(value, str):
187+
try:
188+
eval_result = literal_eval(value)
189+
if isinstance(eval_result, list):
190+
parent_tiffs = [Path(p) for p in eval_result]
191+
return parent_tiffs
192+
except (SyntaxError, ValueError):
193+
raise ValueError("Unable to parse input")
194+
# Return value as-is; if it fails, it fails
195+
return value
196+
197+
163198
"""
164199
FIB
165200
===

src/murfey/workflows/clem/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)